package com.zy.core.thread.support;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory registry that maps a task number to the last reported
 * device/station location at which that task was seen loading.
 *
 * <p>Entries are written through {@link #update}, read through
 * {@link #findActive(Integer)} and dropped through {@link #remove} or
 * {@link #cleanupByDevice}. All state is kept in {@link ConcurrentHashMap}
 * instances; lookups that miss or hit a stale entry are logged at WARN level,
 * throttled per task number.
 */
@Slf4j
@Component
public class StationTaskLocationRegistry {

    /** Maximum snapshot age accepted by {@link #findActive(Integer)}. */
    private static final long DEFAULT_STALE_THRESHOLD_MS = 2_000L;

    /** Minimum gap between two lookup warnings for the same task number. */
    private static final long LOOKUP_WARN_INTERVAL_MS = 5_000L;

    /** Size above which expired warn-throttle marks are pruned. */
    private static final int LOOKUP_WARN_PRUNE_SIZE = 1_024;

    /** Latest loading snapshot per task number. */
    private final ConcurrentHashMap<Integer, TaskLocationSnapshot> loadingTaskLocationMap =
            new ConcurrentHashMap<>();

    /** Timestamp (epoch millis) of the last lookup warning per task number. */
    private final ConcurrentHashMap<Integer, Long> lookupWarnAtMap = new ConcurrentHashMap<>();

    /**
     * Records the current location of a task, or clears it.
     *
     * <p>Calls with a {@code null} or non-positive {@code taskNo} are ignored.
     * When {@code loading} is {@code false}, or {@code deviceNo} or
     * {@code stationId} is {@code null}, any existing entry for the task is
     * removed unconditionally instead of being updated.
     *
     * @param taskNo     task number; must be positive to have any effect
     * @param deviceNo   device reporting the task
     * @param stationId  station at which the task was reported
     * @param loading    whether the station reports the task as loading
     * @param runBlock   run-block flag stored verbatim in the snapshot
     * @param updateTime timestamp stored in the snapshot; compared against
     *                   {@link System#currentTimeMillis()} on lookup
     */
    public void update(Integer taskNo, Integer deviceNo, Integer stationId,
                       boolean loading, boolean runBlock, long updateTime) {
        if (taskNo == null || taskNo <= 0) {
            return;
        }
        if (!loading || deviceNo == null || stationId == null) {
            remove(taskNo, null, null);
            return;
        }
        loadingTaskLocationMap.put(taskNo,
                new TaskLocationSnapshot(taskNo, deviceNo, stationId, true, runBlock, updateTime));
    }

    /**
     * Removes the entry of a task, optionally only if it still points at the
     * given device and/or station.
     *
     * @param taskNo    task number; {@code null} or non-positive values are ignored
     * @param deviceNo  when non-null, the entry is removed only if its device matches
     * @param stationId when non-null, the entry is removed only if its station matches
     */
    public void remove(Integer taskNo, Integer deviceNo, Integer stationId) {
        if (taskNo == null || taskNo <= 0) {
            return;
        }
        // computeIfPresent keeps the match check and the removal atomic for this key;
        // returning null from the remapping function deletes the mapping.
        loadingTaskLocationMap.computeIfPresent(taskNo, (key, snapshot) -> {
            boolean deviceMismatch =
                    deviceNo != null && !Objects.equals(deviceNo, snapshot.getDeviceNo());
            boolean stationMismatch =
                    stationId != null && !Objects.equals(stationId, snapshot.getStationId());
            return (deviceMismatch || stationMismatch) ? snapshot : null;
        });
    }

    /**
     * Looks up a task using the default staleness threshold
     * ({@value #DEFAULT_STALE_THRESHOLD_MS} ms).
     *
     * @param taskNo task number to look up
     * @return the snapshot, or {@code null} if absent, stale or {@code taskNo} is invalid
     */
    public TaskLocationSnapshot findActive(Integer taskNo) {
        return findActive(taskNo, DEFAULT_STALE_THRESHOLD_MS);
    }

    /**
     * Looks up a task and rejects snapshots older than the given threshold.
     *
     * <p>A miss or a stale hit is reported through a throttled WARN log. A
     * {@code null} or non-positive {@code taskNo} returns {@code null} without
     * logging.
     *
     * @param taskNo           task number to look up
     * @param staleThresholdMs maximum accepted age in milliseconds; a snapshot is
     *                         rejected when its age is strictly greater
     * @return the snapshot, or {@code null} if absent, stale or {@code taskNo} is invalid
     */
    public TaskLocationSnapshot findActive(Integer taskNo, long staleThresholdMs) {
        if (taskNo == null || taskNo <= 0) {
            return null;
        }
        TaskLocationSnapshot snapshot = loadingTaskLocationMap.get(taskNo);
        if (snapshot == null) {
            warnLookup(taskNo, "miss", null);
            return null;
        }
        // Clamp at zero so a snapshot stamped in the future counts as fresh.
        long ageMs = Math.max(0L, System.currentTimeMillis() - snapshot.getUpdateTime());
        if (ageMs > staleThresholdMs) {
            warnLookup(taskNo, "stale", ageMs);
            return null;
        }
        return snapshot;
    }

    /**
     * Drops every entry that belongs to {@code deviceNo} and whose task number
     * is not contained in {@code activeTaskNoList}.
     *
     * @param deviceNo         device whose entries are reconciled; {@code null} is a no-op
     * @param activeTaskNoList task numbers to keep; {@code null} is treated as empty,
     *                         and {@code null} or non-positive elements are skipped
     */
    public void cleanupByDevice(Integer deviceNo, Iterable<Integer> activeTaskNoList) {
        if (deviceNo == null) {
            return;
        }
        Set<Integer> activeTaskNos = new HashSet<>();
        if (activeTaskNoList != null) {
            for (Integer taskNo : activeTaskNoList) {
                if (taskNo != null && taskNo > 0) {
                    activeTaskNos.add(taskNo);
                }
            }
        }
        for (Map.Entry<Integer, TaskLocationSnapshot> entry : loadingTaskLocationMap.entrySet()) {
            TaskLocationSnapshot snapshot = entry.getValue();
            if (snapshot == null || !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
                continue;
            }
            if (!activeTaskNos.contains(entry.getKey())) {
                // Two-argument remove: the entry goes away only if it still holds the
                // snapshot inspected above, so a concurrent update is not discarded.
                loadingTaskLocationMap.remove(entry.getKey(), snapshot);
            }
        }
    }

    /**
     * Emits a WARN log for a failed lookup, at most once per
     * {@link #LOOKUP_WARN_INTERVAL_MS} for the same task number.
     *
     * @param taskNo task number that was looked up
     * @param reason short label placed in the log message ("miss" / "stale")
     * @param ageMs  snapshot age for stale hits, or {@code null} when there was no snapshot
     */
    private void warnLookup(Integer taskNo, String reason, Long ageMs) {
        long now = System.currentTimeMillis();
        Long lastWarnAt = lookupWarnAtMap.get(taskNo);
        if (lastWarnAt != null && now - lastWarnAt < LOOKUP_WARN_INTERVAL_MS) {
            return;
        }
        lookupWarnAtMap.put(taskNo, now);
        pruneExpiredWarnMarks(now);
        if (ageMs == null) {
            log.warn("task-location-registry {}, taskNo={}", reason, taskNo);
            return;
        }
        log.warn("task-location-registry {}, taskNo={}, ageMs={}", reason, taskNo, ageMs);
    }

    /**
     * Discards throttle marks that are already older than the warn interval once
     * the map has grown past {@link #LOOKUP_WARN_PRUNE_SIZE}. Such marks no longer
     * suppress anything, so removing them does not change which warnings are
     * logged; it only keeps the map from growing with every task number ever seen.
     *
     * @param now current time in epoch milliseconds
     */
    private void pruneExpiredWarnMarks(long now) {
        if (lookupWarnAtMap.size() <= LOOKUP_WARN_PRUNE_SIZE) {
            return;
        }
        lookupWarnAtMap.values().removeIf(warnedAt -> now - warnedAt >= LOOKUP_WARN_INTERVAL_MS);
    }

    /**
     * Immutable record of where a task was last reported.
     */
    public static class TaskLocationSnapshot {

        private final Integer taskNo;
        private final Integer deviceNo;
        private final Integer stationId;
        private final boolean loading;
        private final boolean runBlock;
        private final long updateTime;

        /**
         * Creates a snapshot; all values are stored as given.
         *
         * @param taskNo     task number
         * @param deviceNo   reporting device
         * @param stationId  reporting station
         * @param loading    loading flag
         * @param runBlock   run-block flag
         * @param updateTime timestamp of the report
         */
        public TaskLocationSnapshot(Integer taskNo, Integer deviceNo, Integer stationId,
                                    boolean loading, boolean runBlock, long updateTime) {
            this.taskNo = taskNo;
            this.deviceNo = deviceNo;
            this.stationId = stationId;
            this.loading = loading;
            this.runBlock = runBlock;
            this.updateTime = updateTime;
        }

        /** @return the task number */
        public Integer getTaskNo() {
            return taskNo;
        }

        /** @return the reporting device number */
        public Integer getDeviceNo() {
            return deviceNo;
        }

        /** @return the reporting station id */
        public Integer getStationId() {
            return stationId;
        }

        /** @return the loading flag */
        public boolean isLoading() {
            return loading;
        }

        /** @return the run-block flag */
        public boolean isRunBlock() {
            return runBlock;
        }

        /** @return the timestamp of the report */
        public long getUpdateTime() {
            return updateTime;
        }
    }
}