#
Junjie
3 小时以前 f3b64d003bc3458af3dd434e6187d3aba23a64aa
#
1个文件已添加
5个文件已修改
268 ■■■■ 已修改文件
src/main/java/com/zy/core/move/StationMoveCoordinator.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/move/StationMoveSessionRegistry.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/StationOperateProcessUtils.java 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/move/StationMoveCoordinator.java
@@ -12,15 +12,19 @@
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
@Component
public class StationMoveCoordinator {
    private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24;
    private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>();
    @Autowired
    private StationMoveSessionRegistry sessionRegistry;
@@ -36,6 +40,10 @@
    public boolean isActiveRoute(Integer taskNo, Integer routeVersion) {
        return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion);
    }
    public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
        return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion);
    }
    public void markSegmentIssued(Integer taskNo, Integer routeVersion) {
@@ -76,6 +84,23 @@
        saveSession(session);
    }
    public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) {
        if (supplier == null) {
            return null;
        }
        if (taskNo == null || taskNo <= 0) {
            return supplier.get();
        }
        // 同一任务的切路和分段发送必须共享一把锁,避免旧 routeVersion 在线程晚到时继续把上一条段命令写出去。
        ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock());
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }
    public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) {
        if (taskNo == null || taskNo <= 0 || currentStationId == null || candidateCommand == null) {
            return false;
src/main/java/com/zy/core/move/StationMoveSessionRegistry.java
@@ -63,6 +63,15 @@
                && Objects.equals(routeVersion, session.getRouteVersion());
    }
    public synchronized boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
        StationMoveSession session = load(taskNo);
        if (session == null || routeVersion == null || !Objects.equals(routeVersion, session.getRouteVersion())) {
            return false;
        }
        return StationMoveSession.STATUS_WAITING.equals(session.getStatus())
                || StationMoveSession.STATUS_RUNNING.equals(session.getStatus());
    }
    public synchronized boolean shouldSkipOutOrderDecision(Integer taskNo, Integer currentStationId) {
        StationMoveSession session = load(taskNo);
        if (session == null || !session.isActive() || currentStationId == null) {
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
    private final Function<StationCommand, CommandResponse> commandSender;
    private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
    /** Outcome of a single segment-send attempt made by {@code doSendSegment}. */
    private enum SegmentSendResult {
        /** The command sender reported success and the segment was marked as issued. */
        DISPATCHED,
        /** The route was not dispatchable or the task-move-reset check fired; nothing was sent. */
        CANCELLED,
        /** The command sender returned {@code null} or a non-true result. */
        RETRY
    }
    public StationSegmentExecutor(DeviceConfig deviceConfig,
                                  RedisUtil redisUtil,
                                  Function<StationCommand, CommandResponse> commandSender) {
@@ -86,7 +92,7 @@
        boolean firstRun = true;
        while (true) {
            try {
                if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
                if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
                    if (traceRegistry != null) {
                        traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
                                buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -191,39 +197,65 @@
                                         Integer traceVersion,
                                         Integer currentStationId) {
        while (true) {
            if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "route_version_replaced");
            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
            if (sendResult == SegmentSendResult.CANCELLED) {
                return false;
            }
            if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "redis_cancel_signal");
                return false;
            }
            CommandResponse commandResponse = commandSender.apply(command);
            if (commandResponse == null) {
            if (sendResult == SegmentSendResult.RETRY) {
                sleepQuietly(200L);
                continue;
            }
            if (commandResponse.getResult()) {
                markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
                return true;
            }
            sleepQuietly(200L);
            return true;
        }
    }
    private SegmentSendResult executeLockedSegmentSend(StationCommand command,
                                                       StationTaskTraceRegistry traceRegistry,
                                                       Integer traceVersion,
                                                       Integer currentStationId) {
        Integer taskNo = command == null ? null : command.getTaskNo();
        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
        if (moveCoordinator != null) {
            // 分段发送的最终检查和实际下发需要与 reroute 共用任务锁。
            // 这样切路线程一旦进入 CANCEL_PENDING/RESET,旧路线就不能再穿过最后这一步发到设备侧。
            return moveCoordinator.withTaskDispatchLock(taskNo,
                    () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
        }
        return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
    }
    private SegmentSendResult doSendSegment(StationCommand command,
                                            StationTaskTraceRegistry traceRegistry,
                                            Integer traceVersion,
                                            Integer currentStationId) {
        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
            }
            markCancelled(command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getRouteVersion(),
                    currentStationId,
                    "route_version_replaced");
            return SegmentSendResult.CANCELLED;
        }
        if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
            }
            markCancelled(command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getRouteVersion(),
                    currentStationId,
                    "redis_cancel_signal");
            return SegmentSendResult.CANCELLED;
        }
        CommandResponse commandResponse = commandSender.apply(command);
        if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
            return SegmentSendResult.RETRY;
        }
        markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
        return SegmentSendResult.DISPATCHED;
    }
    private double loadSegmentAdvanceRatio() {
@@ -401,15 +433,19 @@
        }
    }
    private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
    private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
        // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
        // do not register a move session and therefore have no routeVersion.
        // They should keep the historical behavior and execute normally.
        if (taskNo == null || routeVersion == null) {
            return true;
        }
        StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
        return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
        return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
    }
    private StationMoveCoordinator loadMoveCoordinator() {
        return SpringUtils.getBean(StationMoveCoordinator.class);
    }
    private void markSegmentIssued(Integer taskNo, Integer routeVersion) {
src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
@@ -804,17 +804,19 @@
        if (taskNo == null || taskNo <= 0 || stationId == null) {
            return RerouteExecutionResult.skip("invalid-station-task");
        }
        boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE;
        if (runBlockReroute) {
            // 站点进入堵塞后,设备侧可能已经把之前预下发的分段命令清掉了。
            // 先作废本地 session/segment 状态,再按新路线重发,避免被旧状态反向卡住。
            if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
                stationMoveCoordinator.cancelSession(taskNo);
            }
            if (context.resetSegmentCommandsBeforeDispatch()) {
                resetSegmentMoveCommandsBeforeReroute(taskNo);
            }
        if (stationMoveCoordinator != null) {
            return stationMoveCoordinator.withTaskDispatchLock(taskNo,
                    () -> executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId));
        }
        return executeReroutePlanWithTaskLock(context, plan, stationProtocol, taskNo, stationId);
    }
    private RerouteExecutionResult executeReroutePlanWithTaskLock(RerouteContext context,
                                                                  RerouteCommandPlan plan,
                                                                  StationProtocol stationProtocol,
                                                                  Integer taskNo,
                                                                  Integer stationId) {
        boolean runBlockReroute = context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE;
        if (context.checkRecentDispatch()
                && shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId)) {
            return RerouteExecutionResult.skip("recent-dispatch");
@@ -848,6 +850,22 @@
            return RerouteExecutionResult.skip("out-order-lock");
        }
        if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
            // 切路前先把旧 session 置为 CANCEL_PENDING,让已经排队中的旧分段线程在最终发送前停下。
            stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending");
        }
        if (runBlockReroute) {
            // 站点进入堵塞后,设备侧可能已经把之前预下发的分段命令清掉了。
            // 先作废本地 session/segment 状态,再按新路线重发,避免被旧状态反向卡住。
            if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
                stationMoveCoordinator.cancelSession(taskNo);
            }
            if (context.resetSegmentCommandsBeforeDispatch()) {
                resetSegmentMoveCommandsBeforeReroute(taskNo);
            }
        }
        if (!runBlockReroute
                && context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
            stationMoveCoordinator.cancelSession(taskNo);
src/test/java/com/zy/core/thread/impl/station/StationSegmentExecutorTest.java
New file
@@ -0,0 +1,73 @@
package com.zy.core.thread.impl.station;
import com.alibaba.fastjson.JSON;
import com.core.common.SpringUtils;
import com.zy.asrs.entity.DeviceConfig;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.model.CommandResponse;
import com.zy.core.model.command.StationCommand;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.move.StationMoveSessionRegistry;
import com.zy.core.move.StationMoveSession;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class StationSegmentExecutorTest {
    @Test
    void sendSegmentWithRetry_skipsWhenRouteIsCancelPending() {
        ApplicationContext applicationContext = mock(ApplicationContext.class);
        StationMoveCoordinator coordinator = new StationMoveCoordinator();
        StationMoveSessionRegistry sessionRegistry = new StationMoveSessionRegistry();
        RedisUtil redisUtil = mock(RedisUtil.class);
        @SuppressWarnings("unchecked")
        Function<StationCommand, CommandResponse> commandSender = mock(Function.class);
        ReflectionTestUtils.setField(coordinator, "sessionRegistry", sessionRegistry);
        ReflectionTestUtils.setField(coordinator, "redisUtil", redisUtil);
        ReflectionTestUtils.setField(sessionRegistry, "redisUtil", redisUtil);
        when(applicationContext.getBean(StationMoveCoordinator.class)).thenReturn(coordinator);
        SpringUtils.init(applicationContext);
        StationSegmentExecutor executor = new StationSegmentExecutor(new DeviceConfig(), redisUtil, commandSender);
        StationCommand command = new StationCommand();
        command.setTaskNo(10492);
        command.setStationId(186);
        command.setTargetStaNo(124);
        command.setRouteVersion(23);
        StationMoveSession session = new StationMoveSession();
        session.setTaskNo(10492);
        session.setRouteVersion(23);
        session.setStatus(StationMoveSession.STATUS_CANCEL_PENDING);
        when(redisUtil.get(RedisKeyType.STATION_MOVE_SESSION_.key + 10492))
                .thenReturn(JSON.toJSONString(session));
        when(redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + 10492)).thenReturn(null);
        when(commandSender.apply(any())).thenReturn(new CommandResponse(true));
        Boolean result = ReflectionTestUtils.invokeMethod(
                executor,
                "sendSegmentWithRetry",
                command,
                null,
                null,
                186
        );
        assertFalse(Boolean.TRUE.equals(result));
        verify(commandSender, never()).apply(any());
    }
}
src/test/java/com/zy/core/utils/StationOperateProcessUtilsReroutePipelineTest.java
@@ -42,12 +42,21 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class StationOperateProcessUtilsReroutePipelineTest {
    @SuppressWarnings("unchecked")
    private void stubTaskDispatchLock(StationMoveCoordinator coordinator) {
        when(coordinator.withTaskDispatchLock(any(), any())).thenAnswer(invocation -> {
            java.util.function.Supplier<Object> supplier = invocation.getArgument(1);
            return supplier == null ? null : supplier.get();
        });
    }
    @Test
    void choosesRunBlockCommandBuilderForRunBlockRerouteScene() {
@@ -181,6 +190,7 @@
        StationOperateProcessUtils utils = new StationOperateProcessUtils();
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -226,6 +236,7 @@
        StationOperateProcessUtils utils = new StationOperateProcessUtils();
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -273,6 +284,7 @@
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        BasStationOptService basStationOptService = mock(BasStationOptService.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
        ReflectionTestUtils.setField(utils, "basStationOptService", basStationOptService);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -310,6 +322,7 @@
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        StationThread stationThread = mock(StationThread.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
        ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -371,6 +384,7 @@
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        StationThread stationThread = mock(StationThread.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
        ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -479,7 +493,10 @@
    void executePlan_runBlockReroute_reissuesWhenBlockedSessionMatchesCandidatePath() {
        StationOperateProcessUtils utils = new StationOperateProcessUtils();
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
        StationCommand command = new StationCommand();
        command.setTaskNo(100);
@@ -521,6 +538,9 @@
            );
            assertTrue(!result.skipped());
            org.mockito.InOrder inOrder = inOrder(coordinator);
            inOrder.verify(coordinator).markCancelPending(100, "reroute_pending");
            inOrder.verify(coordinator).cancelSession(100);
            verify(coordinator, times(1)).cancelSession(100);
        } finally {
            MessageQueue.clear(SlaveType.Devp, 1);
@@ -532,6 +552,7 @@
        StationOperateProcessUtils utils = new StationOperateProcessUtils();
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -582,6 +603,7 @@
        StationOperateProcessUtils utils = new StationOperateProcessUtils();
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "stationMoveCoordinator", coordinator);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -636,6 +658,7 @@
        WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class);
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        StationThread stationThread = mock(StationThread.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
        ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
@@ -696,6 +719,7 @@
        NotifyUtils notifyUtils = mock(NotifyUtils.class);
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        StationThread stationThread = mock(StationThread.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);
        ReflectionTestUtils.setField(utils, "redisUtil", redisUtil);
@@ -749,6 +773,7 @@
        StationMoveCoordinator coordinator = mock(StationMoveCoordinator.class);
        RedisUtil redisUtil = mock(RedisUtil.class);
        StationThread stationThread = mock(StationThread.class);
        stubTaskDispatchLock(coordinator);
        ReflectionTestUtils.setField(utils, "basDevpService", basDevpService);
        ReflectionTestUtils.setField(utils, "wrkMastService", wrkMastService);