| | |
| | | package com.zy.core.trace; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.zy.asrs.domain.vo.StationTaskTraceEventVo; |
| | | import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo; |
| | | import com.zy.asrs.domain.vo.StationTaskTraceVo; |
| | | import com.zy.common.utils.RedisUtil; |
| | | import com.zy.core.model.command.StationCommand; |
| | | import com.zy.core.enums.RedisKeyType; |
| | | import lombok.Data; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.ArrayList; |
| | |
| | | |
// Use of this limit is outside the visible code; presumably the cap on events kept per trace (TODO confirm).
| | | private static final int MAX_EVENT_COUNT = 200; |
// 30 minutes expressed in milliseconds; presumably how long a terminal trace is retained (usage not visible here).
| | | private static final long TERMINAL_KEEP_MS = 30L * 60L * 1000L; |
// Redis hash key under which per-task trace snapshots are read, written and deleted.
| | | private static final String TRACE_CACHE_KEY = RedisKeyType.STATION_TASK_TRACE_REGISTRY.key; |
| | | |
// Status strings recorded on a task trace.
// STATUS_WAITING is also the fallback applied when a restored snapshot carries no status.
| | | public static final String STATUS_WAITING = "WAITING"; |
| | | public static final String STATUS_RUNNING = "RUNNING"; |
| | |
// STATUS_FINISHED is the status applied by markFinished.
| | | public static final String STATUS_FINISHED = "FINISHED"; |
| | | public static final String STATUS_REROUTED = "REROUTED"; |
| | | |
// Injected by Spring; the uses visible here null-check it and skip Redis access when it is absent.
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | |
// In-memory trace state, keyed by task number.
| | | private final Map<Integer, TraceTaskState> taskStateMap = new ConcurrentHashMap<>(); |
// Set to true once ensureCacheLoaded() has finished its one-time load from Redis.
| | | private volatile boolean loadedFromRedis = false; |
| | | |
| | | public TraceRegistration registerPlan(Integer taskNo, |
| | | String threadImpl, |
| | |
| | | return new TraceRegistration(); |
| | | } |
| | | |
| | | ensureCacheLoaded(); |
| | | cleanupExpired(); |
| | | TraceTaskState taskState = taskStateMap.computeIfAbsent(taskNo, TraceTaskState::new); |
| | | return taskState.registerPlan(threadImpl, startStationId, currentStationId, finalTargetStationId, |
| | | TraceRegistration registration = taskState.registerPlan(threadImpl, startStationId, currentStationId, finalTargetStationId, |
| | | copyIntegerList(localPathStationIds), copySegmentList(localSegmentList)); |
| | | persistState(taskState); |
| | | return registration; |
| | | } |
| | | |
| | | public void markSegmentIssued(Integer taskNo, |
| | |
| | | String eventType, |
| | | String message, |
| | | Map<String, Object> details) { |
| | | ensureCacheLoaded(); |
| | | TraceTaskState state = taskStateMap.get(taskNo); |
| | | if (state == null) { |
| | | return; |
| | | } |
| | | state.markSegmentIssued(traceVersion, command, eventType, message, details); |
| | | persistState(state); |
| | | } |
| | | |
| | | public void updateProgress(Integer taskNo, |
| | |
| | | String eventType, |
| | | String message, |
| | | Map<String, Object> details) { |
| | | ensureCacheLoaded(); |
| | | TraceTaskState state = taskStateMap.get(taskNo); |
| | | if (state == null) { |
| | | return; |
| | | } |
| | | state.updateProgress(traceVersion, currentStationId, eventType, message, details); |
| | | persistState(state); |
| | | } |
| | | |
| | | public void markBlocked(Integer taskNo, |
| | | Integer traceVersion, |
| | | Integer currentStationId, |
| | | Map<String, Object> details) { |
| | | ensureCacheLoaded(); |
| | | TraceTaskState state = taskStateMap.get(taskNo); |
| | | if (state == null) { |
| | | return; |
| | | } |
| | | state.markTerminal(traceVersion, STATUS_BLOCKED, currentStationId, currentStationId, |
| | | "BLOCKED", "输送任务运行堵塞", details); |
| | | persistState(state); |
| | | } |
| | | |
| | | public void markCancelled(Integer taskNo, |
| | | Integer traceVersion, |
| | | Integer currentStationId, |
| | | Map<String, Object> details) { |
| | | ensureCacheLoaded(); |
| | | TraceTaskState state = taskStateMap.get(taskNo); |
| | | if (state == null) { |
| | | return; |
| | | } |
| | | state.markTerminal(traceVersion, STATUS_CANCELLED, currentStationId, null, |
| | | "CANCELLED", "输送任务收到取消信号", details); |
| | | persistState(state); |
| | | } |
| | | |
| | | public void markTimeout(Integer taskNo, |
| | | Integer traceVersion, |
| | | Integer currentStationId, |
| | | Map<String, Object> details) { |
| | | ensureCacheLoaded(); |
| | | TraceTaskState state = taskStateMap.get(taskNo); |
| | | if (state == null) { |
| | | return; |
| | | } |
| | | state.markTerminal(traceVersion, STATUS_TIMEOUT, currentStationId, null, |
| | | "TIMEOUT", "输送任务长时间无法定位当前位置", details); |
| | | persistState(state); |
| | | } |
| | | |
| | | public void markFinished(Integer taskNo, |
| | | Integer traceVersion, |
| | | Integer currentStationId, |
| | | Map<String, Object> details) { |
| | | ensureCacheLoaded(); |
| | | TraceTaskState state = taskStateMap.get(taskNo); |
| | | if (state == null) { |
| | | return; |
| | | } |
| | | state.markTerminal(traceVersion, STATUS_FINISHED, currentStationId, null, |
| | | "FINISHED", "输送任务轨迹完成", details); |
| | | persistState(state); |
| | | } |
| | | |
| | | public List<StationTaskTraceVo> listLatestTraces() { |
| | | ensureCacheLoaded(); |
| | | cleanupExpired(); |
| | | List<StationTaskTraceVo> result = new ArrayList<>(); |
| | | for (TraceTaskState state : taskStateMap.values()) { |
| | |
| | | TraceTaskState state = entry.getValue(); |
| | | if (state != null && state.shouldRemove(now)) { |
| | | taskStateMap.remove(entry.getKey(), state); |
| | | removePersistedState(entry.getKey()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void ensureCacheLoaded() { |
| | | if (loadedFromRedis) { |
| | | return; |
| | | } |
| | | synchronized (this) { |
| | | if (loadedFromRedis) { |
| | | return; |
| | | } |
| | | Map<Object, Object> cacheMap = redisUtil == null ? null : redisUtil.hmget(TRACE_CACHE_KEY); |
| | | if (cacheMap != null && !cacheMap.isEmpty()) { |
| | | long now = System.currentTimeMillis(); |
| | | for (Map.Entry<Object, Object> entry : cacheMap.entrySet()) { |
| | | Integer taskNo = parseTaskNo(entry.getKey()); |
| | | TraceSnapshot snapshot = parseSnapshot(entry.getValue(), taskNo); |
| | | if (snapshot == null || snapshot.getTaskNo() == null) { |
| | | if (taskNo != null) { |
| | | removePersistedState(taskNo); |
| | | } |
| | | continue; |
| | | } |
| | | if (snapshot.isExpired(now)) { |
| | | removePersistedState(snapshot.getTaskNo()); |
| | | continue; |
| | | } |
| | | taskStateMap.put(snapshot.getTaskNo(), TraceTaskState.fromSnapshot(snapshot)); |
| | | } |
| | | } |
| | | loadedFromRedis = true; |
| | | } |
| | | } |
| | | |
| | | private void persistState(TraceTaskState state) { |
| | | if (state == null || state.taskNo == null || redisUtil == null) { |
| | | return; |
| | | } |
| | | redisUtil.hset(TRACE_CACHE_KEY, String.valueOf(state.taskNo), JSON.toJSONString(state.toSnapshot())); |
| | | } |
| | | |
| | | private void removePersistedState(Integer taskNo) { |
| | | if (taskNo == null || redisUtil == null) { |
| | | return; |
| | | } |
| | | redisUtil.hdel(TRACE_CACHE_KEY, String.valueOf(taskNo)); |
| | | } |
| | | |
| | | private Integer parseTaskNo(Object value) { |
| | | if (value == null) { |
| | | return null; |
| | | } |
| | | try { |
| | | return Integer.parseInt(String.valueOf(value)); |
| | | } catch (Exception e) { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | private TraceSnapshot parseSnapshot(Object cacheValue, Integer fallbackTaskNo) { |
| | | if (cacheValue == null) { |
| | | return null; |
| | | } |
| | | try { |
| | | TraceSnapshot snapshot = JSON.parseObject(String.valueOf(cacheValue), TraceSnapshot.class); |
| | | if (snapshot != null && snapshot.getTaskNo() == null) { |
| | | snapshot.setTaskNo(fallbackTaskNo); |
| | | } |
| | | return snapshot; |
| | | } catch (Exception e) { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | |
| | | return result; |
| | | } |
| | | |
| | | private static List<StationTaskTraceEventVo> copyEventList(List<StationTaskTraceEventVo> source) { |
| | | List<StationTaskTraceEventVo> result = new ArrayList<>(); |
| | | if (source == null) { |
| | | return result; |
| | | } |
| | | for (StationTaskTraceEventVo item : source) { |
| | | if (item == null) { |
| | | continue; |
| | | } |
| | | StationTaskTraceEventVo copy = new StationTaskTraceEventVo(); |
| | | copy.setTimestamp(item.getTimestamp()); |
| | | copy.setEventType(item.getEventType()); |
| | | copy.setMessage(item.getMessage()); |
| | | copy.setStatus(item.getStatus()); |
| | | copy.setCurrentStationId(item.getCurrentStationId()); |
| | | copy.setTargetStationId(item.getTargetStationId()); |
| | | copy.setTraceVersion(item.getTraceVersion()); |
| | | copy.setDetails(copyDetails(item.getDetails())); |
| | | result.add(copy); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | private static boolean isTerminalStatus(String status) { |
| | | return STATUS_BLOCKED.equals(status) |
| | | || STATUS_CANCELLED.equals(status) |
| | |
| | | private Integer pathOffset; |
| | | private List<Integer> fullPathStationIds = new ArrayList<>(); |
| | | private boolean rerouted; |
| | | } |
| | | |
| | | @Data |
| | | private static class TraceSnapshot { |
| | | private Integer taskNo; |
| | | private String threadImpl; |
| | | private String status; |
| | | private Integer traceVersion; |
| | | private Integer startStationId; |
| | | private Integer currentStationId; |
| | | private Integer finalTargetStationId; |
| | | private Integer blockedStationId; |
| | | private List<Integer> fullPathStationIds = new ArrayList<>(); |
| | | private List<Integer> issuedStationIds = new ArrayList<>(); |
| | | private List<Integer> passedStationIds = new ArrayList<>(); |
| | | private List<Integer> pendingStationIds = new ArrayList<>(); |
| | | private List<Integer> latestIssuedSegmentPath = new ArrayList<>(); |
| | | private List<StationTaskTraceSegmentVo> segmentList = new ArrayList<>(); |
| | | private Integer issuedSegmentCount; |
| | | private Integer totalSegmentCount; |
| | | private Long updatedAt; |
| | | private Long terminalExpireAt; |
| | | private List<StationTaskTraceEventVo> events = new ArrayList<>(); |
| | | |
| | | private boolean isExpired(long now) { |
| | | return terminalExpireAt != null && terminalExpireAt <= now; |
| | | } |
| | | } |
| | | |
| | | private static class TraceTaskState { |
| | |
| | | vo.setIssuedSegmentCount(issuedSegmentCount); |
| | | vo.setTotalSegmentCount(totalSegmentCount); |
| | | vo.setUpdatedAt(updatedAt); |
| | | vo.setEvents(new ArrayList<>(events)); |
| | | vo.setEvents(copyEventList(events)); |
| | | return vo; |
| | | } |
| | | |
| | | private synchronized TraceSnapshot toSnapshot() { |
| | | TraceSnapshot snapshot = new TraceSnapshot(); |
| | | snapshot.setTaskNo(taskNo); |
| | | snapshot.setThreadImpl(threadImpl); |
| | | snapshot.setStatus(status); |
| | | snapshot.setTraceVersion(traceVersion); |
| | | snapshot.setStartStationId(startStationId); |
| | | snapshot.setCurrentStationId(currentStationId); |
| | | snapshot.setFinalTargetStationId(finalTargetStationId); |
| | | snapshot.setBlockedStationId(blockedStationId); |
| | | snapshot.setFullPathStationIds(copyIntegerList(fullPathStationIds)); |
| | | snapshot.setIssuedStationIds(copyIntegerList(issuedStationIds)); |
| | | snapshot.setPassedStationIds(copyIntegerList(passedStationIds)); |
| | | snapshot.setPendingStationIds(copyIntegerList(pendingStationIds)); |
| | | snapshot.setLatestIssuedSegmentPath(copyIntegerList(latestIssuedSegmentPath)); |
| | | snapshot.setSegmentList(copySegmentList(segmentList)); |
| | | snapshot.setIssuedSegmentCount(issuedSegmentCount); |
| | | snapshot.setTotalSegmentCount(totalSegmentCount); |
| | | snapshot.setUpdatedAt(updatedAt); |
| | | snapshot.setTerminalExpireAt(terminalExpireAt); |
| | | snapshot.setEvents(copyEventList(events)); |
| | | return snapshot; |
| | | } |
| | | |
| | | private static TraceTaskState fromSnapshot(TraceSnapshot snapshot) { |
| | | TraceTaskState state = new TraceTaskState(snapshot.getTaskNo()); |
| | | state.threadImpl = snapshot.getThreadImpl(); |
| | | state.status = snapshot.getStatus() == null ? STATUS_WAITING : snapshot.getStatus(); |
| | | state.traceVersion = snapshot.getTraceVersion() == null ? 0 : snapshot.getTraceVersion(); |
| | | state.startStationId = snapshot.getStartStationId(); |
| | | state.currentStationId = snapshot.getCurrentStationId(); |
| | | state.finalTargetStationId = snapshot.getFinalTargetStationId(); |
| | | state.blockedStationId = snapshot.getBlockedStationId(); |
| | | state.fullPathStationIds = copyIntegerList(snapshot.getFullPathStationIds()); |
| | | state.issuedStationIds = copyIntegerList(snapshot.getIssuedStationIds()); |
| | | state.passedStationIds = copyIntegerList(snapshot.getPassedStationIds()); |
| | | state.pendingStationIds = copyIntegerList(snapshot.getPendingStationIds()); |
| | | state.latestIssuedSegmentPath = copyIntegerList(snapshot.getLatestIssuedSegmentPath()); |
| | | state.segmentList = copySegmentList(snapshot.getSegmentList()); |
| | | state.issuedSegmentCount = snapshot.getIssuedSegmentCount() == null ? 0 : snapshot.getIssuedSegmentCount(); |
| | | state.totalSegmentCount = snapshot.getTotalSegmentCount() == null ? state.segmentList.size() : snapshot.getTotalSegmentCount(); |
| | | state.updatedAt = snapshot.getUpdatedAt() == null ? System.currentTimeMillis() : snapshot.getUpdatedAt(); |
| | | state.terminalExpireAt = snapshot.getTerminalExpireAt(); |
| | | state.events.clear(); |
| | | state.events.addAll(copyEventList(snapshot.getEvents())); |
| | | return state; |
| | | } |
| | | |
| | | private List<Integer> buildFullPathForRegistration(boolean rerouted, |
| | | Integer planCurrentStationId, |
| | | List<Integer> localPathStationIds) { |