| | |
| | | import java.security.MessageDigest; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Objects; |
| | | import java.util.function.Supplier; |
| | | |
| | | @Component |
| | | public class StationMoveCoordinator { |
| | | |
| | | private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24; |
| | | private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>(); |
| | | |
| | | @Autowired |
| | | private StationMoveSessionRegistry sessionRegistry; |
| | |
| | | |
| | | public boolean isActiveRoute(Integer taskNo, Integer routeVersion) { |
| | | return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion); |
| | | } |
| | | |
| | | public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) { |
| | | return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion); |
| | | } |
| | | |
| | | public void markSegmentIssued(Integer taskNo, Integer routeVersion) { |
| | |
| | | saveSession(session); |
| | | } |
| | | |
| | | public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) { |
| | | if (supplier == null) { |
| | | return null; |
| | | } |
| | | if (taskNo == null || taskNo <= 0) { |
| | | return supplier.get(); |
| | | } |
| | | // 同一任务的切路和分段发送必须共享一把锁,避免旧 routeVersion 在线程晚到时继续把上一条段命令写出去。 |
| | | ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock()); |
| | | lock.lock(); |
| | | try { |
| | | return supplier.get(); |
| | | } finally { |
| | | lock.unlock(); |
| | | } |
| | | } |
| | | |
| | | public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) { |
| | | if (taskNo == null || taskNo <= 0 || currentStationId == null || candidateCommand == null) { |
| | | return false; |
| | |
| | | && Objects.equals(routeVersion, session.getRouteVersion()); |
| | | } |
| | | |
| | | public synchronized boolean canDispatchRoute(Integer taskNo, Integer routeVersion) { |
| | | StationMoveSession session = load(taskNo); |
| | | if (session == null || routeVersion == null || !Objects.equals(routeVersion, session.getRouteVersion())) { |
| | | return false; |
| | | } |
| | | return StationMoveSession.STATUS_WAITING.equals(session.getStatus()) |
| | | || StationMoveSession.STATUS_RUNNING.equals(session.getStatus()); |
| | | } |
| | | |
| | | public synchronized boolean shouldSkipOutOrderDecision(Integer taskNo, Integer currentStationId) { |
| | | StationMoveSession session = load(taskNo); |
| | | if (session == null || !session.isActive() || currentStationId == null) { |
| | |
| | | private final Function<StationCommand, CommandResponse> commandSender; |
| | | private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner(); |
| | | |
| | | private enum SegmentSendResult { |
| | | DISPATCHED, |
| | | CANCELLED, |
| | | RETRY |
| | | } |
| | | |
| | | public StationSegmentExecutor(DeviceConfig deviceConfig, |
| | | RedisUtil redisUtil, |
| | | Function<StationCommand, CommandResponse> commandSender) { |
| | |
| | | boolean firstRun = true; |
| | | while (true) { |
| | | try { |
| | | if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) { |
| | | if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) { |
| | | if (traceRegistry != null) { |
| | | traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId, |
| | | buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion())); |
| | |
| | | Integer traceVersion, |
| | | Integer currentStationId) { |
| | | while (true) { |
| | | if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) { |
| | | SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId); |
| | | if (sendResult == SegmentSendResult.CANCELLED) { |
| | | return false; |
| | | } |
| | | if (sendResult == SegmentSendResult.RETRY) { |
| | | sleepQuietly(200L); |
| | | continue; |
| | | } |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | private SegmentSendResult executeLockedSegmentSend(StationCommand command, |
| | | StationTaskTraceRegistry traceRegistry, |
| | | Integer traceVersion, |
| | | Integer currentStationId) { |
| | | Integer taskNo = command == null ? null : command.getTaskNo(); |
| | | StationMoveCoordinator moveCoordinator = loadMoveCoordinator(); |
| | | if (moveCoordinator != null) { |
| | | // 分段发送的最终检查和实际下发需要与 reroute 共用任务锁。 |
| | | // 这样切路线程一旦进入 CANCEL_PENDING/RESET,旧路线就不能再穿过最后这一步发到设备侧。 |
| | | return moveCoordinator.withTaskDispatchLock(taskNo, |
| | | () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId)); |
| | | } |
| | | return doSendSegment(command, traceRegistry, traceVersion, currentStationId); |
| | | } |
| | | |
| | | private SegmentSendResult doSendSegment(StationCommand command, |
| | | StationTaskTraceRegistry traceRegistry, |
| | | Integer traceVersion, |
| | | Integer currentStationId) { |
| | | if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) { |
| | | if (traceRegistry != null && command != null) { |
| | | traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId, |
| | | buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion())); |
| | |
| | | command == null ? null : command.getRouteVersion(), |
| | | currentStationId, |
| | | "route_version_replaced"); |
| | | return false; |
| | | return SegmentSendResult.CANCELLED; |
| | | } |
| | | if (isTaskMoveReset(command == null ? null : command.getTaskNo())) { |
| | | if (traceRegistry != null && command != null) { |
| | |
| | | command == null ? null : command.getRouteVersion(), |
| | | currentStationId, |
| | | "redis_cancel_signal"); |
| | | return false; |
| | | return SegmentSendResult.CANCELLED; |
| | | } |
| | | CommandResponse commandResponse = commandSender.apply(command); |
| | | if (commandResponse == null) { |
| | | sleepQuietly(200L); |
| | | continue; |
| | | if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) { |
| | | return SegmentSendResult.RETRY; |
| | | } |
| | | if (commandResponse.getResult()) { |
| | | markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion()); |
| | | return true; |
| | | } |
| | | sleepQuietly(200L); |
| | | } |
| | | return SegmentSendResult.DISPATCHED; |
| | | } |
| | | |
| | | private double loadSegmentAdvanceRatio() { |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isRouteActive(Integer taskNo, Integer routeVersion) { |
| | | private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) { |
| | | // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute) |
| | | // do not register a move session and therefore have no routeVersion. |
| | | // They should keep the historical behavior and execute normally. |
| | | if (taskNo == null || routeVersion == null) { |
| | | return true; |
| | | } |
| | | StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class); |
| | | return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion); |
| | | StationMoveCoordinator moveCoordinator = loadMoveCoordinator(); |
| | | return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion); |
| | | } |
| | | |
| | | private StationMoveCoordinator loadMoveCoordinator() { |
| | | return SpringUtils.getBean(StationMoveCoordinator.class); |
| | | } |
| | | |
| | | private void markSegmentIssued(Integer taskNo, Integer routeVersion) { |
| | |
| | | if (taskNo == null || taskNo <= 0 || stationId == null) { |
| | | return RerouteExecutionResult.skip("invalid-station-task"); |
| | | } |
| | | if (stationMoveCoordinator != null) { |
| | | return stationMoveCoordinator.withTaskDispatchLock(taskNo, |
| | | () -> executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId)); |
| | | } |
| | | return executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId); |
| | | } |
| | | |
| | | private RerouteExecutionResult executeReroutePlanWithTaskLock(RerouteContext context, |
| | | RerouteCommandPlan plan, |
| | | StationProtocol stationProtocol, |
| | | Integer taskNo, |
| | | Integer stationId) { |
| | | boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE; |
| | | if (runBlockReroute) { |
| | | // 站点进入堵塞后,设备侧可能已经把之前预下发的分段命令清掉了。 |
| | | // 先作废本地 session/segment 状态,再按新路线重发,避免被旧状态反向卡住。 |
| | | if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) { |
| | | stationMoveCoordinator.cancelSession(taskNo); |
| | | } |
| | | if (context.resetSegmentCommandsBeforeDispatch()) { |
| | | resetSegmentMoveCommandsBeforeReroute(taskNo); |
| | | } |
| | | } |
| | | if (context.checkRecentDispatch() |
| | | && shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId)) { |
| | | return RerouteExecutionResult.skip("recent-dispatch"); |
| | |
| | | return RerouteExecutionResult.skip("out-order-lock"); |
| | | } |
| | | |
| | | if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) { |
| | | // 切路前先把旧 session 置为 CANCEL_PENDING,让已经排队中的旧分段线程在最终发送前停下。 |
| | | stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending"); |
| | | } |
| | | |
| | | if (runBlockReroute) { |
| | | // 站点进入堵塞后,设备侧可能已经把之前预下发的分段命令清掉了。 |
| | | // 先作废本地 session/segment 状态,再按新路线重发,避免被旧状态反向卡住。 |
| | | if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) { |
| | | stationMoveCoordinator.cancelSession(taskNo); |
| | | } |
| | | if (context.resetSegmentCommandsBeforeDispatch()) { |
| | | resetSegmentMoveCommandsBeforeReroute(taskNo); |
| | | } |
| | | } |
| | | |
| | | if (!runBlockReroute |
| | | && context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) { |
| | | stationMoveCoordinator.cancelSession(taskNo); |
| New file |
| | |
| | | package com.zy.core.thread.impl.station; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.core.common.SpringUtils; |
| | | import com.zy.asrs.entity.DeviceConfig; |
| | | import com.zy.common.utils.RedisUtil; |
| | | import com.zy.core.enums.RedisKeyType; |
| | | import com.zy.core.model.CommandResponse; |
| | | import com.zy.core.model.command.StationCommand; |
| | | import com.zy.core.move.StationMoveCoordinator; |
| | | import com.zy.core.move.StationMoveSessionRegistry; |
| | | import com.zy.core.move.StationMoveSession; |
| | | import org.junit.jupiter.api.Test; |
| | | import org.springframework.context.ApplicationContext; |
| | | import org.springframework.test.util.ReflectionTestUtils; |
| | | |
| | | import java.util.function.Function; |
| | | |
| | | import static org.junit.jupiter.api.Assertions.assertFalse; |
| | | import static org.mockito.ArgumentMatchers.any; |
| | | import static org.mockito.Mockito.mock; |
| | | import static org.mockito.Mockito.never; |
| | | import static org.mockito.Mockito.verify; |
| | | import static org.mockito.Mockito.when; |
| | | |
| | | class StationSegmentExecutorTest { |
| | | |
| | | @Test |
| | | void sendSegmentWithRetry_skipsWhenRouteIsCancelPending() { |
| | | ApplicationContext applicationContext = mock(ApplicationContext.class); |
| | | StationMoveCoordinator coordinator = new StationMoveCoordinator(); |
| | | StationMoveSessionRegistry sessionRegistry = new StationMoveSessionRegistry(); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | @SuppressWarnings("unchecked") |
| | | Function<StationCommand, CommandResponse> commandSender = mock(Function.class); |
| | | |
| | | ReflectionTestUtils.setField(coordinator, "sessionRegistry", sessionRegistry); |
| | | ReflectionTestUtils.setField(coordinator, "redisUtil", redisUtil); |
| | | ReflectionTestUtils.setField(sessionRegistry, "redisUtil", redisUtil); |
| | | when(applicationContext.getBean(StationMoveCoordinator.class)).thenReturn(coordinator); |
| | | SpringUtils.init(applicationContext); |
| | | |
| | | StationSegmentExecutor executor = new StationSegmentExecutor(new DeviceConfig(), redisUtil, commandSender); |
| | | |
| | | StationCommand command = new StationCommand(); |
| | | command.setTaskNo(10492); |
| | | command.setStationId(186); |
| | | command.setTargetStaNo(124); |
| | | command.setRouteVersion(23); |
| | | |
| | | StationMoveSession session = new StationMoveSession(); |
| | | session.setTaskNo(10492); |
| | | session.setRouteVersion(23); |
| | | session.setStatus(StationMoveSession.STATUS_CANCEL_PENDING); |
| | | |
| | | when(redisUtil.get(RedisKeyType.STATION_MOVE_SESSION_.key + 10492)) |
| | | .thenReturn(JSON.toJSONString(session)); |
| | | when(redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + 10492)).thenReturn(null); |
| | | when(commandSender.apply(any())).thenReturn(new CommandResponse(true)); |
| | | |
| | | Boolean result = ReflectionTestUtils.invokeMethod( |
| | | executor, |
| | | "sendSegmentWithRetry", |
| | | command, |
| | | null, |
| | | null, |
| | | 186 |
| | | ); |
| | | |
| | | assertFalse(Boolean.TRUE.equals(result)); |
| | | verify(commandSender, never()).apply(any()); |
| | | } |
| | | } |
| | |
| | | import static org.mockito.ArgumentMatchers.eq; |
| | | import static org.mockito.ArgumentMatchers.same; |
| | | import static org.mockito.Mockito.mock; |
| | | import static org.mockito.Mockito.inOrder; |
| | | import static org.mockito.Mockito.times; |
| | | import static org.mockito.Mockito.never; |
| | | import static org.mockito.Mockito.verify; |
| | | import static org.mockito.Mockito.when; |
| | | |
| | | class StationOperateProcessUtilsReroutePipelineTest { |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | private void stubTaskDispatchLock(StationMoveCoordinator coordinator) { |
| | | when(coordinator.withTaskDispatchLock(any(), any())).thenAnswer(invocation -> { |
| | | java.util.function.Supplier<Object> supplier = invocation.getArgument(1); |
| | | return supplier == null ? null : supplier.get(); |
| | | }); |
| | | } |
| | | |
| | | @Test |
| | | void choosesRunBlockCommandBuilderForRunBlockRerouteScene() { |
| | |
| | | StationOperateProcessUtils utils = new StationOperateProcessUtils(); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | | |
| | |
| | | StationOperateProcessUtils utils = new StationOperateProcessUtils(); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | | |
| | |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | BasStationOptService basStationOptService = mock(BasStationOptService.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator); |
| | | ReflectionTestUtils.setField(utils, "basStationOptService", basStationOptService); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | StationThread stationThread = mock(StationThread.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | |
| | | ReflectionTestUtils.setField(utils, "basDevpService", basDevpService); |
| | | ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService); |
| | |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | StationThread stationThread = mock(StationThread.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | |
| | | ReflectionTestUtils.setField(utils, "basDevpService", basDevpService); |
| | | ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService); |
| | |
| | | void executePlan_runBlockReroute_reissuesWhenBlockedSessionMatchesCandidatePath() { |
| | | StationOperateProcessUtils utils = new StationOperateProcessUtils(); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | | |
| | | StationCommand command = new StationCommand(); |
| | | command.setTaskNo(100); |
| | |
| | | ); |
| | | |
| | | assertTrue(!result.skipped()); |
| | | org.mockito.InOrder inOrder = inOrder(coordinator); |
| | | inOrder.verify(coordinator).markCancelPending(100, "reroute_pending"); |
| | | inOrder.verify(coordinator).cancelSession(100); |
| | | verify(coordinator, times(1)).cancelSession(100); |
| | | } finally { |
| | | MessageQueue.clear(SlaveType.Devp, 1); |
| | |
| | | StationOperateProcessUtils utils = new StationOperateProcessUtils(); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | | |
| | |
| | | StationOperateProcessUtils utils = new StationOperateProcessUtils(); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | | |
| | |
| | | WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | StationThread stationThread = mock(StationThread.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | |
| | | ReflectionTestUtils.setField(utils, "basDevpService", basDevpService); |
| | | ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService); |
| | |
| | | NotifyUtils notifyUtils = mock(NotifyUtils.class); |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | StationThread stationThread = mock(StationThread.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | |
| | | ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService); |
| | | ReflectionTestUtils.setField(utils, "redisUtil", redisUtil); |
| | |
| | | StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class); |
| | | RedisUtil redisUtil = mock(RedisUtil.class); |
| | | StationThread stationThread = mock(StationThread.class); |
| | | stubTaskDispatchLock(coordinator); |
| | | |
| | | ReflectionTestUtils.setField(utils, "basDevpService", basDevpService); |
| | | ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService); |