#
Junjie
3 小时以前 f3b64d003bc3458af3dd434e6187d3aba23a64aa
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
    private final Function<StationCommand, CommandResponse> commandSender;
    private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
    private enum SegmentSendResult {
        DISPATCHED,
        CANCELLED,
        RETRY
    }
    public StationSegmentExecutor(DeviceConfig deviceConfig,
                                  RedisUtil redisUtil,
                                  Function<StationCommand, CommandResponse> commandSender) {
@@ -86,7 +92,7 @@
        boolean firstRun = true;
        while (true) {
            try {
                if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
                if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
                    if (traceRegistry != null) {
                        traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
                                buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -191,39 +197,65 @@
                                         Integer traceVersion,
                                         Integer currentStationId) {
        while (true) {
            if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "route_version_replaced");
            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
            if (sendResult == SegmentSendResult.CANCELLED) {
                return false;
            }
            if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "redis_cancel_signal");
                return false;
            }
            CommandResponse commandResponse = commandSender.apply(command);
            if (commandResponse == null) {
            if (sendResult == SegmentSendResult.RETRY) {
                sleepQuietly(200L);
                continue;
            }
            if (commandResponse.getResult()) {
                markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
                return true;
            }
            sleepQuietly(200L);
            return true;
        }
    }
    private SegmentSendResult executeLockedSegmentSend(StationCommand command,
                                                       StationTaskTraceRegistry traceRegistry,
                                                       Integer traceVersion,
                                                       Integer currentStationId) {
        Integer taskNo = command == null ? null : command.getTaskNo();
        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
        if (moveCoordinator != null) {
            // 分段发送的最终检查和实际下发需要与 reroute 共用任务锁。
            // 这样切路线程一旦进入 CANCEL_PENDING/RESET,旧路线就不能再穿过最后这一步发到设备侧。
            return moveCoordinator.withTaskDispatchLock(taskNo,
                    () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
        }
        return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
    }
    private SegmentSendResult doSendSegment(StationCommand command,
                                            StationTaskTraceRegistry traceRegistry,
                                            Integer traceVersion,
                                            Integer currentStationId) {
        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
            }
            markCancelled(command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getRouteVersion(),
                    currentStationId,
                    "route_version_replaced");
            return SegmentSendResult.CANCELLED;
        }
        if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
            }
            markCancelled(command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getRouteVersion(),
                    currentStationId,
                    "redis_cancel_signal");
            return SegmentSendResult.CANCELLED;
        }
        CommandResponse commandResponse = commandSender.apply(command);
        if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
            return SegmentSendResult.RETRY;
        }
        markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
        return SegmentSendResult.DISPATCHED;
    }
    private double loadSegmentAdvanceRatio() {
@@ -401,15 +433,19 @@
        }
    }
    private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
    private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
        // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
        // do not register a move session and therefore have no routeVersion.
        // They should keep the historical behavior and execute normally.
        if (taskNo == null || routeVersion == null) {
            return true;
        }
        StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
        return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
        return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
    }
    private StationMoveCoordinator loadMoveCoordinator() {
        return SpringUtils.getBean(StationMoveCoordinator.class);
    }
    private void markSegmentIssued(Integer taskNo, Integer routeVersion) {