#
Junjie
9 天以前 dc3f9cc91759823ce59486f19b138be4b296a0f1
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -6,6 +6,7 @@
import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo;
import com.zy.asrs.entity.DeviceConfig;
import com.zy.common.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
@@ -15,6 +16,8 @@
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.trace.StationTaskTraceRegistry;
import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
import com.zy.core.thread.support.StationTaskLocationRegistry;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -24,16 +27,24 @@
import java.util.Map;
import java.util.function.Function;
@Slf4j
public class StationSegmentExecutor {
    private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
    private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d;
    private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L;
    private static final long TASK_LOCATION_STALE_MS = 2_000L;
    private final DeviceConfig deviceConfig;
    private final RedisUtil redisUtil;
    private final Function<StationCommand, CommandResponse> commandSender;
    private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
    private enum SegmentSendResult {
        DISPATCHED,
        CANCELLED,
        RETRY
    }
    public StationSegmentExecutor(DeviceConfig deviceConfig,
                                  RedisUtil redisUtil,
@@ -83,10 +94,11 @@
        long lastSeenAt = System.currentTimeMillis();
        int segCursor = 0;
        Integer lastCurrentStationId = null;
        int lastMatchedPathIndex = -1;
        boolean firstRun = true;
        while (true) {
            try {
                if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
                if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
                    if (traceRegistry != null) {
                        traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
                                buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -138,12 +150,17 @@
                    break;
                }
                int currentIndex = effectiveFullPath.indexOf(currentStation.getStationId());
                int currentIndex = resolveCurrentPathIndex(
                        effectiveFullPath,
                        currentStation.getStationId(),
                        lastMatchedPathIndex
                );
                if (currentIndex < 0) {
                    Thread.sleep(500L);
                    firstRun = false;
                    continue;
                }
                lastMatchedPathIndex = currentIndex;
                int remaining = effectiveFullPath.size() - currentIndex - 1;
                if (remaining <= 0) {
@@ -186,47 +203,129 @@
        }
    }
    private int resolveCurrentPathIndex(List<Integer> fullPathStationIds,
                                        Integer currentStationId,
                                        int lastMatchedPathIndex) {
        if (fullPathStationIds == null || fullPathStationIds.isEmpty() || currentStationId == null) {
            return -1;
        }
        if (lastMatchedPathIndex >= 0
                && lastMatchedPathIndex < fullPathStationIds.size()
                && equalsInteger(currentStationId, fullPathStationIds.get(lastMatchedPathIndex))) {
            return lastMatchedPathIndex;
        }
        int nextIndex = findNextStationIndex(fullPathStationIds, currentStationId, Math.max(lastMatchedPathIndex + 1, 0));
        if (nextIndex >= 0) {
            return nextIndex;
        }
        return findNextStationIndex(fullPathStationIds, currentStationId, 0);
    }
    private int findNextStationIndex(List<Integer> path, Integer stationId, int fromIndex) {
        if (path == null || path.isEmpty() || stationId == null) {
            return -1;
        }
        int startIdx = Math.max(fromIndex, 0);
        for (int i = startIdx; i < path.size(); i++) {
            if (equalsInteger(stationId, path.get(i))) {
                return i;
            }
        }
        return -1;
    }
    private boolean sendSegmentWithRetry(StationCommand command,
                                         StationTaskTraceRegistry traceRegistry,
                                         Integer traceVersion,
                                         Integer currentStationId) {
        // 在下发新分段前检查路由版本是否仍然有效,避免在路由版本已更新的情况下下发旧版本命令
        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
            // 首次校验失败可能是 Redis 写入延迟导致的,短暂等待后重试一次。
            sleepQuietly(50L);
            if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "route_version_replaced_before_segment_send");
                return false;
            }
        }
        while (true) {
            if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "route_version_replaced");
            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
            if (sendResult == SegmentSendResult.CANCELLED) {
                return false;
            }
            if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
                if (traceRegistry != null && command != null) {
                    traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                            buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
                }
                markCancelled(command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getRouteVersion(),
                        currentStationId,
                        "redis_cancel_signal");
                return false;
            }
            CommandResponse commandResponse = commandSender.apply(command);
            if (commandResponse == null) {
            if (sendResult == SegmentSendResult.RETRY) {
                sleepQuietly(200L);
                continue;
            }
            if (commandResponse.getResult()) {
                markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
                return true;
            }
            sleepQuietly(200L);
            return true;
        }
    }
    private SegmentSendResult executeLockedSegmentSend(StationCommand command,
                                                       StationTaskTraceRegistry traceRegistry,
                                                       Integer traceVersion,
                                                       Integer currentStationId) {
        Integer taskNo = command == null ? null : command.getTaskNo();
        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
        if (moveCoordinator != null) {
            // 分段发送的最终检查和实际下发需要与 reroute 共用任务锁。
            // 这样切路线程一旦进入 CANCEL_PENDING/RESET,旧路线就不能再穿过最后这一步发到设备侧。
            return moveCoordinator.withTaskDispatchLock(taskNo,
                    () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
        }
        return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
    }
    private SegmentSendResult doSendSegment(StationCommand command,
                                            StationTaskTraceRegistry traceRegistry,
                                            Integer traceVersion,
                                            Integer currentStationId) {
        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
            }
            markCancelled(command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getRouteVersion(),
                    currentStationId,
                    "route_version_replaced");
            return SegmentSendResult.CANCELLED;
        }
        if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
            }
            markCancelled(command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getRouteVersion(),
                    currentStationId,
                    "redis_cancel_signal");
            return SegmentSendResult.CANCELLED;
        }
        CommandResponse commandResponse = commandSender.apply(command);
        if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
            return SegmentSendResult.RETRY;
        }
        markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
        return SegmentSendResult.DISPATCHED;
    }
    private double loadSegmentAdvanceRatio() {
        if (isV5ThreadImpl()) {
            StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
            if (configProvider != null) {
                return configProvider.getSegmentAdvanceRatio();
            }
            return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
        }
        try {
            ConfigService configService = SpringUtils.getBean(ConfigService.class);
            if (configService == null) {
@@ -280,6 +379,9 @@
    }
    private StationProtocol findCurrentStationByTask(Integer taskNo) {
        if (isV5ThreadImpl()) {
            return findCurrentStationByTaskFromRegistry(taskNo);
        }
        try {
            com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class);
            if (deviceConfigService == null) {
@@ -306,6 +408,27 @@
            return null;
        }
        return null;
    }
    private StationProtocol findCurrentStationByTaskFromRegistry(Integer taskNo) {
        StationTaskLocationRegistry registry = SpringUtils.getBean(StationTaskLocationRegistry.class);
        if (registry == null) {
            return null;
        }
        StationTaskLocationRegistry.TaskLocationSnapshot snapshot = registry.findActive(taskNo, TASK_LOCATION_STALE_MS);
        if (snapshot == null || !snapshot.isLoading()) {
            return null;
        }
        StationProtocol stationProtocol = new StationProtocol();
        stationProtocol.setTaskNo(snapshot.getTaskNo());
        stationProtocol.setStationId(snapshot.getStationId());
        stationProtocol.setRunBlock(snapshot.isRunBlock());
        stationProtocol.setLoading(true);
        return stationProtocol;
    }
    private boolean isV5ThreadImpl() {
        return deviceConfig != null && "ZyStationV5Thread".equals(deviceConfig.getThreadImpl());
    }
    private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) {
@@ -401,15 +524,27 @@
        }
    }
    private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
    private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
        // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
        // do not register a move session and therefore have no routeVersion.
        // They should keep the historical behavior and execute normally.
        if (taskNo == null || routeVersion == null) {
            return true;
        }
        StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
        return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
        StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
        if (moveCoordinator == null) {
            return true;
        }
        boolean dispatchable = moveCoordinator.canDispatchRoute(taskNo, routeVersion);
        if (!dispatchable) {
            log.warn("isRouteDispatchable rejected, taskNo={}, routeVersion={}, threadImpl={}",
                    taskNo, routeVersion, deviceConfig == null ? null : deviceConfig.getThreadImpl());
        }
        return dispatchable;
    }
    private StationMoveCoordinator loadMoveCoordinator() {
        return SpringUtils.getBean(StationMoveCoordinator.class);
    }
    private void markSegmentIssued(Integer taskNo, Integer routeVersion) {