package com.zy.core.trace;
|
|
import com.zy.asrs.domain.vo.StationTaskTraceEventVo;
|
import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo;
|
import com.zy.asrs.domain.vo.StationTaskTraceVo;
|
import com.zy.core.model.command.StationCommand;
|
import lombok.Data;
|
import org.springframework.stereotype.Component;
|
|
import java.util.ArrayList;
|
import java.util.Comparator;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Objects;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@Component
|
public class StationTaskTraceRegistry {
|
|
private static final int MAX_EVENT_COUNT = 200;
|
private static final long TERMINAL_KEEP_MS = 30L * 60L * 1000L;
|
|
public static final String STATUS_WAITING = "WAITING";
|
public static final String STATUS_RUNNING = "RUNNING";
|
public static final String STATUS_BLOCKED = "BLOCKED";
|
public static final String STATUS_CANCELLED = "CANCELLED";
|
public static final String STATUS_TIMEOUT = "TIMEOUT";
|
public static final String STATUS_FINISHED = "FINISHED";
|
public static final String STATUS_REROUTED = "REROUTED";
|
|
private final Map<Integer, TraceTaskState> taskStateMap = new ConcurrentHashMap<>();
|
|
public TraceRegistration registerPlan(Integer taskNo,
|
String threadImpl,
|
Integer startStationId,
|
Integer currentStationId,
|
Integer finalTargetStationId,
|
List<Integer> localPathStationIds,
|
List<StationTaskTraceSegmentVo> localSegmentList) {
|
if (taskNo == null || taskNo <= 0) {
|
return new TraceRegistration();
|
}
|
|
cleanupExpired();
|
TraceTaskState taskState = taskStateMap.computeIfAbsent(taskNo, TraceTaskState::new);
|
return taskState.registerPlan(threadImpl, startStationId, currentStationId, finalTargetStationId,
|
copyIntegerList(localPathStationIds), copySegmentList(localSegmentList));
|
}
|
|
public void markSegmentIssued(Integer taskNo,
|
Integer traceVersion,
|
StationCommand command,
|
String eventType,
|
String message,
|
Map<String, Object> details) {
|
TraceTaskState state = taskStateMap.get(taskNo);
|
if (state == null) {
|
return;
|
}
|
state.markSegmentIssued(traceVersion, command, eventType, message, details);
|
}
|
|
public void updateProgress(Integer taskNo,
|
Integer traceVersion,
|
Integer currentStationId,
|
String eventType,
|
String message,
|
Map<String, Object> details) {
|
TraceTaskState state = taskStateMap.get(taskNo);
|
if (state == null) {
|
return;
|
}
|
state.updateProgress(traceVersion, currentStationId, eventType, message, details);
|
}
|
|
public void markBlocked(Integer taskNo,
|
Integer traceVersion,
|
Integer currentStationId,
|
Map<String, Object> details) {
|
TraceTaskState state = taskStateMap.get(taskNo);
|
if (state == null) {
|
return;
|
}
|
state.markTerminal(traceVersion, STATUS_BLOCKED, currentStationId, currentStationId,
|
"BLOCKED", "输送任务运行堵塞", details);
|
}
|
|
public void markCancelled(Integer taskNo,
|
Integer traceVersion,
|
Integer currentStationId,
|
Map<String, Object> details) {
|
TraceTaskState state = taskStateMap.get(taskNo);
|
if (state == null) {
|
return;
|
}
|
state.markTerminal(traceVersion, STATUS_CANCELLED, currentStationId, null,
|
"CANCELLED", "输送任务收到取消信号", details);
|
}
|
|
public void markTimeout(Integer taskNo,
|
Integer traceVersion,
|
Integer currentStationId,
|
Map<String, Object> details) {
|
TraceTaskState state = taskStateMap.get(taskNo);
|
if (state == null) {
|
return;
|
}
|
state.markTerminal(traceVersion, STATUS_TIMEOUT, currentStationId, null,
|
"TIMEOUT", "输送任务长时间无法定位当前位置", details);
|
}
|
|
public void markFinished(Integer taskNo,
|
Integer traceVersion,
|
Integer currentStationId,
|
Map<String, Object> details) {
|
TraceTaskState state = taskStateMap.get(taskNo);
|
if (state == null) {
|
return;
|
}
|
state.markTerminal(traceVersion, STATUS_FINISHED, currentStationId, null,
|
"FINISHED", "输送任务轨迹完成", details);
|
}
|
|
public List<StationTaskTraceVo> listLatestTraces() {
|
cleanupExpired();
|
List<StationTaskTraceVo> result = new ArrayList<>();
|
for (TraceTaskState state : taskStateMap.values()) {
|
if (state != null) {
|
result.add(state.toVo());
|
}
|
}
|
result.sort(new Comparator<StationTaskTraceVo>() {
|
@Override
|
public int compare(StationTaskTraceVo a, StationTaskTraceVo b) {
|
long av = a.getUpdatedAt() == null ? 0L : a.getUpdatedAt();
|
long bv = b.getUpdatedAt() == null ? 0L : b.getUpdatedAt();
|
return Long.compare(bv, av);
|
}
|
});
|
return result;
|
}
|
|
private void cleanupExpired() {
|
long now = System.currentTimeMillis();
|
for (Map.Entry<Integer, TraceTaskState> entry : taskStateMap.entrySet()) {
|
TraceTaskState state = entry.getValue();
|
if (state != null && state.shouldRemove(now)) {
|
taskStateMap.remove(entry.getKey(), state);
|
}
|
}
|
}
|
|
private static List<Integer> copyIntegerList(List<Integer> source) {
|
List<Integer> result = new ArrayList<>();
|
if (source == null) {
|
return result;
|
}
|
for (Integer item : source) {
|
if (item != null) {
|
result.add(item);
|
}
|
}
|
return result;
|
}
|
|
private static List<StationTaskTraceSegmentVo> copySegmentList(List<StationTaskTraceSegmentVo> source) {
|
List<StationTaskTraceSegmentVo> result = new ArrayList<>();
|
if (source == null) {
|
return result;
|
}
|
for (StationTaskTraceSegmentVo item : source) {
|
if (item == null) {
|
continue;
|
}
|
StationTaskTraceSegmentVo copy = new StationTaskTraceSegmentVo();
|
copy.setSegmentNo(item.getSegmentNo());
|
copy.setSegmentCount(item.getSegmentCount());
|
copy.setStationId(item.getStationId());
|
copy.setTargetStationId(item.getTargetStationId());
|
copy.setSegmentStartIndex(item.getSegmentStartIndex());
|
copy.setSegmentEndIndex(item.getSegmentEndIndex());
|
copy.setSegmentPath(copyIntegerList(item.getSegmentPath()));
|
copy.setIssued(Boolean.TRUE.equals(item.getIssued()));
|
result.add(copy);
|
}
|
return result;
|
}
|
|
private static Map<String, Object> copyDetails(Map<String, Object> source) {
|
Map<String, Object> result = new LinkedHashMap<>();
|
if (source == null || source.isEmpty()) {
|
return result;
|
}
|
for (Map.Entry<String, Object> entry : source.entrySet()) {
|
Object value = entry.getValue();
|
if (value instanceof List) {
|
result.put(entry.getKey(), new ArrayList<>((List<?>) value));
|
} else if (value instanceof Map) {
|
result.put(entry.getKey(), new LinkedHashMap<>((Map<?, ?>) value));
|
} else {
|
result.put(entry.getKey(), value);
|
}
|
}
|
return result;
|
}
|
|
private static boolean isTerminalStatus(String status) {
|
return STATUS_BLOCKED.equals(status)
|
|| STATUS_CANCELLED.equals(status)
|
|| STATUS_TIMEOUT.equals(status)
|
|| STATUS_FINISHED.equals(status);
|
}
|
|
@Data
|
public static class TraceRegistration {
|
private Integer traceVersion;
|
private Integer pathOffset;
|
private List<Integer> fullPathStationIds = new ArrayList<>();
|
private boolean rerouted;
|
}
|
|
private static class TraceTaskState {
|
|
private final Integer taskNo;
|
private String threadImpl;
|
private String status = STATUS_WAITING;
|
private Integer traceVersion = 0;
|
private Integer startStationId;
|
private Integer currentStationId;
|
private Integer finalTargetStationId;
|
private Integer blockedStationId;
|
private List<Integer> fullPathStationIds = new ArrayList<>();
|
private List<Integer> issuedStationIds = new ArrayList<>();
|
private List<Integer> passedStationIds = new ArrayList<>();
|
private List<Integer> pendingStationIds = new ArrayList<>();
|
private List<Integer> latestIssuedSegmentPath = new ArrayList<>();
|
private List<StationTaskTraceSegmentVo> segmentList = new ArrayList<>();
|
private Integer issuedSegmentCount = 0;
|
private Integer totalSegmentCount = 0;
|
private final List<StationTaskTraceEventVo> events = new ArrayList<>();
|
private Long updatedAt = System.currentTimeMillis();
|
private Long terminalExpireAt;
|
|
private TraceTaskState(Integer taskNo) {
|
this.taskNo = taskNo;
|
}
|
|
private synchronized TraceRegistration registerPlan(String threadImpl,
|
Integer startStationId,
|
Integer currentStationId,
|
Integer finalTargetStationId,
|
List<Integer> localPathStationIds,
|
List<StationTaskTraceSegmentVo> localSegmentList) {
|
TraceRegistration registration = new TraceRegistration();
|
boolean rerouted = !isTerminalStatus(this.status) && this.traceVersion != null && this.traceVersion > 0;
|
int nextTraceVersion = rerouted ? this.traceVersion + 1 : 1;
|
int pathOffset = rerouted ? this.passedStationIds.size() : 0;
|
Integer planCurrentStationId = currentStationId != null ? currentStationId : startStationId;
|
List<Integer> nextFullPath = buildFullPathForRegistration(rerouted, planCurrentStationId, localPathStationIds);
|
List<StationTaskTraceSegmentVo> nextSegmentList = shiftSegments(copySegmentList(localSegmentList), pathOffset);
|
|
this.threadImpl = threadImpl;
|
this.traceVersion = nextTraceVersion;
|
this.startStationId = rerouted && this.startStationId != null ? this.startStationId : startStationId;
|
this.currentStationId = planCurrentStationId;
|
this.finalTargetStationId = finalTargetStationId;
|
this.blockedStationId = null;
|
this.fullPathStationIds = nextFullPath;
|
this.segmentList = nextSegmentList;
|
this.issuedSegmentCount = 0;
|
this.totalSegmentCount = nextSegmentList.size();
|
this.issuedStationIds = new ArrayList<>();
|
this.latestIssuedSegmentPath = new ArrayList<>();
|
this.status = rerouted ? STATUS_REROUTED : STATUS_WAITING;
|
this.terminalExpireAt = null;
|
rebuildProgress(planCurrentStationId);
|
this.updatedAt = System.currentTimeMillis();
|
|
Map<String, Object> details = new LinkedHashMap<>();
|
details.put("traceVersion", nextTraceVersion);
|
details.put("fullPathStationIds", copyIntegerList(this.fullPathStationIds));
|
details.put("segmentCount", this.totalSegmentCount);
|
details.put("pathOffset", pathOffset);
|
details.put("currentStationId", this.currentStationId);
|
appendEvent(rerouted ? "REROUTED" : "PLAN_READY",
|
rerouted ? "输送任务路径已重算并续接轨迹" : "输送任务分段计划已建立",
|
details);
|
|
registration.setTraceVersion(nextTraceVersion);
|
registration.setPathOffset(pathOffset);
|
registration.setFullPathStationIds(copyIntegerList(this.fullPathStationIds));
|
registration.setRerouted(rerouted);
|
return registration;
|
}
|
|
private synchronized void markSegmentIssued(Integer traceVersion,
|
StationCommand command,
|
String eventType,
|
String message,
|
Map<String, Object> details) {
|
if (!acceptTraceVersion(traceVersion)) {
|
return;
|
}
|
this.status = STATUS_RUNNING;
|
this.blockedStationId = null;
|
int currentIssued = this.issuedSegmentCount == null ? 0 : this.issuedSegmentCount;
|
int nextIssued = command == null || command.getSegmentNo() == null ? currentIssued : command.getSegmentNo();
|
this.issuedSegmentCount = Math.max(currentIssued, nextIssued);
|
this.latestIssuedSegmentPath = copyIntegerList(command == null ? null : command.getNavigatePath());
|
int segmentEndIndex = command == null || command.getSegmentEndIndex() == null ? -1 : command.getSegmentEndIndex();
|
if (segmentEndIndex >= 0 && !this.fullPathStationIds.isEmpty()) {
|
int end = Math.min(segmentEndIndex + 1, this.fullPathStationIds.size());
|
this.issuedStationIds = copyIntegerList(this.fullPathStationIds.subList(0, end));
|
}
|
this.updatedAt = System.currentTimeMillis();
|
|
Map<String, Object> nextDetails = copyDetails(details);
|
if (command != null) {
|
nextDetails.put("segmentNo", command.getSegmentNo());
|
nextDetails.put("segmentCount", command.getSegmentCount());
|
nextDetails.put("commandStationId", command.getStationId());
|
nextDetails.put("commandTargetStationId", command.getTargetStaNo());
|
nextDetails.put("segmentPath", copyIntegerList(command.getNavigatePath()));
|
nextDetails.put("issuedSegmentCount", this.issuedSegmentCount);
|
nextDetails.put("totalSegmentCount", this.totalSegmentCount);
|
}
|
appendEvent(eventType, message, nextDetails);
|
}
|
|
private synchronized void updateProgress(Integer traceVersion,
|
Integer currentStationId,
|
String eventType,
|
String message,
|
Map<String, Object> details) {
|
if (!acceptTraceVersion(traceVersion)) {
|
return;
|
}
|
boolean changed = !Objects.equals(this.currentStationId, currentStationId);
|
rebuildProgress(currentStationId);
|
if (!isTerminalStatus(this.status)) {
|
this.status = STATUS_RUNNING;
|
this.blockedStationId = null;
|
}
|
this.updatedAt = System.currentTimeMillis();
|
if (changed && eventType != null) {
|
Map<String, Object> nextDetails = copyDetails(details);
|
nextDetails.put("currentStationId", currentStationId);
|
nextDetails.put("passedStationIds", copyIntegerList(this.passedStationIds));
|
nextDetails.put("pendingStationIds", copyIntegerList(this.pendingStationIds));
|
appendEvent(eventType, message, nextDetails);
|
}
|
}
|
|
private synchronized void markTerminal(Integer traceVersion,
|
String terminalStatus,
|
Integer currentStationId,
|
Integer blockedStationId,
|
String eventType,
|
String message,
|
Map<String, Object> details) {
|
if (!acceptTraceVersion(traceVersion)) {
|
return;
|
}
|
if (currentStationId != null) {
|
rebuildProgress(currentStationId);
|
}
|
this.status = terminalStatus;
|
this.blockedStationId = blockedStationId;
|
this.updatedAt = System.currentTimeMillis();
|
this.terminalExpireAt = this.updatedAt + TERMINAL_KEEP_MS;
|
|
Map<String, Object> nextDetails = copyDetails(details);
|
nextDetails.put("currentStationId", this.currentStationId);
|
nextDetails.put("blockedStationId", this.blockedStationId);
|
nextDetails.put("passedStationIds", copyIntegerList(this.passedStationIds));
|
nextDetails.put("pendingStationIds", copyIntegerList(this.pendingStationIds));
|
appendEvent(eventType, message, nextDetails);
|
}
|
|
private synchronized boolean shouldRemove(long now) {
|
return terminalExpireAt != null && terminalExpireAt <= now;
|
}
|
|
private synchronized StationTaskTraceVo toVo() {
|
StationTaskTraceVo vo = new StationTaskTraceVo();
|
vo.setTaskNo(taskNo);
|
vo.setThreadImpl(threadImpl);
|
vo.setStatus(status);
|
vo.setTraceVersion(traceVersion);
|
vo.setStartStationId(startStationId);
|
vo.setCurrentStationId(currentStationId);
|
vo.setFinalTargetStationId(finalTargetStationId);
|
vo.setBlockedStationId(blockedStationId);
|
vo.setFullPathStationIds(copyIntegerList(fullPathStationIds));
|
vo.setIssuedStationIds(copyIntegerList(issuedStationIds));
|
vo.setPassedStationIds(copyIntegerList(passedStationIds));
|
vo.setPendingStationIds(copyIntegerList(pendingStationIds));
|
vo.setLatestIssuedSegmentPath(copyIntegerList(latestIssuedSegmentPath));
|
vo.setSegmentList(copySegmentListWithIssued(segmentList, issuedSegmentCount));
|
vo.setIssuedSegmentCount(issuedSegmentCount);
|
vo.setTotalSegmentCount(totalSegmentCount);
|
vo.setUpdatedAt(updatedAt);
|
vo.setEvents(new ArrayList<>(events));
|
return vo;
|
}
|
|
private List<Integer> buildFullPathForRegistration(boolean rerouted,
|
Integer planCurrentStationId,
|
List<Integer> localPathStationIds) {
|
List<Integer> localPath = copyIntegerList(localPathStationIds);
|
if (!rerouted) {
|
if (localPath.isEmpty() && planCurrentStationId != null) {
|
localPath.add(planCurrentStationId);
|
}
|
return localPath;
|
}
|
|
List<Integer> result = new ArrayList<>(copyIntegerList(this.passedStationIds));
|
if (planCurrentStationId != null) {
|
result.add(planCurrentStationId);
|
}
|
if (!localPath.isEmpty()) {
|
int startIdx = 0;
|
if (planCurrentStationId != null && Objects.equals(localPath.get(0), planCurrentStationId)) {
|
startIdx = 1;
|
}
|
for (int i = startIdx; i < localPath.size(); i++) {
|
Integer stationId = localPath.get(i);
|
if (stationId != null) {
|
result.add(stationId);
|
}
|
}
|
}
|
return result;
|
}
|
|
private void rebuildProgress(Integer nextCurrentStationId) {
|
this.currentStationId = nextCurrentStationId;
|
List<Integer> fullPath = copyIntegerList(this.fullPathStationIds);
|
this.passedStationIds = new ArrayList<>();
|
this.pendingStationIds = copyIntegerList(fullPath);
|
int currentIndex = nextCurrentStationId == null ? -1 : fullPath.indexOf(nextCurrentStationId);
|
if (currentIndex < 0) {
|
return;
|
}
|
this.passedStationIds = copyIntegerList(fullPath.subList(0, currentIndex));
|
this.pendingStationIds = copyIntegerList(fullPath.subList(currentIndex + 1, fullPath.size()));
|
}
|
|
private boolean acceptTraceVersion(Integer incomingTraceVersion) {
|
return incomingTraceVersion != null
|
&& this.traceVersion != null
|
&& incomingTraceVersion.intValue() == this.traceVersion.intValue();
|
}
|
|
private void appendEvent(String eventType, String message, Map<String, Object> details) {
|
if (eventType == null) {
|
return;
|
}
|
StationTaskTraceEventVo event = new StationTaskTraceEventVo();
|
event.setTimestamp(System.currentTimeMillis());
|
event.setEventType(eventType);
|
event.setMessage(message);
|
event.setStatus(this.status);
|
event.setCurrentStationId(this.currentStationId);
|
event.setTargetStationId(this.finalTargetStationId);
|
event.setTraceVersion(this.traceVersion);
|
event.setDetails(copyDetails(details));
|
this.events.add(event);
|
if (this.events.size() > MAX_EVENT_COUNT) {
|
this.events.remove(0);
|
}
|
}
|
|
private List<StationTaskTraceSegmentVo> shiftSegments(List<StationTaskTraceSegmentVo> source, int pathOffset) {
|
List<StationTaskTraceSegmentVo> result = new ArrayList<>();
|
for (StationTaskTraceSegmentVo item : source) {
|
if (item == null) {
|
continue;
|
}
|
StationTaskTraceSegmentVo copy = new StationTaskTraceSegmentVo();
|
copy.setSegmentNo(item.getSegmentNo());
|
copy.setSegmentCount(item.getSegmentCount());
|
copy.setStationId(item.getStationId());
|
copy.setTargetStationId(item.getTargetStationId());
|
copy.setSegmentStartIndex(item.getSegmentStartIndex() == null ? null : item.getSegmentStartIndex() + pathOffset);
|
copy.setSegmentEndIndex(item.getSegmentEndIndex() == null ? null : item.getSegmentEndIndex() + pathOffset);
|
copy.setSegmentPath(copyIntegerList(item.getSegmentPath()));
|
copy.setIssued(Boolean.FALSE);
|
result.add(copy);
|
}
|
return result;
|
}
|
|
private List<StationTaskTraceSegmentVo> copySegmentListWithIssued(List<StationTaskTraceSegmentVo> source, Integer issuedSegmentCount) {
|
List<StationTaskTraceSegmentVo> result = new ArrayList<>();
|
int issuedCount = issuedSegmentCount == null ? 0 : issuedSegmentCount;
|
for (StationTaskTraceSegmentVo item : source) {
|
if (item == null) {
|
continue;
|
}
|
StationTaskTraceSegmentVo copy = new StationTaskTraceSegmentVo();
|
copy.setSegmentNo(item.getSegmentNo());
|
copy.setSegmentCount(item.getSegmentCount());
|
copy.setStationId(item.getStationId());
|
copy.setTargetStationId(item.getTargetStationId());
|
copy.setSegmentStartIndex(item.getSegmentStartIndex());
|
copy.setSegmentEndIndex(item.getSegmentEndIndex());
|
copy.setSegmentPath(copyIntegerList(item.getSegmentPath()));
|
copy.setIssued(item.getSegmentNo() != null && item.getSegmentNo() <= issuedCount);
|
result.add(copy);
|
}
|
return result;
|
}
|
}
|
}
|