From f3b64d003bc3458af3dd434e6187d3aba23a64aa Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 26 三月 2026 14:35:44 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java |   98 +++++++++++++++++++++++++++++++++---------------
 1 files changed, 67 insertions(+), 31 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 3da25e3..50ca80f 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
     private final Function<StationCommand, CommandResponse> commandSender;
     private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
 
+    private enum SegmentSendResult {
+        DISPATCHED,
+        CANCELLED,
+        RETRY
+    }
+
     public StationSegmentExecutor(DeviceConfig deviceConfig,
                                   RedisUtil redisUtil,
                                   Function<StationCommand, CommandResponse> commandSender) {
@@ -86,7 +92,7 @@
         boolean firstRun = true;
         while (true) {
             try {
-                if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
+                if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
                     if (traceRegistry != null) {
                         traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
                                 buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -191,39 +197,65 @@
                                          Integer traceVersion,
                                          Integer currentStationId) {
         while (true) {
-            if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
-                if (traceRegistry != null && command != null) {
-                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                            buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
-                }
-                markCancelled(command == null ? null : command.getTaskNo(),
-                        command == null ? null : command.getRouteVersion(),
-                        currentStationId,
-                        "route_version_replaced");
+            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
+            if (sendResult == SegmentSendResult.CANCELLED) {
                 return false;
             }
-            if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
-                if (traceRegistry != null && command != null) {
-                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                            buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
-                }
-                markCancelled(command == null ? null : command.getTaskNo(),
-                        command == null ? null : command.getRouteVersion(),
-                        currentStationId,
-                        "redis_cancel_signal");
-                return false;
-            }
-            CommandResponse commandResponse = commandSender.apply(command);
-            if (commandResponse == null) {
+            if (sendResult == SegmentSendResult.RETRY) {
                 sleepQuietly(200L);
                 continue;
             }
-            if (commandResponse.getResult()) {
-                markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
-                return true;
-            }
-            sleepQuietly(200L);
+            return true;
         }
+    }
+
+    private SegmentSendResult executeLockedSegmentSend(StationCommand command,
+                                                       StationTaskTraceRegistry traceRegistry,
+                                                       Integer traceVersion,
+                                                       Integer currentStationId) {
+        Integer taskNo = command == null ? null : command.getTaskNo();
+        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+        if (moveCoordinator != null) {
+            // 鍒嗘鍙戦�佺殑鏈�缁堟鏌ュ拰瀹為檯涓嬪彂闇�瑕佷笌 reroute 鍏辩敤浠诲姟閿併��
+            // 杩欐牱鍒囪矾绾跨▼涓�鏃﹁繘鍏� CANCEL_PENDING/RESET锛屾棫璺嚎灏变笉鑳藉啀绌胯繃鏈�鍚庤繖涓�姝ュ彂鍒拌澶囦晶銆�
+            return moveCoordinator.withTaskDispatchLock(taskNo,
+                    () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
+        }
+        return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
+    }
+
+    private SegmentSendResult doSendSegment(StationCommand command,
+                                            StationTaskTraceRegistry traceRegistry,
+                                            Integer traceVersion,
+                                            Integer currentStationId) {
+        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "route_version_replaced");
+            return SegmentSendResult.CANCELLED;
+        }
+        if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "redis_cancel_signal");
+            return SegmentSendResult.CANCELLED;
+        }
+        CommandResponse commandResponse = commandSender.apply(command);
+        if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
+            return SegmentSendResult.RETRY;
+        }
+        markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
+        return SegmentSendResult.DISPATCHED;
     }
 
     private double loadSegmentAdvanceRatio() {
@@ -401,15 +433,19 @@
         }
     }
 
-    private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
+    private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
         // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
         // do not register a move session and therefore have no routeVersion.
         // They should keep the historical behavior and execute normally.
         if (taskNo == null || routeVersion == null) {
             return true;
         }
-        StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
-        return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
+        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+        return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
+    }
+
+    private StationMoveCoordinator loadMoveCoordinator() {
+        return SpringUtils.getBean(StationMoveCoordinator.class);
     }
 
     private void markSegmentIssued(Integer taskNo, Integer routeVersion) {

--
Gitblit v1.9.1