From f3b64d003bc3458af3dd434e6187d3aba23a64aa Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 26 三月 2026 14:35:44 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java | 98 +++++++++++++------
src/main/java/com/zy/core/utils/StationOperateProcessUtils.java | 38 +++++--
src/main/java/com/zy/core/move/StationMoveSessionRegistry.java | 9 +
src/main/java/com/zy/core/move/StationMoveCoordinator.java | 25 +++++
src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java | 73 ++++++++++++++
src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java | 25 +++++
6 files changed, 227 insertions(+), 41 deletions(-)
diff --git a/src/main/java/com/zy/core/move/StationMoveCoordinator.java b/src/main/java/com/zy/core/move/StationMoveCoordinator.java
index 84eb0c6..2e6c0c3 100644
--- a/src/main/java/com/zy/core/move/StationMoveCoordinator.java
+++ b/src/main/java/com/zy/core/move/StationMoveCoordinator.java
@@ -12,15 +12,19 @@
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Supplier;
@Component
public class StationMoveCoordinator {
private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24;
+ private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>();
@Autowired
private StationMoveSessionRegistry sessionRegistry;
@@ -36,6 +40,10 @@
public boolean isActiveRoute(Integer taskNo, Integer routeVersion) {
return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion);
+ }
+
+ public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
+ return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion);
}
public void markSegmentIssued(Integer taskNo, Integer routeVersion) {
@@ -76,6 +84,23 @@
saveSession(session);
}
+ public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) {
+ if (supplier == null) {
+ return null;
+ }
+ if (taskNo == null || taskNo <= 0) {
+ return supplier.get();
+ }
+ // 鍚屼竴浠诲姟鐨勫垏璺拰鍒嗘鍙戦�佸繀椤诲叡浜竴鎶婇攣锛岄伩鍏嶆棫 routeVersion 鍦ㄧ嚎绋嬫櫄鍒版椂缁х画鎶婁笂涓�鏉℃鍛戒护鍐欏嚭鍘汇��
+ ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock());
+ lock.lock();
+ try {
+ return supplier.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) {
if (taskNo == null || taskNo <= 0 || currentStationId == null || candidateCommand == null) {
return false;
diff --git a/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java b/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
index 82bdad5..db95c1b 100644
--- a/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
+++ b/src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
@@ -63,6 +63,15 @@
&& Objects.equals(routeVersion, session.getRouteVersion());
}
+ public synchronized boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
+ StationMoveSession session = load(taskNo);
+ if (session == null || routeVersion == null || !Objects.equals(routeVersion, session.getRouteVersion())) {
+ return false;
+ }
+ return StationMoveSession.STATUS_WAITING.equals(session.getStatus())
+ || StationMoveSession.STATUS_RUNNING.equals(session.getStatus());
+ }
+
public synchronized boolean shouldSkipOutOrderDecision(Integer taskNo, Integer currentStationId) {
StationMoveSession session = load(taskNo);
if (session == null || !session.isActive() || currentStationId == null) {
diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 3da25e3..50ca80f 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
private final Function<StationCommand, CommandResponse> commandSender;
private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
+ private enum SegmentSendResult {
+ DISPATCHED,
+ CANCELLED,
+ RETRY
+ }
+
public StationSegmentExecutor(DeviceConfig deviceConfig,
RedisUtil redisUtil,
Function<StationCommand, CommandResponse> commandSender) {
@@ -86,7 +92,7 @@
boolean firstRun = true;
while (true) {
try {
- if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
+ if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
if (traceRegistry != null) {
traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -191,39 +197,65 @@
Integer traceVersion,
Integer currentStationId) {
while (true) {
- if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
- }
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "route_version_replaced");
+ SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
+ if (sendResult == SegmentSendResult.CANCELLED) {
return false;
}
- if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
- }
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "redis_cancel_signal");
- return false;
- }
- CommandResponse commandResponse = commandSender.apply(command);
- if (commandResponse == null) {
+ if (sendResult == SegmentSendResult.RETRY) {
sleepQuietly(200L);
continue;
}
- if (commandResponse.getResult()) {
- markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
- return true;
- }
- sleepQuietly(200L);
+ return true;
}
+ }
+
+ private SegmentSendResult executeLockedSegmentSend(StationCommand command,
+ StationTaskTraceRegistry traceRegistry,
+ Integer traceVersion,
+ Integer currentStationId) {
+ Integer taskNo = command == null ? null : command.getTaskNo();
+ StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+ if (moveCoordinator != null) {
+ // 鍒嗘鍙戦�佺殑鏈�缁堟鏌ュ拰瀹為檯涓嬪彂闇�瑕佷笌 reroute 鍏辩敤浠诲姟閿併��
+ // 杩欐牱鍒囪矾绾跨▼涓�鏃﹁繘鍏� CANCEL_PENDING/RESET锛屾棫璺嚎灏变笉鑳藉啀绌胯繃鏈�鍚庤繖涓�姝ュ彂鍒拌澶囦晶銆�
+ return moveCoordinator.withTaskDispatchLock(taskNo,
+ () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
+ }
+ return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
+ }
+
+ private SegmentSendResult doSendSegment(StationCommand command,
+ StationTaskTraceRegistry traceRegistry,
+ Integer traceVersion,
+ Integer currentStationId) {
+ if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "route_version_replaced");
+ return SegmentSendResult.CANCELLED;
+ }
+ if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "redis_cancel_signal");
+ return SegmentSendResult.CANCELLED;
+ }
+ CommandResponse commandResponse = commandSender.apply(command);
+ if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
+ return SegmentSendResult.RETRY;
+ }
+ markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
+ return SegmentSendResult.DISPATCHED;
}
private double loadSegmentAdvanceRatio() {
@@ -401,15 +433,19 @@
}
}
- private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
+ private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
// Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
// do not register a move session and therefore have no routeVersion.
// They should keep the historical behavior and execute normally.
if (taskNo == null || routeVersion == null) {
return true;
}
- StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
- return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
+ StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+ return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
+ }
+
+ private StationMoveCoordinator loadMoveCoordinator() {
+ return SpringUtils.getBean(StationMoveCoordinator.class);
}
private void markSegmentIssued(Integer taskNo, Integer routeVersion) {
diff --git a/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java b/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
index 9ab571c..71153a4 100644
--- a/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
+++ b/src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
@@ -804,17 +804,19 @@
if (taskNo == null || taskNo <= 0 || stationId == null) {
return RerouteExecutionResult.skip("invalid-station-task");
}
- boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE;
- if (runBlockReroute) {
- // 绔欑偣杩涘叆鍫靛鍚庯紝璁惧渚у彲鑳藉凡缁忔妸涔嬪墠棰勪笅鍙戠殑鍒嗘鍛戒护娓呮帀浜嗐��
- // 鍏堜綔搴熸湰鍦� session/segment 鐘舵�侊紝鍐嶆寜鏂拌矾绾块噸鍙戯紝閬垮厤琚棫鐘舵�佸弽鍚戝崱浣忋��
- if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
- stationMoveCoordinator.cancelSession(taskNo);
- }
- if (context.resetSegmentCommandsBeforeDispatch()) {
- resetSegmentMoveCommandsBeforeReroute(taskNo);
- }
+ if (stationMoveCoordinator != null) {
+ return stationMoveCoordinator.withTaskDispatchLock(taskNo,
+ () -> executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId));
}
+ return executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId);
+ }
+
+ private RerouteExecutionResult executeReroutePlanWithTaskLock(RerouteContext context,
+ RerouteCommandPlan plan,
+ StationProtocol stationProtocol,
+ Integer taskNo,
+ Integer stationId) {
+ boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE;
if (context.checkRecentDispatch()
&& shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId)) {
return RerouteExecutionResult.skip("recent-dispatch");
@@ -848,6 +850,22 @@
return RerouteExecutionResult.skip("out-order-lock");
}
+ if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
+ // 鍒囪矾鍓嶅厛鎶婃棫 session 缃负 CANCEL_PENDING锛岃宸茬粡鎺掗槦涓殑鏃у垎娈电嚎绋嬪湪鏈�缁堝彂閫佸墠鍋滀笅銆�
+ stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending");
+ }
+
+ if (runBlockReroute) {
+ // 绔欑偣杩涘叆鍫靛鍚庯紝璁惧渚у彲鑳藉凡缁忔妸涔嬪墠棰勪笅鍙戠殑鍒嗘鍛戒护娓呮帀浜嗐��
+ // 鍏堜綔搴熸湰鍦� session/segment 鐘舵�侊紝鍐嶆寜鏂拌矾绾块噸鍙戯紝閬垮厤琚棫鐘舵�佸弽鍚戝崱浣忋��
+ if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
+ stationMoveCoordinator.cancelSession(taskNo);
+ }
+ if (context.resetSegmentCommandsBeforeDispatch()) {
+ resetSegmentMoveCommandsBeforeReroute(taskNo);
+ }
+ }
+
if (!runBlockReroute
&& context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
stationMoveCoordinator.cancelSession(taskNo);
diff --git a/src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java b/src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java
new file mode 100644
index 0000000..abb018b
--- /dev/null
+++ b/src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java
@@ -0,0 +1,73 @@
+package com.zy.core.thread.impl.station;
+
+import com.alibaba.fastjson.JSON;
+import com.core.common.SpringUtils;
+import com.zy.asrs.entity.DeviceConfig;
+import com.zy.common.utils.RedisUtil;
+import com.zy.core.enums.RedisKeyType;
+import com.zy.core.model.CommandResponse;
+import com.zy.core.model.command.StationCommand;
+import com.zy.core.move.StationMoveCoordinator;
+import com.zy.core.move.StationMoveSessionRegistry;
+import com.zy.core.move.StationMoveSession;
+import org.junit.jupiter.api.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class StationSegmentExecutorTest {
+
+ @Test
+ void sendSegmentWithRetry_skipsWhenRouteIsCancelPending() {
+ ApplicationContext applicationContext = mock(ApplicationContext.class);
+ StationMoveCoordinator coordinator = new StationMoveCoordinator();
+ StationMoveSessionRegistry sessionRegistry = new StationMoveSessionRegistry();
+ RedisUtil redisUtil = mock(RedisUtil.class);
+ @SuppressWarnings("unchecked")
+ Function<StationCommand, CommandResponse> commandSender = mock(Function.class);
+
+ ReflectionTestUtils.setField(coordinator, "sessionRegistry", sessionRegistry);
+ ReflectionTestUtils.setField(coordinator, "redisUtil", redisUtil);
+ ReflectionTestUtils.setField(sessionRegistry, "redisUtil", redisUtil);
+ when(applicationContext.getBean(StationMoveCoordinator.class)).thenReturn(coordinator);
+ SpringUtils.init(applicationContext);
+
+ StationSegmentExecutor executor = new StationSegmentExecutor(new DeviceConfig(), redisUtil, commandSender);
+
+ StationCommand command = new StationCommand();
+ command.setTaskNo(10492);
+ command.setStationId(186);
+ command.setTargetStaNo(124);
+ command.setRouteVersion(23);
+
+ StationMoveSession session = new StationMoveSession();
+ session.setTaskNo(10492);
+ session.setRouteVersion(23);
+ session.setStatus(StationMoveSession.STATUS_CANCEL_PENDING);
+
+ when(redisUtil.get(RedisKeyType.STATION_MOVE_SESSION_.key + 10492))
+ .thenReturn(JSON.toJSONString(session));
+ when(redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + 10492)).thenReturn(null);
+ when(commandSender.apply(any())).thenReturn(new CommandResponse(true));
+
+ Boolean result = ReflectionTestUtils.invokeMethod(
+ executor,
+ "sendSegmentWithRetry",
+ command,
+ null,
+ null,
+ 186
+ );
+
+ assertFalse(Boolean.TRUE.equals(result));
+ verify(commandSender, never()).apply(any());
+ }
+}
diff --git a/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java b/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
index d9163cf..05f033b 100644
--- a/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
+++ b/src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
@@ -42,12 +42,21 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class StationOperateProcessUtilsReroutePipelineTest {
+
+ @SuppressWarnings("unchecked")
+ private void stubTaskDispatchLock(StationMoveCoordinator coordinator) {
+ when(coordinator.withTaskDispatchLock(any(), any())).thenAnswer(invocation -> {
+ java.util.function.Supplier<Object> supplier = invocation.getArgument(1);
+ return supplier == null ? null : supplier.get();
+ });
+ }
@Test
void choosesRunBlockCommandBuilderForRunBlockRerouteScene() {
@@ -181,6 +190,7 @@
StationOperateProcessUtils utils = new StationOperateProcessUtils();
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -226,6 +236,7 @@
StationOperateProcessUtils utils = new StationOperateProcessUtils();
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -273,6 +284,7 @@
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
BasStationOptService basStationOptService = mock(BasStationOptService.class);
RedisUtil redisUtil = mock(RedisUtil.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
ReflectionTestUtils.setField(utils, "basStationOptService", basStationOptService);
ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -310,6 +322,7 @@
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
StationThread stationThread = mock(StationThread.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -371,6 +384,7 @@
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
StationThread stationThread = mock(StationThread.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -479,7 +493,10 @@
void executePlan_runBlockReroute_reissuesWhenBlockedSessionMatchesCandidatePath() {
StationOperateProcessUtils utils = new StationOperateProcessUtils();
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
+ RedisUtil redisUtil = mock(RedisUtil.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
+ ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
StationCommand command = new StationCommand();
command.setTaskNo(100);
@@ -521,6 +538,9 @@
);
assertTrue(!result.skipped());
+ org.mockito.InOrder inOrder = inOrder(coordinator);
+ inOrder.verify(coordinator).markCancelPending(100, "reroute_pending");
+ inOrder.verify(coordinator).cancelSession(100);
verify(coordinator, times(1)).cancelSession(100);
} finally {
MessageQueue.clear(SlaveType.Devp, 1);
@@ -532,6 +552,7 @@
StationOperateProcessUtils utils = new StationOperateProcessUtils();
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -582,6 +603,7 @@
StationOperateProcessUtils utils = new StationOperateProcessUtils();
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -636,6 +658,7 @@
WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class);
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
StationThread stationThread = mock(StationThread.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -696,6 +719,7 @@
NotifyUtils notifyUtils = mock(NotifyUtils.class);
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
StationThread stationThread = mock(StationThread.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -749,6 +773,7 @@
StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
RedisUtil redisUtil = mock(RedisUtil.class);
StationThread stationThread = mock(StationThread.class);
+ stubTaskDispatchLock(coordinator);
ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
--
Gitblit v1.9.1