From 058d7bbb714634e42bff1dd71fdfca3a378421d3 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 31 三月 2026 20:50:50 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java |  151 +++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 119 insertions(+), 32 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 3da25e3..95209a6 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
     private final Function<StationCommand, CommandResponse> commandSender;
     private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
 
+    private enum SegmentSendResult {
+        DISPATCHED,
+        CANCELLED,
+        RETRY
+    }
+
     public StationSegmentExecutor(DeviceConfig deviceConfig,
                                   RedisUtil redisUtil,
                                   Function<StationCommand, CommandResponse> commandSender) {
@@ -83,10 +89,11 @@
         long lastSeenAt = System.currentTimeMillis();
         int segCursor = 0;
         Integer lastCurrentStationId = null;
+        int lastMatchedPathIndex = -1;
         boolean firstRun = true;
         while (true) {
             try {
-                if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
+                if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
                     if (traceRegistry != null) {
                         traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
                                 buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -138,12 +145,17 @@
                     break;
                 }
 
-                int currentIndex = effectiveFullPath.indexOf(currentStation.getStationId());
+                int currentIndex = resolveCurrentPathIndex(
+                        effectiveFullPath,
+                        currentStation.getStationId(),
+                        lastMatchedPathIndex
+                );
                 if (currentIndex < 0) {
                     Thread.sleep(500L);
                     firstRun = false;
                     continue;
                 }
+                lastMatchedPathIndex = currentIndex;
 
                 int remaining = effectiveFullPath.size() - currentIndex - 1;
                 if (remaining <= 0) {
@@ -186,44 +198,115 @@
         }
     }
 
+    private int resolveCurrentPathIndex(List<Integer> fullPathStationIds,
+                                        Integer currentStationId,
+                                        int lastMatchedPathIndex) {
+        if (fullPathStationIds == null || fullPathStationIds.isEmpty() || currentStationId == null) {
+            return -1;
+        }
+        if (lastMatchedPathIndex >= 0
+                && lastMatchedPathIndex < fullPathStationIds.size()
+                && equalsInteger(currentStationId, fullPathStationIds.get(lastMatchedPathIndex))) {
+            return lastMatchedPathIndex;
+        }
+
+        int nextIndex = findNextStationIndex(fullPathStationIds, currentStationId, Math.max(lastMatchedPathIndex + 1, 0));
+        if (nextIndex >= 0) {
+            return nextIndex;
+        }
+        return findNextStationIndex(fullPathStationIds, currentStationId, 0);
+    }
+
+    private int findNextStationIndex(List<Integer> path, Integer stationId, int fromIndex) {
+        if (path == null || path.isEmpty() || stationId == null) {
+            return -1;
+        }
+        int startIdx = Math.max(fromIndex, 0);
+        for (int i = startIdx; i < path.size(); i++) {
+            if (equalsInteger(stationId, path.get(i))) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
     private boolean sendSegmentWithRetry(StationCommand command,
                                          StationTaskTraceRegistry traceRegistry,
                                          Integer traceVersion,
                                          Integer currentStationId) {
+        // 鍦ㄤ笅鍙戞柊鍒嗘鍓嶆鏌ヨ矾鐢辩増鏈槸鍚︿粛鐒舵湁鏁堬紝閬垮厤鍦ㄨ矾鐢辩増鏈凡鏇存柊鐨勬儏鍐典笅涓嬪彂鏃х増鏈懡浠�
+        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "route_version_replaced_before_segment_send");
+            return false;
+        }
+        
         while (true) {
-            if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
-                if (traceRegistry != null && command != null) {
-                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                            buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
-                }
-                markCancelled(command == null ? null : command.getTaskNo(),
-                        command == null ? null : command.getRouteVersion(),
-                        currentStationId,
-                        "route_version_replaced");
+            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
+            if (sendResult == SegmentSendResult.CANCELLED) {
                 return false;
             }
-            if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
-                if (traceRegistry != null && command != null) {
-                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                            buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
-                }
-                markCancelled(command == null ? null : command.getTaskNo(),
-                        command == null ? null : command.getRouteVersion(),
-                        currentStationId,
-                        "redis_cancel_signal");
-                return false;
-            }
-            CommandResponse commandResponse = commandSender.apply(command);
-            if (commandResponse == null) {
+            if (sendResult == SegmentSendResult.RETRY) {
                 sleepQuietly(200L);
                 continue;
             }
-            if (commandResponse.getResult()) {
-                markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
-                return true;
-            }
-            sleepQuietly(200L);
+            return true;
         }
+    }
+
+    private SegmentSendResult executeLockedSegmentSend(StationCommand command,
+                                                       StationTaskTraceRegistry traceRegistry,
+                                                       Integer traceVersion,
+                                                       Integer currentStationId) {
+        Integer taskNo = command == null ? null : command.getTaskNo();
+        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+        if (moveCoordinator != null) {
+            // 鍒嗘鍙戦�佺殑鏈�缁堟鏌ュ拰瀹為檯涓嬪彂闇�瑕佷笌 reroute 鍏辩敤浠诲姟閿併��
+            // 杩欐牱鍒囪矾绾跨▼涓�鏃﹁繘鍏� CANCEL_PENDING/RESET锛屾棫璺嚎灏变笉鑳藉啀绌胯繃鏈�鍚庤繖涓�姝ュ彂鍒拌澶囦晶銆�
+            return moveCoordinator.withTaskDispatchLock(taskNo,
+                    () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
+        }
+        return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
+    }
+
+    private SegmentSendResult doSendSegment(StationCommand command,
+                                            StationTaskTraceRegistry traceRegistry,
+                                            Integer traceVersion,
+                                            Integer currentStationId) {
+        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "route_version_replaced");
+            return SegmentSendResult.CANCELLED;
+        }
+        if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "redis_cancel_signal");
+            return SegmentSendResult.CANCELLED;
+        }
+        CommandResponse commandResponse = commandSender.apply(command);
+        if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
+            return SegmentSendResult.RETRY;
+        }
+        markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
+        return SegmentSendResult.DISPATCHED;
     }
 
     private double loadSegmentAdvanceRatio() {
@@ -401,15 +484,19 @@
         }
     }
 
-    private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
+    private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
         // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
         // do not register a move session and therefore have no routeVersion.
         // They should keep the historical behavior and execute normally.
         if (taskNo == null || routeVersion == null) {
             return true;
         }
-        StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
-        return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
+        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+        return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
+    }
+
+    private StationMoveCoordinator loadMoveCoordinator() {
+        return SpringUtils.getBean(StationMoveCoordinator.class);
     }
 
     private void markSegmentIssued(Integer taskNo, Integer routeVersion) {

--
Gitblit v1.9.1