From f7d2eda120867b3c5a2d9001d5f5c9d0396c65bd Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 14 四月 2026 12:41:13 +0800
Subject: [PATCH] #命令下发队列优化

---
 src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java    |   20 ++++++----
 src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java         |   46 ++++++++++++++--------
 src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java |   11 ++++-
 3 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 9647384..21496f0 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -241,15 +241,19 @@
                                          Integer currentStationId) {
         // 鍦ㄤ笅鍙戞柊鍒嗘鍓嶆鏌ヨ矾鐢辩増鏈槸鍚︿粛鐒舵湁鏁堬紝閬垮厤鍦ㄨ矾鐢辩増鏈凡鏇存柊鐨勬儏鍐典笅涓嬪彂鏃х増鏈懡浠�
         if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
-            if (traceRegistry != null && command != null) {
-                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                        buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
+            // 棣栨鏍¢獙澶辫触鍙兘鏄� Redis 鍐欏叆寤惰繜瀵艰嚧鐨勶紝鐭殏绛夊緟鍚庨噸璇曚竴娆°��
+            sleepQuietly(50L);
+            if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+                if (traceRegistry != null && command != null) {
+                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                            buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
+                }
+                markCancelled(command == null ? null : command.getTaskNo(),
+                        command == null ? null : command.getRouteVersion(),
+                        currentStationId,
+                        "route_version_replaced_before_segment_send");
+                return false;
             }
-            markCancelled(command == null ? null : command.getTaskNo(),
-                    command == null ? null : command.getRouteVersion(),
-                    currentStationId,
-                    "route_version_replaced_before_segment_send");
-            return false;
         }
         
         while (true) {
diff --git a/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java b/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
index 5b51be2..7ae1627 100644
--- a/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
+++ b/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
@@ -351,8 +351,8 @@
             wrkMast.setModiTime(now);
             if (wrkMastService.updateById(wrkMast)) {
                 wrkAnalysisService.markInboundStationStart(wrkMast, now);
-                boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "stationInExecute");
-                if (offered && stationMoveCoordinator != null) {
+                // 鍏堣褰� session 鍐嶅叆闃熷懡浠わ紝閬垮厤娑堣垂绾跨▼鍦� session 鍐欏叆 Redis 鍓嶅彇鍒板懡浠ゅ鑷磋矾鐢辨牎楠屽け璐ャ��
+                if (stationMoveCoordinator != null) {
                     stationMoveCoordinator.recordDispatch(
                             wrkMast.getWrkNo(),
                             stationProtocol.getStationId(),
@@ -361,7 +361,12 @@
                             false
                     );
                 }
-                News.info("杈撻�佺珯鐐瑰叆搴撳懡浠や笅鍙戞垚鍔燂紝绔欑偣鍙�={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}", stationId, wrkMast.getWrkNo(), JSON.toJSONString(command));
+                boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "stationInExecute");
+                if (offered) {
+                    News.info("杈撻�佺珯鐐瑰叆搴撳懡浠や笅鍙戞垚鍔燂紝绔欑偣鍙�={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}", stationId, wrkMast.getWrkNo(), JSON.toJSONString(command));
+                } else {
+                    News.warn("杈撻�佺珯鐐瑰叆搴撳懡浠ゅ叆闃熻鎷掔粷(鍙兘閲嶅)锛岀珯鐐瑰彿={}锛屽伐浣滃彿={}", stationId, wrkMast.getWrkNo());
+                }
                 redisUtil.set(RedisKeyType.STATION_IN_EXECUTE_LIMIT.key + stationId, "lock", 5);
                 loadGuardState.reserveLoopTask(loopHitResult.getLoopNo());
                 stationDispatchLoadSupport.saveLoopLoadReserve(wrkMast.getWrkNo(), loopHitResult);
diff --git a/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java b/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
index dddde68..a433445 100644
--- a/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
+++ b/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
@@ -511,13 +511,16 @@
 
         int clearedCommandCount = 0;
 
-        boolean offered = offerDevpCommandWithDedup(context.dispatchDeviceNo(), plan.command(), plan.dispatchScene());
-        if (!offered) {
-            return RerouteExecutionResult.skip("dispatch-dedup");
-        }
+        // 鍏堝彇娑堟棫 session 骞惰褰曟柊 session锛屽啀鍏ラ槦鍛戒护锛岄伩鍏嶆秷璐圭嚎绋嬪湪 session 鍐欏叆 Redis 鍓嶅彇鍒板懡浠ゅ鑷磋矾鐢辨牎楠屽け璐ャ��
         if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
             stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending");
             stationMoveCoordinator.cancelSession(taskNo);
+        }
+        preRegisterDispatchSession(context, plan);
+
+        boolean offered = offerDevpCommandWithDedup(context.dispatchDeviceNo(), plan.command(), plan.dispatchScene());
+        if (!offered) {
+            return RerouteExecutionResult.skip("dispatch-dedup");
         }
 
         applyRerouteDispatchEffects(context, plan, clearedCommandCount);
@@ -531,6 +534,24 @@
         }
         RerouteCommandPlan plan = buildRerouteCommandPlan(context, decision);
         return executeReroutePlan(context, plan);
+    }
+
+    private void preRegisterDispatchSession(RerouteContext context, RerouteCommandPlan plan) {
+        if (context == null || plan == null || plan.command() == null || context.wrkMast() == null || context.stationProtocol() == null) {
+            return;
+        }
+        if (stationMoveCoordinator == null) {
+            return;
+        }
+        OutOrderDispatchDecision dispatchDecision =
+                plan.decision() == null ? null : plan.decision().dispatchDecision();
+        stationMoveCoordinator.recordDispatch(
+                context.wrkMast().getWrkNo(),
+                context.stationProtocol().getStationId(),
+                plan.dispatchScene(),
+                plan.command(),
+                dispatchDecision != null && dispatchDecision.isCircle()
+        );
     }
 
     private void applyRerouteDispatchEffects(RerouteContext context,
@@ -551,15 +572,6 @@
                 dispatchDecision,
                 plan.command()
         );
-        if (stationMoveCoordinator != null) {
-            stationMoveCoordinator.recordDispatch(
-                    wrkMast.getWrkNo(),
-                    stationProtocol.getStationId(),
-                    plan.dispatchScene(),
-                    plan.command(),
-                    dispatchDecision != null && dispatchDecision.isCircle()
-            );
-        }
         if (context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE) {
             News.info("杈撻�佺珯鐐瑰牭濉炲悗閲嶆柊璁$畻璺緞鍛戒护涓嬪彂鎴愬姛锛岀珯鐐瑰彿={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}",
                     stationProtocol.getStationId(),
@@ -678,10 +690,6 @@
                 stationProtocol.getStationId(),
                 RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS);
         stationDispatchRuntimeStateSupport.signalSegmentReset(wrkMast.getWrkNo(), STATION_MOVE_RESET_WAIT_MS);
-        boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
-        if (!offered) {
-            return;
-        }
         if (stationMoveCoordinator != null) {
             stationMoveCoordinator.markCancelPending(wrkMast.getWrkNo(), "reroute_pending");
             stationMoveCoordinator.cancelSession(wrkMast.getWrkNo());
@@ -693,6 +701,10 @@
                     false
             );
         }
+        boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
+        if (!offered) {
+            News.warn("杈撻�佺珯鐐瑰牭濉炵洿娲惧懡浠ゅ叆闃熻鎷掔粷(鍙兘閲嶅)锛岀珯鐐瑰彿={}锛屽伐浣滃彿={}", stationProtocol.getStationId(), wrkMast.getWrkNo());
+        }
     }
 
     private int countCurrentTaskBufferCommands(List<StationTaskBufferItem> taskBufferItems, Integer currentTaskNo) {

--
Gitblit v1.9.1