Junjie
8 小时以前 0a9c9bf4fa908d9d0bbdd910b70fcd0650e0df1d
#出库输送执行
2个文件已修改
160 ■■■■■ 已修改文件
src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java 151 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application.yml 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java
@@ -21,6 +21,7 @@
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.move.StationMoveSession;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.station.model.DispatchLimitConfig;
import com.zy.core.utils.station.model.LoadGuardState;
@@ -28,6 +29,7 @@
import com.zy.core.utils.station.model.OutOrderDispatchDecision;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Value;
import java.util.Date;
import java.util.List;
@@ -38,6 +40,9 @@
public class StationOutboundDispatchProcessor {
    private static final int PENDING_DISPATCH_EXPIRE_SECONDS = 60 * 10;
    private static final long DEFAULT_PENDING_DISCOVER_TIMEOUT_MS = 60_000L;
    private static final long DEFAULT_PENDING_SESSION_PROTECT_MS = 90_000L;
    private static final long DEFAULT_RECENT_DISPATCH_PROTECT_MS = 60_000L;
    @Autowired
    private WrkMastService wrkMastService;
@@ -57,6 +62,15 @@
    private StationDispatchLoadSupport stationDispatchLoadSupport;
    @Autowired
    private StationOutboundDecisionSupport stationOutboundDecisionSupport;
    @Autowired
    private StationDispatchRuntimeStateSupport stationDispatchRuntimeStateSupport;
    @Value("${station.outbound.pending-discover-timeout-seconds:60}")
    private long pendingDiscoverTimeoutSeconds;
    @Value("${station.outbound.pending-session-protect-seconds:90}")
    private long pendingSessionProtectSeconds;
    @Value("${station.outbound.recent-dispatch-protect-seconds:60}")
    private long recentDispatchProtectSeconds;
    public void crnStationOutExecute() {
        try {
@@ -119,11 +133,22 @@
                } catch (Exception ignore) {
                    createdAt = System.currentTimeMillis();
                }
                if (System.currentTimeMillis() - createdAt < 15_000L) {
                long pendingDiscoverTimeoutMs = resolvePendingDiscoverTimeoutMs();
                if (System.currentTimeMillis() - createdAt < pendingDiscoverTimeoutMs) {
                    return;
                }
                PendingDispatchGuardResult guardResult = evaluatePendingDispatchGuard(wrkMast, pendingStationObjModel, pendingStationThread);
                if (guardResult.keepPending()) {
                    News.info("输送站点等待源站执行超时后继续保持等待。工作号={},源站={},原因={}",
                            wrkMast.getWrkNo(),
                            pendingStationObjModel.getStationId(),
                            guardResult.reason());
                    markPendingDispatch(wrkMast.getWrkNo());
                    return;
                }
                clearPendingDispatch(wrkMast.getWrkNo());
                News.warn("输送站点执行超时,已释放重试资格。工作号={}", wrkMast.getWrkNo());
                News.warn("输送站点执行确认超时,且未发现有效活动链路,已释放重试资格。工作号={}", wrkMast.getWrkNo());
            }
            StationObjModel stationObjModel = getOutboundSourceStation(wrkMast);
@@ -330,4 +355,126 @@
        }
        redisUtil.del(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo);
    }
    private PendingDispatchGuardResult evaluatePendingDispatchGuard(WrkMast wrkMast,
                                                                    StationObjModel stationObjModel,
                                                                    StationThread stationThread) {
        if (wrkMast == null || wrkMast.getWrkNo() == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return PendingDispatchGuardResult.release("missing-runtime-context");
        }
        if (hasActiveMoveSession(wrkMast.getWrkNo(), stationObjModel.getStationId())) {
            return PendingDispatchGuardResult.keep("active-session");
        }
        if (hasRecentSuccessfulDispatch(wrkMast.getWrkNo(), stationObjModel.getStationId())) {
            return PendingDispatchGuardResult.keep("recent-successful-dispatch");
        }
        if (taskExistsInBuffer(stationThread, wrkMast.getWrkNo())) {
            return PendingDispatchGuardResult.keep("task-buffer-detected");
        }
        return PendingDispatchGuardResult.release("no-active-chain");
    }
    private boolean hasActiveMoveSession(Integer wrkNo, Integer sourceStationId) {
        if (wrkNo == null || wrkNo <= 0 || stationMoveCoordinator == null || sourceStationId == null) {
            return false;
        }
        StationMoveSession session = stationMoveCoordinator.loadSession(wrkNo);
        if (session == null || !session.isActive()) {
            return false;
        }
        if (!Objects.equals(sourceStationId, session.getDispatchStationId())) {
            return false;
        }
        long now = System.currentTimeMillis();
        Long lastIssuedAt = session.getLastIssuedAt();
        if (lastIssuedAt != null && now - lastIssuedAt <= resolvePendingSessionProtectMs()) {
            return true;
        }
        Long updatedAt = session.getUpdatedAt();
        return updatedAt != null && now - updatedAt <= resolvePendingSessionProtectMs();
    }
    private boolean hasRecentSuccessfulDispatch(Integer wrkNo, Integer sourceStationId) {
        if (stationDispatchRuntimeStateSupport == null) {
            return false;
        }
        return stationDispatchRuntimeStateSupport.hasRecentIssuedMoveCommand(
                wrkNo,
                sourceStationId,
                resolveRecentDispatchProtectMs()
        );
    }
    private boolean taskExistsInBuffer(StationThread stationThread, Integer wrkNo) {
        if (stationThread == null || wrkNo == null || wrkNo <= 0) {
            return false;
        }
        Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
        if (statusMap == null || statusMap.isEmpty()) {
            return false;
        }
        for (StationProtocol stationProtocol : statusMap.values()) {
            if (stationProtocol == null || stationProtocol.getTaskBufferItems() == null) {
                continue;
            }
            if (containsTaskNo(stationProtocol, wrkNo)) {
                return true;
            }
        }
        return false;
    }
    private boolean containsTaskNo(StationProtocol stationProtocol, Integer wrkNo) {
        if (stationProtocol == null || stationProtocol.getTaskBufferItems() == null || wrkNo == null) {
            return false;
        }
        return stationProtocol.getTaskBufferItems().stream()
                .filter(Objects::nonNull)
                .anyMatch(item -> Objects.equals(wrkNo, item.getTaskNo()));
    }
    private long resolvePendingDiscoverTimeoutMs() {
        return resolveMillis(pendingDiscoverTimeoutSeconds, DEFAULT_PENDING_DISCOVER_TIMEOUT_MS);
    }
    private long resolvePendingSessionProtectMs() {
        return resolveMillis(pendingSessionProtectSeconds, DEFAULT_PENDING_SESSION_PROTECT_MS);
    }
    private long resolveRecentDispatchProtectMs() {
        return resolveMillis(recentDispatchProtectSeconds, DEFAULT_RECENT_DISPATCH_PROTECT_MS);
    }
    private long resolveMillis(long seconds, long defaultMs) {
        if (seconds <= 0L) {
            return defaultMs;
        }
        return seconds * 1000L;
    }
    private static class PendingDispatchGuardResult {
        private final boolean keepPending;
        private final String reason;
        private PendingDispatchGuardResult(boolean keepPending, String reason) {
            this.keepPending = keepPending;
            this.reason = reason;
        }
        public static PendingDispatchGuardResult keep(String reason) {
            return new PendingDispatchGuardResult(true, reason);
        }
        public static PendingDispatchGuardResult release(String reason) {
            return new PendingDispatchGuardResult(false, reason);
        }
        public boolean keepPending() {
            return keepPending;
        }
        public String reason() {
            return reason;
        }
    }
}
src/main/resources/application.yml
@@ -147,3 +147,12 @@
    enabled: false
    thresholdMs: 50
    sampleRate: 1.0
station:
  outbound:
    # 出库命令入设备执行链路后,等待设备状态回显任务号的超时时间
    pending-discover-timeout-seconds: 60
    # 已存在活动路由会话时,继续保持 pending 的保护时间
    pending-session-protect-seconds: 90
    # 最近已经成功写过同源站 MOVE 命令时,继续保持 pending 的保护时间
    recent-dispatch-protect-seconds: 60