package com.zy.core.trace;

import com.zy.asrs.domain.vo.StationTaskTraceEventVo;
import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo;
import com.zy.asrs.domain.vo.StationTaskTraceVo;
import com.zy.core.model.command.StationCommand;
import lombok.Data;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory registry of station task traces, keyed by task number.
 *
 * <p>Each task owns one {@link TraceTaskState} that records the planned full path, the
 * issued segments, the passed/pending stations and a bounded event history. Every plan
 * registration assigns a trace version; all later updates must carry the current version
 * or they are ignored. States that reached a terminal status are dropped
 * {@link #TERMINAL_KEEP_MS} milliseconds after the terminal update.
 *
 * <p>The task map is a {@link ConcurrentHashMap} and every state mutation runs inside a
 * {@code synchronized} method of the state object.
 */
@Component
public class StationTaskTraceRegistry {

    /** Maximum number of events kept per task; the oldest event is dropped beyond this. */
    private static final int MAX_EVENT_COUNT = 200;

    /** How long (ms) a terminal trace stays queryable before cleanup removes it. */
    private static final long TERMINAL_KEEP_MS = 30L * 60L * 1000L;

    public static final String STATUS_WAITING = "WAITING";
    public static final String STATUS_RUNNING = "RUNNING";
    public static final String STATUS_BLOCKED = "BLOCKED";
    public static final String STATUS_CANCELLED = "CANCELLED";
    public static final String STATUS_TIMEOUT = "TIMEOUT";
    public static final String STATUS_FINISHED = "FINISHED";
    public static final String STATUS_REROUTED = "REROUTED";

    private final Map<Integer, TraceTaskState> taskStateMap = new ConcurrentHashMap<>();

    /**
     * Registers a (new or re-computed) plan for a task and returns its trace version.
     *
     * <p>If the task already has a non-terminal trace, the registration is treated as a
     * reroute: the version is incremented and the new local path is appended after the
     * stations already passed. Otherwise the trace starts over at version 1.
     *
     * @param taskNo               task number; {@code null} or non-positive yields an empty registration
     * @param threadImpl           caller-supplied implementation label stored on the trace
     * @param startStationId       station the plan starts from
     * @param currentStationId     station the task is at now; falls back to {@code startStationId} when null
     * @param finalTargetStationId final destination station
     * @param localPathStationIds  station ids of the newly planned path (null entries are skipped)
     * @param localSegmentList     segments of the newly planned path (null entries are skipped)
     * @return the registration result; all-default when {@code taskNo} is invalid
     */
    public TraceRegistration registerPlan(Integer taskNo,
                                          String threadImpl,
                                          Integer startStationId,
                                          Integer currentStationId,
                                          Integer finalTargetStationId,
                                          List<Integer> localPathStationIds,
                                          List<StationTaskTraceSegmentVo> localSegmentList) {
        if (taskNo == null || taskNo <= 0) {
            return new TraceRegistration();
        }
        cleanupExpired();
        TraceTaskState taskState = taskStateMap.computeIfAbsent(taskNo, TraceTaskState::new);
        return taskState.registerPlan(threadImpl, startStationId, currentStationId, finalTargetStationId,
                copyIntegerList(localPathStationIds), copySegmentList(localSegmentList));
    }

    /**
     * Records that a segment command was issued for the given trace version.
     * Ignored when the task is unknown or the version is not the current one.
     */
    public void markSegmentIssued(Integer taskNo, Integer traceVersion, StationCommand command,
                                  String eventType, String message, Map<String, Object> details) {
        TraceTaskState state = findState(taskNo);
        if (state == null) {
            return;
        }
        state.markSegmentIssued(traceVersion, command, eventType, message, details);
    }

    /**
     * Updates the current station of a task and recomputes passed/pending stations.
     * An event is appended only when the station actually changed and {@code eventType} is non-null.
     */
    public void updateProgress(Integer taskNo, Integer traceVersion, Integer currentStationId,
                               String eventType, String message, Map<String, Object> details) {
        TraceTaskState state = findState(taskNo);
        if (state == null) {
            return;
        }
        state.updateProgress(traceVersion, currentStationId, eventType, message, details);
    }

    /** Marks the trace as blocked at {@code currentStationId} (also recorded as the blocked station). */
    public void markBlocked(Integer taskNo, Integer traceVersion, Integer currentStationId,
                            Map<String, Object> details) {
        TraceTaskState state = findState(taskNo);
        if (state == null) {
            return;
        }
        state.markTerminal(traceVersion, STATUS_BLOCKED, currentStationId, currentStationId,
                "BLOCKED", "输送任务运行堵塞", details);
    }

    /** Marks the trace as cancelled. */
    public void markCancelled(Integer taskNo, Integer traceVersion, Integer currentStationId,
                              Map<String, Object> details) {
        TraceTaskState state = findState(taskNo);
        if (state == null) {
            return;
        }
        state.markTerminal(traceVersion, STATUS_CANCELLED, currentStationId, null,
                "CANCELLED", "输送任务收到取消信号", details);
    }

    /** Marks the trace as timed out. */
    public void markTimeout(Integer taskNo, Integer traceVersion, Integer currentStationId,
                            Map<String, Object> details) {
        TraceTaskState state = findState(taskNo);
        if (state == null) {
            return;
        }
        state.markTerminal(traceVersion, STATUS_TIMEOUT, currentStationId, null,
                "TIMEOUT", "输送任务长时间无法定位当前位置", details);
    }

    /** Marks the trace as finished. */
    public void markFinished(Integer taskNo, Integer traceVersion, Integer currentStationId,
                             Map<String, Object> details) {
        TraceTaskState state = findState(taskNo);
        if (state == null) {
            return;
        }
        state.markTerminal(traceVersion, STATUS_FINISHED, currentStationId, null,
                "FINISHED", "输送任务轨迹完成", details);
    }

    /**
     * Returns a snapshot of all live traces, most recently updated first.
     * Expired terminal traces are purged before the snapshot is taken.
     */
    public List<StationTaskTraceVo> listLatestTraces() {
        cleanupExpired();
        List<StationTaskTraceVo> result = new ArrayList<>();
        for (TraceTaskState state : taskStateMap.values()) {
            result.add(state.toVo());
        }
        // Descending by updatedAt; a missing timestamp sorts as 0 (i.e. last).
        result.sort(Comparator.comparingLong(StationTaskTraceRegistry::updatedAtOrZero).reversed());
        return result;
    }

    /**
     * Null-safe lookup of a task state.
     * ConcurrentHashMap rejects null keys with a NullPointerException, so guard here.
     */
    private TraceTaskState findState(Integer taskNo) {
        return taskNo == null ? null : taskStateMap.get(taskNo);
    }

    /** Returns the trace's update timestamp, or 0 when it has none. */
    private static long updatedAtOrZero(StationTaskTraceVo vo) {
        Long updatedAt = vo.getUpdatedAt();
        return updatedAt == null ? 0L : updatedAt;
    }

    /**
     * Removes every state whose terminal retention window has elapsed.
     * NOTE(review): the expiry check and the removal are two separate steps, so a
     * concurrent registerPlan on the same state could slip in between — confirm whether
     * callers can re-register a task while it is being expired.
     */
    private void cleanupExpired() {
        long now = System.currentTimeMillis();
        for (Map.Entry<Integer, TraceTaskState> entry : taskStateMap.entrySet()) {
            TraceTaskState state = entry.getValue();
            if (state != null && state.shouldRemove(now)) {
                // Conditional remove: only drops the mapping if it still points at this state.
                taskStateMap.remove(entry.getKey(), state);
            }
        }
    }

    /** Returns a new mutable list holding the non-null elements of {@code source} (empty for null). */
    private static List<Integer> copyIntegerList(List<Integer> source) {
        List<Integer> result = new ArrayList<>();
        if (source == null) {
            return result;
        }
        for (Integer item : source) {
            if (item != null) {
                result.add(item);
            }
        }
        return result;
    }

    /**
     * Copies one segment, shifting its start/end indexes by {@code indexOffset}
     * (null indexes stay null) and setting the issued flag to {@code issued}.
     */
    private static StationTaskTraceSegmentVo copySegment(StationTaskTraceSegmentVo item,
                                                         int indexOffset,
                                                         boolean issued) {
        StationTaskTraceSegmentVo copy = new StationTaskTraceSegmentVo();
        copy.setSegmentNo(item.getSegmentNo());
        copy.setSegmentCount(item.getSegmentCount());
        copy.setStationId(item.getStationId());
        copy.setTargetStationId(item.getTargetStationId());
        copy.setSegmentStartIndex(shiftIndex(item.getSegmentStartIndex(), indexOffset));
        copy.setSegmentEndIndex(shiftIndex(item.getSegmentEndIndex(), indexOffset));
        copy.setSegmentPath(copyIntegerList(item.getSegmentPath()));
        copy.setIssued(issued);
        return copy;
    }

    /** Adds {@code offset} to a nullable index, keeping null as null. */
    private static Integer shiftIndex(Integer index, int offset) {
        if (index == null) {
            return null;
        }
        return index + offset;
    }

    /** Deep-copies the non-null segments of {@code source}, normalising a null issued flag to false. */
    private static List<StationTaskTraceSegmentVo> copySegmentList(List<StationTaskTraceSegmentVo> source) {
        List<StationTaskTraceSegmentVo> result = new ArrayList<>();
        if (source == null) {
            return result;
        }
        for (StationTaskTraceSegmentVo item : source) {
            if (item != null) {
                result.add(copySegment(item, 0, Boolean.TRUE.equals(item.getIssued())));
            }
        }
        return result;
    }

    /**
     * Copies a details map preserving insertion order. List and Map values are copied
     * one level deep; all other values are shared by reference.
     */
    private static Map<String, Object> copyDetails(Map<String, Object> source) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (source == null || source.isEmpty()) {
            return result;
        }
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof List) {
                result.put(entry.getKey(), new ArrayList<>((List<?>) value));
            } else if (value instanceof Map) {
                result.put(entry.getKey(), new LinkedHashMap<>((Map<?, ?>) value));
            } else {
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }

    /** BLOCKED, CANCELLED, TIMEOUT and FINISHED are terminal; everything else is not. */
    private static boolean isTerminalStatus(String status) {
        return STATUS_BLOCKED.equals(status)
                || STATUS_CANCELLED.equals(status)
                || STATUS_TIMEOUT.equals(status)
                || STATUS_FINISHED.equals(status);
    }

    /** Result of {@link #registerPlan}: the version callers must echo back on later updates. */
    @Data
    public static class TraceRegistration {
        /** Version assigned to the registered plan; null when registration was rejected. */
        private Integer traceVersion;
        /** Offset added to the plan's segment indexes to position them in the full path. */
        private Integer pathOffset;
        /** Full path after registration (passed stations followed by the new plan). */
        private List<Integer> fullPathStationIds = new ArrayList<>();
        /** True when the registration continued an existing non-terminal trace. */
        private boolean rerouted;
    }

    /** Mutable trace state of one task; every entry point is synchronized on the instance. */
    private static class TraceTaskState {
        private final Integer taskNo;
        private String threadImpl;
        private String status = STATUS_WAITING;
        private Integer traceVersion = 0;
        private Integer startStationId;
        private Integer currentStationId;
        private Integer finalTargetStationId;
        private Integer blockedStationId;
        private List<Integer> fullPathStationIds = new ArrayList<>();
        private List<Integer> issuedStationIds = new ArrayList<>();
        private List<Integer> passedStationIds = new ArrayList<>();
        private List<Integer> pendingStationIds = new ArrayList<>();
        private List<Integer> latestIssuedSegmentPath = new ArrayList<>();
        private List<StationTaskTraceSegmentVo> segmentList = new ArrayList<>();
        private Integer issuedSegmentCount = 0;
        private Integer totalSegmentCount = 0;
        private final List<StationTaskTraceEventVo> events = new ArrayList<>();
        private Long updatedAt = System.currentTimeMillis();
        /** Set when a terminal status is recorded; null while the trace is live. */
        private Long terminalExpireAt;

        private TraceTaskState(Integer taskNo) {
            this.taskNo = taskNo;
        }

        /** Installs a new plan; see {@link StationTaskTraceRegistry#registerPlan}. */
        private synchronized TraceRegistration registerPlan(String threadImpl,
                                                            Integer startStationId,
                                                            Integer currentStationId,
                                                            Integer finalTargetStationId,
                                                            List<Integer> localPathStationIds,
                                                            List<StationTaskTraceSegmentVo> localSegmentList) {
            // A reroute continues a live trace; a terminal or never-planned trace restarts at version 1.
            boolean rerouted = !isTerminalStatus(this.status)
                    && this.traceVersion != null
                    && this.traceVersion > 0;
            int nextTraceVersion = rerouted ? this.traceVersion + 1 : 1;
            // Must be read before rebuildProgress() below recomputes passedStationIds.
            int pathOffset = rerouted ? this.passedStationIds.size() : 0;
            Integer planCurrentStationId = currentStationId != null ? currentStationId : startStationId;
            List<Integer> nextFullPath =
                    buildFullPathForRegistration(rerouted, planCurrentStationId, localPathStationIds);
            List<StationTaskTraceSegmentVo> nextSegmentList = shiftSegments(localSegmentList, pathOffset);

            this.threadImpl = threadImpl;
            this.traceVersion = nextTraceVersion;
            if (!rerouted || this.startStationId == null) {
                // Keep the original start station across reroutes when one is known.
                this.startStationId = startStationId;
            }
            this.currentStationId = planCurrentStationId;
            this.finalTargetStationId = finalTargetStationId;
            this.blockedStationId = null;
            this.fullPathStationIds = nextFullPath;
            this.segmentList = nextSegmentList;
            this.issuedSegmentCount = 0;
            this.totalSegmentCount = nextSegmentList.size();
            this.issuedStationIds = new ArrayList<>();
            this.latestIssuedSegmentPath = new ArrayList<>();
            this.status = rerouted ? STATUS_REROUTED : STATUS_WAITING;
            this.terminalExpireAt = null;
            rebuildProgress(planCurrentStationId);
            this.updatedAt = System.currentTimeMillis();

            Map<String, Object> details = new LinkedHashMap<>();
            details.put("traceVersion", nextTraceVersion);
            details.put("fullPathStationIds", copyIntegerList(this.fullPathStationIds));
            details.put("segmentCount", this.totalSegmentCount);
            details.put("pathOffset", pathOffset);
            details.put("currentStationId", this.currentStationId);
            if (rerouted) {
                appendEvent("REROUTED", "输送任务路径已重算并续接轨迹", details);
            } else {
                appendEvent("PLAN_READY", "输送任务分段计划已建立", details);
            }

            TraceRegistration registration = new TraceRegistration();
            registration.setTraceVersion(nextTraceVersion);
            registration.setPathOffset(pathOffset);
            registration.setFullPathStationIds(copyIntegerList(this.fullPathStationIds));
            registration.setRerouted(rerouted);
            return registration;
        }

        /** Records an issued segment command and switches the trace to RUNNING. */
        private synchronized void markSegmentIssued(Integer traceVersion,
                                                    StationCommand command,
                                                    String eventType,
                                                    String message,
                                                    Map<String, Object> details) {
            if (!acceptTraceVersion(traceVersion)) {
                return;
            }
            this.status = STATUS_RUNNING;
            this.blockedStationId = null;

            int currentIssued = this.issuedSegmentCount == null ? 0 : this.issuedSegmentCount;
            int nextIssued = currentIssued;
            if (command != null && command.getSegmentNo() != null) {
                nextIssued = command.getSegmentNo();
            }
            // Never move the issued counter backwards if commands arrive out of order.
            this.issuedSegmentCount = Math.max(currentIssued, nextIssued);
            this.latestIssuedSegmentPath = copyIntegerList(command == null ? null : command.getNavigatePath());

            int segmentEndIndex = -1;
            if (command != null && command.getSegmentEndIndex() != null) {
                segmentEndIndex = command.getSegmentEndIndex();
            }
            if (segmentEndIndex >= 0 && !this.fullPathStationIds.isEmpty()) {
                // Issued stations are the full-path prefix up to and including the segment end.
                int end = Math.min(segmentEndIndex + 1, this.fullPathStationIds.size());
                this.issuedStationIds = copyIntegerList(this.fullPathStationIds.subList(0, end));
            }
            this.updatedAt = System.currentTimeMillis();

            Map<String, Object> nextDetails = copyDetails(details);
            if (command != null) {
                nextDetails.put("segmentNo", command.getSegmentNo());
                nextDetails.put("segmentCount", command.getSegmentCount());
                nextDetails.put("commandStationId", command.getStationId());
                nextDetails.put("commandTargetStationId", command.getTargetStaNo());
                nextDetails.put("segmentPath", copyIntegerList(command.getNavigatePath()));
                nextDetails.put("issuedSegmentCount", this.issuedSegmentCount);
                nextDetails.put("totalSegmentCount", this.totalSegmentCount);
            }
            appendEvent(eventType, message, nextDetails);
        }

        /** Moves the trace to a new current station; emits an event only on an actual change. */
        private synchronized void updateProgress(Integer traceVersion,
                                                 Integer currentStationId,
                                                 String eventType,
                                                 String message,
                                                 Map<String, Object> details) {
            if (!acceptTraceVersion(traceVersion)) {
                return;
            }
            // Compare before rebuildProgress() overwrites the stored current station.
            boolean changed = !Objects.equals(this.currentStationId, currentStationId);
            rebuildProgress(currentStationId);
            if (!isTerminalStatus(this.status)) {
                this.status = STATUS_RUNNING;
                this.blockedStationId = null;
            }
            this.updatedAt = System.currentTimeMillis();
            if (changed && eventType != null) {
                Map<String, Object> nextDetails = copyDetails(details);
                nextDetails.put("currentStationId", currentStationId);
                nextDetails.put("passedStationIds", copyIntegerList(this.passedStationIds));
                nextDetails.put("pendingStationIds", copyIntegerList(this.pendingStationIds));
                appendEvent(eventType, message, nextDetails);
            }
        }

        /** Records a terminal status and starts the retention countdown. */
        private synchronized void markTerminal(Integer traceVersion,
                                               String terminalStatus,
                                               Integer currentStationId,
                                               Integer blockedStationId,
                                               String eventType,
                                               String message,
                                               Map<String, Object> details) {
            if (!acceptTraceVersion(traceVersion)) {
                return;
            }
            if (currentStationId != null) {
                rebuildProgress(currentStationId);
            }
            this.status = terminalStatus;
            this.blockedStationId = blockedStationId;
            this.updatedAt = System.currentTimeMillis();
            this.terminalExpireAt = this.updatedAt + TERMINAL_KEEP_MS;

            Map<String, Object> nextDetails = copyDetails(details);
            nextDetails.put("currentStationId", this.currentStationId);
            nextDetails.put("blockedStationId", this.blockedStationId);
            nextDetails.put("passedStationIds", copyIntegerList(this.passedStationIds));
            nextDetails.put("pendingStationIds", copyIntegerList(this.pendingStationIds));
            appendEvent(eventType, message, nextDetails);
        }

        /** True once a terminal status was recorded and its retention window has elapsed. */
        private synchronized boolean shouldRemove(long now) {
            return terminalExpireAt != null && terminalExpireAt <= now;
        }

        /** Builds a snapshot VO; list fields are copies, event objects are shared. */
        private synchronized StationTaskTraceVo toVo() {
            StationTaskTraceVo vo = new StationTaskTraceVo();
            vo.setTaskNo(taskNo);
            vo.setThreadImpl(threadImpl);
            vo.setStatus(status);
            vo.setTraceVersion(traceVersion);
            vo.setStartStationId(startStationId);
            vo.setCurrentStationId(currentStationId);
            vo.setFinalTargetStationId(finalTargetStationId);
            vo.setBlockedStationId(blockedStationId);
            vo.setFullPathStationIds(copyIntegerList(fullPathStationIds));
            vo.setIssuedStationIds(copyIntegerList(issuedStationIds));
            vo.setPassedStationIds(copyIntegerList(passedStationIds));
            vo.setPendingStationIds(copyIntegerList(pendingStationIds));
            vo.setLatestIssuedSegmentPath(copyIntegerList(latestIssuedSegmentPath));
            vo.setSegmentList(copySegmentListWithIssued(segmentList, issuedSegmentCount));
            vo.setIssuedSegmentCount(issuedSegmentCount);
            vo.setTotalSegmentCount(totalSegmentCount);
            vo.setUpdatedAt(updatedAt);
            vo.setEvents(new ArrayList<>(events));
            return vo;
        }

        /**
         * Computes the full path for a registration. A fresh plan uses the local path as-is
         * (or just the current station when the path is empty). A reroute yields the passed
         * stations, then the current station, then the local path without a leading
         * duplicate of the current station.
         */
        private List<Integer> buildFullPathForRegistration(boolean rerouted,
                                                           Integer planCurrentStationId,
                                                           List<Integer> localPathStationIds) {
            List<Integer> localPath = copyIntegerList(localPathStationIds);
            if (!rerouted) {
                if (localPath.isEmpty() && planCurrentStationId != null) {
                    localPath.add(planCurrentStationId);
                }
                return localPath;
            }
            List<Integer> result = copyIntegerList(this.passedStationIds);
            if (planCurrentStationId != null) {
                result.add(planCurrentStationId);
            }
            if (!localPath.isEmpty()) {
                boolean startsAtCurrent = planCurrentStationId != null
                        && Objects.equals(localPath.get(0), planCurrentStationId);
                // localPath holds no nulls (copyIntegerList filtered them), so the tail can be added directly.
                result.addAll(localPath.subList(startsAtCurrent ? 1 : 0, localPath.size()));
            }
            return result;
        }

        /**
         * Sets the current station and splits the full path around its first occurrence
         * into passed (before) and pending (after). When the station is null or not on the
         * path, passed becomes empty and pending becomes the whole path.
         */
        private void rebuildProgress(Integer nextCurrentStationId) {
            this.currentStationId = nextCurrentStationId;
            List<Integer> fullPath = copyIntegerList(this.fullPathStationIds);
            this.passedStationIds = new ArrayList<>();
            this.pendingStationIds = copyIntegerList(fullPath);
            int currentIndex = nextCurrentStationId == null ? -1 : fullPath.indexOf(nextCurrentStationId);
            if (currentIndex < 0) {
                return;
            }
            this.passedStationIds = copyIntegerList(fullPath.subList(0, currentIndex));
            this.pendingStationIds = copyIntegerList(fullPath.subList(currentIndex + 1, fullPath.size()));
        }

        /** Accepts an update only when it carries exactly the current, non-null trace version. */
        private boolean acceptTraceVersion(Integer incomingTraceVersion) {
            return incomingTraceVersion != null
                    && this.traceVersion != null
                    && incomingTraceVersion.intValue() == this.traceVersion.intValue();
        }

        /** Appends an event stamped with the current state; no-op when {@code eventType} is null. */
        private void appendEvent(String eventType, String message, Map<String, Object> details) {
            if (eventType == null) {
                return;
            }
            StationTaskTraceEventVo event = new StationTaskTraceEventVo();
            event.setTimestamp(System.currentTimeMillis());
            event.setEventType(eventType);
            event.setMessage(message);
            event.setStatus(this.status);
            event.setCurrentStationId(this.currentStationId);
            event.setTargetStationId(this.finalTargetStationId);
            event.setTraceVersion(this.traceVersion);
            event.setDetails(copyDetails(details));
            this.events.add(event);
            if (this.events.size() > MAX_EVENT_COUNT) {
                // Bounded history: drop the oldest event.
                this.events.remove(0);
            }
        }

        /** Copies segments with indexes shifted by {@code pathOffset}; all copies start as not issued. */
        private List<StationTaskTraceSegmentVo> shiftSegments(List<StationTaskTraceSegmentVo> source,
                                                              int pathOffset) {
            List<StationTaskTraceSegmentVo> result = new ArrayList<>();
            if (source == null) {
                return result;
            }
            for (StationTaskTraceSegmentVo item : source) {
                if (item != null) {
                    result.add(copySegment(item, pathOffset, false));
                }
            }
            return result;
        }

        /** Copies segments, marking as issued those whose segment number is within the issued count. */
        private List<StationTaskTraceSegmentVo> copySegmentListWithIssued(List<StationTaskTraceSegmentVo> source,
                                                                          Integer issuedSegmentCount) {
            List<StationTaskTraceSegmentVo> result = new ArrayList<>();
            int issuedCount = issuedSegmentCount == null ? 0 : issuedSegmentCount;
            for (StationTaskTraceSegmentVo item : source) {
                if (item == null) {
                    continue;
                }
                boolean issued = item.getSegmentNo() != null && item.getSegmentNo() <= issuedCount;
                result.add(copySegment(item, 0, issued));
            }
            return result;
        }
    }
}