From f3b64d003bc3458af3dd434e6187d3aba23a64aa Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 26 三月 2026 14:35:44 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java          |   98 +++++++++++++------
 src/main/java/com/zy/core/utils/StationOperateProcessUtils.java                    |   38 +++++--
 src/main/java/com/zy/core/move/StationMoveSessionRegistry.java                     |    9 +
 src/main/java/com/zy/core/move/StationMoveCoordinator.java                         |   25 +++++
 src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java      |   73 ++++++++++++++
 src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java |   25 +++++
 6 files changed, 227 insertions(+), 41 deletions(-)

diff --git a/src/main/java/com/zy/core/move/StationMoveCoordinator.java b/src/main/java/com/zy/core/move/StationMoveCoordinator.java
index 84eb0c6..2e6c0c3 100644
--- a/src/main/java/com/zy/core/move/StationMoveCoordinator.java
+++ b/src/main/java/com/zy/core/move/StationMoveCoordinator.java
@@ -12,15 +12,19 @@
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 @Component
 public class StationMoveCoordinator {
 
     private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24;
+    private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>();
 
     @Autowired
     private StationMoveSessionRegistry sessionRegistry;
@@ -36,6 +40,10 @@
 
     public boolean isActiveRoute(Integer taskNo, Integer routeVersion) {
         return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion);
+    }
+
+    public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
+        return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion);
     }
 
     public void markSegmentIssued(Integer taskNo, Integer routeVersion) {
@@ -76,6 +84,23 @@
         saveSession(session);
     }
 
+    public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) {
+        if (supplier == null) {
+            return null;
+        }
+        if (taskNo == null || taskNo <= 0) {
+            return supplier.get();
+        }
+        // 鍚屼竴浠诲姟鐨勫垏璺拰鍒嗘鍙戦�佸繀椤诲叡浜竴鎶婇攣锛岄伩鍏嶆棫 routeVersion 鍦ㄧ嚎绋嬫櫄鍒版椂缁х画鎶婁笂涓�鏉℃鍛戒护鍐欏嚭鍘汇��
+        ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock());
+        lock.lock();
+        try {
+            return supplier.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) {
         if (taskNo == null || taskNo <= 0 || currentStationId == null || candidateCommand == null) {
             return false;
diff --git a/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java b/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
index 82bdad5..db95c1b 100644
--- a/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
+++ b/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
@@ -63,6 +63,15 @@
                 && Objects.equals(routeVersion, session.getRouteVersion());
     }
 
+    public synchronized boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
+        StationMoveSession session = load(taskNo);
+        if (session == null || routeVersion == null || !Objects.equals(routeVersion, session.getRouteVersion())) {
+            return false;
+        }
+        return StationMoveSession.STATUS_WAITING.equals(session.getStatus())
+                || StationMoveSession.STATUS_RUNNING.equals(session.getStatus());
+    }
+
     public synchronized boolean shouldSkipOutOrderDecision(Integer taskNo, Integer currentStationId) {
         StationMoveSession session = load(taskNo);
         if (session == null || !session.isActive() || currentStationId == null) {
diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 3da25e3..50ca80f 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
     private final Function<StationCommand, CommandResponse> commandSender;
     private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
 
+    private enum SegmentSendResult {
+        DISPATCHED,
+        CANCELLED,
+        RETRY
+    }
+
     public StationSegmentExecutor(DeviceConfig deviceConfig,
                                   RedisUtil redisUtil,
                                   Function<StationCommand, CommandResponse> commandSender) {
@@ -86,7 +92,7 @@
         boolean firstRun = true;
         while (true) {
             try {
-                if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
+                if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
                     if (traceRegistry != null) {
                         traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
                                 buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -191,39 +197,65 @@
                                          Integer traceVersion,
                                          Integer currentStationId) {
         while (true) {
-            if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
-                if (traceRegistry != null && command != null) {
-                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                            buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
-                }
-                markCancelled(command == null ? null : command.getTaskNo(),
-                        command == null ? null : command.getRouteVersion(),
-                        currentStationId,
-                        "route_version_replaced");
+            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
+            if (sendResult == SegmentSendResult.CANCELLED) {
                 return false;
             }
-            if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
-                if (traceRegistry != null && command != null) {
-                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
-                            buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
-                }
-                markCancelled(command == null ? null : command.getTaskNo(),
-                        command == null ? null : command.getRouteVersion(),
-                        currentStationId,
-                        "redis_cancel_signal");
-                return false;
-            }
-            CommandResponse commandResponse = commandSender.apply(command);
-            if (commandResponse == null) {
+            if (sendResult == SegmentSendResult.RETRY) {
                 sleepQuietly(200L);
                 continue;
             }
-            if (commandResponse.getResult()) {
-                markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
-                return true;
-            }
-            sleepQuietly(200L);
+            return true;
         }
+    }
+
+    private SegmentSendResult executeLockedSegmentSend(StationCommand command,
+                                                       StationTaskTraceRegistry traceRegistry,
+                                                       Integer traceVersion,
+                                                       Integer currentStationId) {
+        Integer taskNo = command == null ? null : command.getTaskNo();
+        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+        if (moveCoordinator != null) {
+            // 鍒嗘鍙戦�佺殑鏈�缁堟鏌ュ拰瀹為檯涓嬪彂闇�瑕佷笌 reroute 鍏辩敤浠诲姟閿併��
+            // 杩欐牱鍒囪矾绾跨▼涓�鏃﹁繘鍏� CANCEL_PENDING/RESET锛屾棫璺嚎灏变笉鑳藉啀绌胯繃鏈�鍚庤繖涓�姝ュ彂鍒拌澶囦晶銆�
+            return moveCoordinator.withTaskDispatchLock(taskNo,
+                    () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
+        }
+        return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
+    }
+
+    private SegmentSendResult doSendSegment(StationCommand command,
+                                            StationTaskTraceRegistry traceRegistry,
+                                            Integer traceVersion,
+                                            Integer currentStationId) {
+        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "route_version_replaced");
+            return SegmentSendResult.CANCELLED;
+        }
+        if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
+            if (traceRegistry != null && command != null) {
+                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+                        buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
+            }
+            markCancelled(command == null ? null : command.getTaskNo(),
+                    command == null ? null : command.getRouteVersion(),
+                    currentStationId,
+                    "redis_cancel_signal");
+            return SegmentSendResult.CANCELLED;
+        }
+        CommandResponse commandResponse = commandSender.apply(command);
+        if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
+            return SegmentSendResult.RETRY;
+        }
+        markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
+        return SegmentSendResult.DISPATCHED;
     }
 
     private double loadSegmentAdvanceRatio() {
@@ -401,15 +433,19 @@
         }
     }
 
-    private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
+    private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
         // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
         // do not register a move session and therefore have no routeVersion.
         // They should keep the historical behavior and execute normally.
         if (taskNo == null || routeVersion == null) {
             return true;
         }
-        StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
-        return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
+        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+        return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
+    }
+
+    private StationMoveCoordinator loadMoveCoordinator() {
+        return SpringUtils.getBean(StationMoveCoordinator.class);
     }
 
     private void markSegmentIssued(Integer taskNo, Integer routeVersion) {
diff --git a/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java b/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
index 9ab571c..71153a4 100644
--- a/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
+++ b/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
@@ -804,17 +804,19 @@
         if (taskNo == null || taskNo <= 0 || stationId == null) {
             return RerouteExecutionResult.skip("invalid-station-task");
         }
-        boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE;
-        if (runBlockReroute) {
-            // 绔欑偣杩涘叆鍫靛鍚庯紝璁惧渚у彲鑳藉凡缁忔妸涔嬪墠棰勪笅鍙戠殑鍒嗘鍛戒护娓呮帀浜嗐��
-            // 鍏堜綔搴熸湰鍦� session/segment 鐘舵�侊紝鍐嶆寜鏂拌矾绾块噸鍙戯紝閬垮厤琚棫鐘舵�佸弽鍚戝崱浣忋��
-            if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
-                stationMoveCoordinator.cancelSession(taskNo);
-            }
-            if (context.resetSegmentCommandsBeforeDispatch()) {
-                resetSegmentMoveCommandsBeforeReroute(taskNo);
-            }
+        if (stationMoveCoordinator != null) {
+            return stationMoveCoordinator.withTaskDispatchLock(taskNo,
+                    () -> executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId));
         }
+        return executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId);
+    }
+
+    private RerouteExecutionResult executeReroutePlanWithTaskLock(RerouteContext context,
+                                                                  RerouteCommandPlan plan,
+                                                                  StationProtocol stationProtocol,
+                                                                  Integer taskNo,
+                                                                  Integer stationId) {
+        boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE;
         if (context.checkRecentDispatch()
                 && shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId)) {
             return RerouteExecutionResult.skip("recent-dispatch");
@@ -848,6 +850,22 @@
             return RerouteExecutionResult.skip("out-order-lock");
         }
 
+        if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
+            // 鍒囪矾鍓嶅厛鎶婃棫 session 缃负 CANCEL_PENDING锛岃宸茬粡鎺掗槦涓殑鏃у垎娈电嚎绋嬪湪鏈�缁堝彂閫佸墠鍋滀笅銆�
+            stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending");
+        }
+
+        if (runBlockReroute) {
+            // 绔欑偣杩涘叆鍫靛鍚庯紝璁惧渚у彲鑳藉凡缁忔妸涔嬪墠棰勪笅鍙戠殑鍒嗘鍛戒护娓呮帀浜嗐��
+            // 鍏堜綔搴熸湰鍦� session/segment 鐘舵�侊紝鍐嶆寜鏂拌矾绾块噸鍙戯紝閬垮厤琚棫鐘舵�佸弽鍚戝崱浣忋��
+            if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
+                stationMoveCoordinator.cancelSession(taskNo);
+            }
+            if (context.resetSegmentCommandsBeforeDispatch()) {
+                resetSegmentMoveCommandsBeforeReroute(taskNo);
+            }
+        }
+
         if (!runBlockReroute
                 && context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
             stationMoveCoordinator.cancelSession(taskNo);
diff --git a/src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java b/src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java
new file mode 100644
index 0000000..abb018b
--- /dev/null
+++ b/src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java
@@ -0,0 +1,73 @@
+package com.zy.core.thread.impl.station;
+
+import com.alibaba.fastjson.JSON;
+import com.core.common.SpringUtils;
+import com.zy.asrs.entity.DeviceConfig;
+import com.zy.common.utils.RedisUtil;
+import com.zy.core.enums.RedisKeyType;
+import com.zy.core.model.CommandResponse;
+import com.zy.core.model.command.StationCommand;
+import com.zy.core.move.StationMoveCoordinator;
+import com.zy.core.move.StationMoveSessionRegistry;
+import com.zy.core.move.StationMoveSession;
+import org.junit.jupiter.api.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class StationSegmentExecutorTest {
+
+    @Test
+    void sendSegmentWithRetry_skipsWhenRouteIsCancelPending() {
+        ApplicationContext applicationContext = mock(ApplicationContext.class);
+        StationMoveCoordinator coordinator = new StationMoveCoordinator();
+        StationMoveSessionRegistry sessionRegistry = new StationMoveSessionRegistry();
+        RedisUtil redisUtil = mock(RedisUtil.class);
+        @SuppressWarnings("unchecked")
+        Function<StationCommand, CommandResponse> commandSender = mock(Function.class);
+
+        ReflectionTestUtils.setField(coordinator, "sessionRegistry", sessionRegistry);
+        ReflectionTestUtils.setField(coordinator, "redisUtil", redisUtil);
+        ReflectionTestUtils.setField(sessionRegistry, "redisUtil", redisUtil);
+        when(applicationContext.getBean(StationMoveCoordinator.class)).thenReturn(coordinator);
+        SpringUtils.init(applicationContext);
+
+        StationSegmentExecutor executor = new StationSegmentExecutor(new DeviceConfig(), redisUtil, commandSender);
+
+        StationCommand command = new StationCommand();
+        command.setTaskNo(10492);
+        command.setStationId(186);
+        command.setTargetStaNo(124);
+        command.setRouteVersion(23);
+
+        StationMoveSession session = new StationMoveSession();
+        session.setTaskNo(10492);
+        session.setRouteVersion(23);
+        session.setStatus(StationMoveSession.STATUS_CANCEL_PENDING);
+
+        when(redisUtil.get(RedisKeyType.STATION_MOVE_SESSION_.key + 10492))
+                .thenReturn(JSON.toJSONString(session));
+        when(redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + 10492)).thenReturn(null);
+        when(commandSender.apply(any())).thenReturn(new CommandResponse(true));
+
+        Boolean result = ReflectionTestUtils.invokeMethod(
+                executor,
+                "sendSegmentWithRetry",
+                command,
+                null,
+                null,
+                186
+        );
+
+        assertFalse(Boolean.TRUE.equals(result));
+        verify(commandSender, never()).apply(any());
+    }
+}
diff --git a/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java b/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
index d9163cf..05f033b 100644
--- a/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
+++ b/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
@@ -42,12 +42,21 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class StationOperateProcessUtilsReroutePipelineTest {
+
+    @SuppressWarnings("unchecked")
+    private void stubTaskDispatchLock(StationMoveCoordinator coordinator) {
+        when(coordinator.withTaskDispatchLock(any(), any())).thenAnswer(invocation -> {
+            java.util.function.Supplier<Object> supplier = invocation.getArgument(1);
+            return supplier == null ? null : supplier.get();
+        });
+    }
 
     @Test
     void choosesRunBlockCommandBuilderForRunBlockRerouteScene() {
@@ -181,6 +190,7 @@
         StationOperateProcessUtils utils = new StationOperateProcessUtils();
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
+        stubTaskDispatchLock(coordinator);
         ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
         ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
 
@@ -226,6 +236,7 @@
         StationOperateProcessUtils utils = new StationOperateProcessUtils();
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
+        stubTaskDispatchLock(coordinator);
         ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
         ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
 
@@ -273,6 +284,7 @@
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         BasStationOptService basStationOptService = mock(BasStationOptService.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
+        stubTaskDispatchLock(coordinator);
         ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
         ReflectionTestUtils.setField(utils, "basStationOptService", basStationOptService);
         ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -310,6 +322,7 @@
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
         StationThread stationThread = mock(StationThread.class);
+        stubTaskDispatchLock(coordinator);
 
         ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
         ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -371,6 +384,7 @@
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
         StationThread stationThread = mock(StationThread.class);
+        stubTaskDispatchLock(coordinator);
 
         ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
         ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -479,7 +493,10 @@
     void executePlan_runBlockReroute_reissuesWhenBlockedSessionMatchesCandidatePath() {
         StationOperateProcessUtils utils = new StationOperateProcessUtils();
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
+        RedisUtil redisUtil = mock(RedisUtil.class);
+        stubTaskDispatchLock(coordinator);
         ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
+        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
 
         StationCommand command = new StationCommand();
         command.setTaskNo(100);
@@ -521,6 +538,9 @@
             );
 
             assertTrue(!result.skipped());
+            org.mockito.InOrder inOrder = inOrder(coordinator);
+            inOrder.verify(coordinator).markCancelPending(100, "reroute_pending");
+            inOrder.verify(coordinator).cancelSession(100);
             verify(coordinator, times(1)).cancelSession(100);
         } finally {
             MessageQueue.clear(SlaveType.Devp, 1);
@@ -532,6 +552,7 @@
         StationOperateProcessUtils utils = new StationOperateProcessUtils();
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
+        stubTaskDispatchLock(coordinator);
         ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
         ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
 
@@ -582,6 +603,7 @@
         StationOperateProcessUtils utils = new StationOperateProcessUtils();
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
+        stubTaskDispatchLock(coordinator);
         ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
         ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
 
@@ -636,6 +658,7 @@
         WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class);
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         StationThread stationThread = mock(StationThread.class);
+        stubTaskDispatchLock(coordinator);
 
         ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
         ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -696,6 +719,7 @@
         NotifyUtils notifyUtils = mock(NotifyUtils.class);
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         StationThread stationThread = mock(StationThread.class);
+        stubTaskDispatchLock(coordinator);
 
         ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
         ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -749,6 +773,7 @@
         StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
         RedisUtil redisUtil = mock(RedisUtil.class);
         StationThread stationThread = mock(StationThread.class);
+        stubTaskDispatchLock(coordinator);
 
         ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
         ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);

--
Gitblit v1.9.1