package com.zy.core.utils.station;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zy.asrs.entity.BasStation;
import com.zy.asrs.domain.enums.NotifyMsgType;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.WrkAnalysisService;
import com.zy.asrs.service.WrkMastService;
import com.zy.asrs.utils.NotifyUtils;
import com.zy.common.utils.RedisUtil;
import com.zy.core.News;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.dispatch.StationCommandDispatchResult;
import com.zy.core.dispatch.StationCommandDispatcher;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.WrkStsType;
import com.zy.core.model.StationObjModel;
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.move.StationMoveSession;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.station.model.DispatchLimitConfig;
import com.zy.core.utils.station.model.LoadGuardState;
import com.zy.core.utils.station.model.LoopHitResult;
import com.zy.core.utils.station.model.OutOrderDispatchDecision;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Value;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Issues outbound move commands to conveyor stations for work tasks whose status is
 * {@code WrkStsType.OUTBOUND_RUN_COMPLETE}, and follows each issued command up through a
 * Redis "pending dispatch" marker.
 *
 * <p>Life cycle of the single-crane path as implemented here:
 * <ol>
 *   <li>A command is dispatched and the marker {@code STATION_OUT_PENDING_DISPATCH_ + wrkNo}
 *       is written with the current time in milliseconds.</li>
 *   <li>On later invocations, while the marker exists, the task is only moved to
 *       {@code WrkStsType.STATION_RUN} once the source device reports the work number in
 *       {@code getAllTaskNoList()}.</li>
 *   <li>If that does not happen within the discover timeout, the marker is either refreshed
 *       (a move session, a recent issued command, or a task-buffer entry is still present) or
 *       removed so that the command can be issued again.</li>
 * </ol>
 */
@Component
public class StationOutboundDispatchProcessor {

    /** Time-to-live of the pending-dispatch marker in Redis, in seconds. */
    private static final int PENDING_DISPATCH_EXPIRE_SECONDS = 60 * 10;
    /** Time-to-live of the per-station execute-limit key written after a dispatch, in seconds. */
    private static final int EXECUTE_LIMIT_LOCK_SECONDS = 5;
    /** Source tag handed to the command dispatcher for every command issued by this class. */
    private static final String DISPATCH_SOURCE = "station-operate-process";

    private static final long DEFAULT_PENDING_DISCOVER_TIMEOUT_MS = 60_000L;
    private static final long DEFAULT_PENDING_SESSION_PROTECT_MS = 90_000L;
    private static final long DEFAULT_RECENT_DISPATCH_PROTECT_MS = 60_000L;

    @Autowired
    private WrkMastService wrkMastService;
    @Autowired
    private BasStationService basStationService;
    @Autowired
    private WrkAnalysisService wrkAnalysisService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private NotifyUtils notifyUtils;
    @Autowired
    private StationMoveCoordinator stationMoveCoordinator;
    /** Optional bean: may be {@code null} when no dispatcher is registered in the context. */
    @Autowired(required = false)
    private StationCommandDispatcher stationCommandDispatcher;
    @Autowired
    private StationDispatchLoadSupport stationDispatchLoadSupport;
    @Autowired
    private StationOutboundDecisionSupport stationOutboundDecisionSupport;
    @Autowired
    private StationDispatchRuntimeStateSupport stationDispatchRuntimeStateSupport;

    @Value("${station.outbound.pending-discover-timeout-seconds:60}")
    private long pendingDiscoverTimeoutSeconds;
    @Value("${station.outbound.pending-session-protect-seconds:90}")
    private long pendingSessionProtectSeconds;
    @Value("${station.outbound.recent-dispatch-protect-seconds:60}")
    private long recentDispatchProtectSeconds;

    /**
     * Processes every task with status {@code OUTBOUND_RUN_COMPLETE} and a non-null
     * {@code crn_no} column by delegating to {@link #crnStationOutExecute(WrkMast)}.
     * Exceptions are caught and reported here; nothing is propagated to the caller.
     */
    public void crnStationOutExecute() {
        try {
            List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                    .eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
                    .isNotNull("crn_no"));
            for (WrkMast wrkMast : wrkMasts) {
                crnStationOutExecute(wrkMast);
            }
        } catch (Exception e) {
            reportFailure("crnStationOutExecute", null, e);
        }
    }

    /**
     * Handles one task: first settles an existing pending-dispatch marker, then, if the task
     * is free to be dispatched, issues the outbound move command.
     *
     * @param wrkMast the task to process; ignored when {@code null} or without a work number
     */
    public void crnStationOutExecute(WrkMast wrkMast) {
        if (wrkMast == null || wrkMast.getWrkNo() == null) {
            return;
        }
        try {
            Object pendingObj = redisUtil.get(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkMast.getWrkNo());
            if (pendingObj != null && !settlePendingDispatch(wrkMast, pendingObj)) {
                return;
            }
            dispatchCrnOutbound(wrkMast);
        } catch (Exception e) {
            reportFailure("crnStationOutExecute", wrkMast.getWrkNo(), e);
        }
    }

    /**
     * Processes every task with status {@code OUTBOUND_RUN_COMPLETE} and a non-null
     * {@code dual_crn_no} column. A failure while handling one task is reported and does not
     * prevent the remaining tasks from being handled.
     */
    public void dualCrnStationOutExecute() {
        try {
            List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                    .eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
                    .isNotNull("dual_crn_no"));
            for (WrkMast wrkMast : wrkMasts) {
                try {
                    dispatchDualCrnOutbound(wrkMast);
                } catch (Exception e) {
                    reportFailure("dualCrnStationOutExecute", wrkMast == null ? null : wrkMast.getWrkNo(), e);
                }
            }
        } catch (Exception e) {
            reportFailure("dualCrnStationOutExecute", null, e);
        }
    }

    /**
     * Settles an existing pending-dispatch marker for {@code wrkMast}.
     *
     * @param wrkMast    task with a non-null work number
     * @param pendingObj marker value read from Redis (expected to be epoch milliseconds)
     * @return {@code true} only when the marker timed out and was released, i.e. the caller
     *         may issue the command again; {@code false} in every other case
     */
    private boolean settlePendingDispatch(WrkMast wrkMast, Object pendingObj) {
        // A marker left behind for a task that already changed status is stale.
        if (!Objects.equals(wrkMast.getWrkSts(), WrkStsType.OUTBOUND_RUN_COMPLETE.sts)) {
            clearPendingDispatch(wrkMast.getWrkNo());
            return false;
        }

        StationObjModel sourceStation = getOutboundSourceStation(wrkMast);
        if (!isResolved(sourceStation)) {
            clearPendingDispatch(wrkMast.getWrkNo());
            return false;
        }

        StationThread stationThread =
                (StationThread) SlaveConnection.get(SlaveType.Devp, sourceStation.getDeviceNo());
        if (stationThread != null) {
            List<?> taskNoList = stationThread.getAllTaskNoList();
            if (taskNoList != null && taskNoList.contains(wrkMast.getWrkNo())) {
                confirmStationRun(wrkMast, sourceStation);
                return false;
            }
        }

        long createdAt = parsePendingCreatedAt(pendingObj);
        if (System.currentTimeMillis() - createdAt < resolvePendingDiscoverTimeoutMs()) {
            return false;
        }

        PendingDispatchGuardResult guardResult =
                evaluatePendingDispatchGuard(wrkMast, sourceStation, stationThread);
        if (guardResult.keepPending()) {
            News.info("输送站点等待源站执行超时后继续保持等待。工作号={},源站={},原因={}",
                    wrkMast.getWrkNo(), sourceStation.getStationId(), guardResult.reason());
            // Rewriting the marker restarts the discover timeout.
            markPendingDispatch(wrkMast.getWrkNo());
            return false;
        }

        clearPendingDispatch(wrkMast.getWrkNo());
        News.warn("输送站点执行确认超时,且未发现有效活动链路,已释放重试资格。工作号={}", wrkMast.getWrkNo());
        return true;
    }

    /**
     * Moves the task to {@code STATION_RUN}; on a successful update also records the analysis
     * start, sends the {@code STATION_OUT_TASK_RUN} notification and removes the marker.
     * When the update fails the marker is kept, so the confirmation is attempted again later.
     */
    private void confirmStationRun(WrkMast wrkMast, StationObjModel sourceStation) {
        Date now = new Date();
        wrkMast.setWrkSts(WrkStsType.STATION_RUN.sts);
        wrkMast.setSystemMsg("");
        wrkMast.setIoTime(now);
        wrkMast.setModiTime(now);
        if (!wrkMastService.updateById(wrkMast)) {
            return;
        }
        wrkAnalysisService.markOutboundStationStart(wrkMast, now);
        notifyUtils.notify(String.valueOf(SlaveType.Devp), sourceStation.getDeviceNo(),
                String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(),
                NotifyMsgType.STATION_OUT_TASK_RUN, null);
        clearPendingDispatch(wrkMast.getWrkNo());
        News.info("输送设备已发现任务号,任务转运行中。deviceNo={},源站={},工作号={}",
                sourceStation.getDeviceNo(), sourceStation.getStationId(), wrkMast.getWrkNo());
    }

    /**
     * Reads the creation time stored in the marker; an unparsable value is treated as
     * "created just now", which keeps the marker waiting.
     */
    private long parsePendingCreatedAt(Object pendingObj) {
        try {
            return Long.parseLong(String.valueOf(pendingObj));
        } catch (NumberFormatException ignored) {
            return System.currentTimeMillis();
        }
    }

    /**
     * Issues the outbound move command for a single-crane task, subject to the execute-limit
     * key, the station readiness check, the out-order decision and the load guard.
     */
    private void dispatchCrnOutbound(WrkMast wrkMast) {
        StationObjModel sourceStation = getOutboundSourceStation(wrkMast);
        if (!isResolved(sourceStation)) {
            return;
        }
        StationThread stationThread =
                (StationThread) SlaveConnection.get(SlaveType.Devp, sourceStation.getDeviceNo());
        if (stationThread == null) {
            return;
        }
        StationProtocol stationProtocol = findStationProtocol(stationThread, sourceStation.getStationId());
        if (stationProtocol == null) {
            return;
        }

        String limitKey = RedisKeyType.STATION_OUT_EXECUTE_LIMIT.key + stationProtocol.getStationId();
        if (redisUtil.get(limitKey) != null) {
            return;
        }
        if (!isReadyForOutbound(stationProtocol)) {
            return;
        }

        Double pathLenFactor = stationOutboundDecisionSupport.resolveOutboundPathLenFactor(wrkMast);
        // NOTE(review): element type of the out-order list is not visible from this file,
        // so the declaration is left raw — confirm and parameterize.
        List outOrderList = stationOutboundDecisionSupport.getAllOutOrderList();
        OutOrderDispatchDecision dispatchDecision = stationOutboundDecisionSupport.resolveOutboundDispatchDecision(
                stationProtocol.getStationId(), wrkMast, outOrderList, pathLenFactor);
        Integer moveStaNo = dispatchDecision == null ? null : dispatchDecision.getTargetStationId();
        if (moveStaNo == null) {
            return;
        }

        DispatchLimitConfig limitConfig =
                stationDispatchLoadSupport.getDispatchLimitConfig(stationProtocol.getStationId(), moveStaNo);
        int currentStationTaskCount = stationDispatchLoadSupport.countCurrentStationTask();
        LoadGuardState loadGuardState = stationDispatchLoadSupport.buildLoadGuardState(limitConfig);
        LoopHitResult loopHitResult = stationDispatchLoadSupport.findPathLoopHit(
                limitConfig, stationProtocol.getStationId(), moveStaNo, loadGuardState, wrkMast, pathLenFactor);
        if (stationDispatchLoadSupport.isDispatchBlocked(
                limitConfig, currentStationTaskCount, loadGuardState, loopHitResult.isThroughLoop())) {
            return;
        }

        StationCommand command = stationOutboundDecisionSupport.buildOutboundMoveCommand(
                stationThread, wrkMast, stationProtocol.getStationId(), moveStaNo, pathLenFactor);
        if (command == null) {
            News.taskInfo(wrkMast.getWrkNo(), "获取输送线命令失败");
            return;
        }
        if (!offerDevpCommandWithDedup(sourceStation.getDeviceNo(), command, "crnStationOutExecute")) {
            return;
        }

        if (stationMoveCoordinator != null) {
            stationMoveCoordinator.recordDispatch(
                    wrkMast.getWrkNo(), stationProtocol.getStationId(), "crnStationOutExecute", command, false);
        }
        markPendingDispatch(wrkMast.getWrkNo());
        News.info("输送站点出库命令已入设备执行链路,等待源站执行。站点号={},工作号={},命令数据={}",
                stationProtocol.getStationId(), wrkMast.getWrkNo(), JSON.toJSONString(command));
        redisUtil.set(limitKey, "lock", EXECUTE_LIMIT_LOCK_SECONDS);
        loadGuardState.reserveLoopTask(loopHitResult.getLoopNo());
        stationDispatchLoadSupport.saveLoopLoadReserve(wrkMast.getWrkNo(), loopHitResult);
    }

    /**
     * Issues the outbound move command for a dual-crane task. The target station is taken
     * directly from {@code wrkMast.getStaNo()}; tasks that already carry a pending-dispatch
     * marker are skipped.
     */
    private void dispatchDualCrnOutbound(WrkMast wrkMast) {
        // Same precondition as the single-crane path: without a work number the dispatch
        // cannot be tracked through the pending marker.
        if (wrkMast == null || wrkMast.getWrkNo() == null) {
            return;
        }
        if (hasPendingDispatch(wrkMast.getWrkNo())) {
            return;
        }

        StationObjModel sourceStation = getOutboundSourceStation(wrkMast);
        if (!isResolved(sourceStation)) {
            return;
        }
        StationThread stationThread =
                (StationThread) SlaveConnection.get(SlaveType.Devp, sourceStation.getDeviceNo());
        if (stationThread == null) {
            return;
        }
        StationProtocol stationProtocol = findStationProtocol(stationThread, sourceStation.getStationId());
        if (stationProtocol == null) {
            return;
        }

        String limitKey = RedisKeyType.STATION_OUT_EXECUTE_LIMIT.key + stationProtocol.getStationId();
        if (redisUtil.get(limitKey) != null) {
            return;
        }
        if (!isReadyForOutbound(stationProtocol)) {
            return;
        }

        Double pathLenFactor = stationOutboundDecisionSupport.resolveOutboundPathLenFactor(wrkMast);
        StationCommand command = stationOutboundDecisionSupport.buildOutboundMoveCommand(
                stationThread, wrkMast, stationProtocol.getStationId(), wrkMast.getStaNo(), pathLenFactor);
        if (command == null) {
            News.taskInfo(wrkMast.getWrkNo(), "获取输送线命令失败");
            return;
        }
        if (!offerDevpCommandWithDedup(sourceStation.getDeviceNo(), command, "dualCrnStationOutExecute")) {
            return;
        }

        if (stationMoveCoordinator != null) {
            stationMoveCoordinator.recordDispatch(
                    wrkMast.getWrkNo(), stationProtocol.getStationId(), "dualCrnStationOutExecute", command, false);
        }
        markPendingDispatch(wrkMast.getWrkNo());
        News.info("输送站点出库命令已入设备执行链路,等待源站接单。站点号={},工作号={},命令数据={}",
                stationProtocol.getStationId(), wrkMast.getWrkNo(), JSON.toJSONString(command));
        redisUtil.set(limitKey, "lock", EXECUTE_LIMIT_LOCK_SECONDS);
    }

    /**
     * Hands the command to the dispatcher.
     *
     * @return {@code true} when the dispatcher reports the command as accepted; {@code false}
     *         when it is rejected, when no dispatcher bean is available, or when the
     *         dispatcher returns no result
     */
    private boolean offerDevpCommandWithDedup(Integer deviceNo, StationCommand command, String scene) {
        if (stationCommandDispatcher == null) {
            // The dispatcher is injected with required = false, so it can legitimately be absent.
            News.warn("输送命令分发器未注入,命令未下发。deviceNo={},scene={}", deviceNo, scene);
            return false;
        }
        StationCommandDispatchResult dispatchResult =
                stationCommandDispatcher.dispatch(deviceNo, command, DISPATCH_SOURCE, scene);
        return dispatchResult != null && dispatchResult.isAccepted();
    }

    /**
     * Builds the source-station descriptor of a task from {@code getSourceStaNo()} and the
     * device number stored on the matching {@code BasStation} row.
     *
     * @return the descriptor, or {@code null} when the task, its source station number, the
     *         station row or the station's device number is missing
     */
    private StationObjModel getOutboundSourceStation(WrkMast wrkMast) {
        if (wrkMast == null || wrkMast.getSourceStaNo() == null) {
            return null;
        }
        BasStation basStation = basStationService.getById(wrkMast.getSourceStaNo());
        if (basStation == null || basStation.getDeviceNo() == null) {
            return null;
        }
        StationObjModel stationObjModel = new StationObjModel();
        stationObjModel.setStationId(wrkMast.getSourceStaNo());
        stationObjModel.setDeviceNo(basStation.getDeviceNo());
        return stationObjModel;
    }

    /** Returns whether the descriptor carries both a device number and a station id. */
    private boolean isResolved(StationObjModel stationObjModel) {
        return stationObjModel != null
                && stationObjModel.getDeviceNo() != null
                && stationObjModel.getStationId() != null;
    }

    /** Looks up a station in the thread's status map; {@code null} when the map or entry is absent. */
    private StationProtocol findStationProtocol(StationThread stationThread, Integer stationId) {
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        return stationMap == null ? null : stationMap.get(stationId);
    }

    /** A station accepts an outbound command only when it is autoing, loading and has task number 0. */
    private boolean isReadyForOutbound(StationProtocol stationProtocol) {
        return stationProtocol.isAutoing()
                && stationProtocol.isLoading()
                && stationProtocol.getTaskNo() == 0;
    }

    /** Returns whether a pending-dispatch marker exists for the work number. */
    private boolean hasPendingDispatch(Integer wrkNo) {
        return wrkNo != null
                && redisUtil.get(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo) != null;
    }

    /** Writes (or refreshes) the pending-dispatch marker with the current time in milliseconds. */
    private void markPendingDispatch(Integer wrkNo) {
        if (wrkNo == null) {
            return;
        }
        redisUtil.set(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo,
                String.valueOf(System.currentTimeMillis()), PENDING_DISPATCH_EXPIRE_SECONDS);
    }

    /** Deletes the pending-dispatch marker of the work number. */
    private void clearPendingDispatch(Integer wrkNo) {
        if (wrkNo == null) {
            return;
        }
        redisUtil.del(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo);
    }

    /**
     * Decides whether a timed-out marker must be kept. Checks, in this order: a move session,
     * a recently issued move command, and the task buffers of the device.
     */
    private PendingDispatchGuardResult evaluatePendingDispatchGuard(WrkMast wrkMast,
                                                                    StationObjModel stationObjModel,
                                                                    StationThread stationThread) {
        if (wrkMast == null || wrkMast.getWrkNo() == null
                || stationObjModel == null || stationObjModel.getStationId() == null) {
            return PendingDispatchGuardResult.release("missing-runtime-context");
        }
        if (hasActiveMoveSession(wrkMast.getWrkNo(), stationObjModel.getStationId())) {
            return PendingDispatchGuardResult.keep("active-session");
        }
        if (hasRecentSuccessfulDispatch(wrkMast.getWrkNo(), stationObjModel.getStationId())) {
            return PendingDispatchGuardResult.keep("recent-successful-dispatch");
        }
        if (taskExistsInBuffer(stationThread, wrkMast.getWrkNo())) {
            return PendingDispatchGuardResult.keep("task-buffer-detected");
        }
        return PendingDispatchGuardResult.release("no-active-chain");
    }

    /**
     * Returns whether the coordinator holds an active session for the task that was dispatched
     * from {@code sourceStationId} and whose last-issued or updated timestamp lies within the
     * session protection window.
     */
    private boolean hasActiveMoveSession(Integer wrkNo, Integer sourceStationId) {
        if (wrkNo == null || wrkNo <= 0 || stationMoveCoordinator == null || sourceStationId == null) {
            return false;
        }
        StationMoveSession session = stationMoveCoordinator.loadSession(wrkNo);
        if (session == null || !session.isActive()) {
            return false;
        }
        if (!Objects.equals(sourceStationId, session.getDispatchStationId())) {
            return false;
        }
        long now = System.currentTimeMillis();
        long protectMs = resolvePendingSessionProtectMs();
        Long lastIssuedAt = session.getLastIssuedAt();
        if (lastIssuedAt != null && now - lastIssuedAt <= protectMs) {
            return true;
        }
        Long updatedAt = session.getUpdatedAt();
        return updatedAt != null && now - updatedAt <= protectMs;
    }

    /** Delegates to the runtime-state support with the recent-dispatch protection window. */
    private boolean hasRecentSuccessfulDispatch(Integer wrkNo, Integer sourceStationId) {
        if (stationDispatchRuntimeStateSupport == null) {
            return false;
        }
        return stationDispatchRuntimeStateSupport.hasRecentIssuedMoveCommand(
                wrkNo, sourceStationId, resolveRecentDispatchProtectMs());
    }

    /** Returns whether any station of the thread lists the work number in its task buffer items. */
    private boolean taskExistsInBuffer(StationThread stationThread, Integer wrkNo) {
        if (stationThread == null || wrkNo == null || wrkNo <= 0) {
            return false;
        }
        Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
        if (statusMap == null || statusMap.isEmpty()) {
            return false;
        }
        for (StationProtocol stationProtocol : statusMap.values()) {
            if (containsTaskNo(stationProtocol, wrkNo)) {
                return true;
            }
        }
        return false;
    }

    /** Returns whether one station's task buffer items contain the work number. */
    private boolean containsTaskNo(StationProtocol stationProtocol, Integer wrkNo) {
        if (stationProtocol == null || stationProtocol.getTaskBufferItems() == null || wrkNo == null) {
            return false;
        }
        return stationProtocol.getTaskBufferItems().stream()
                .filter(Objects::nonNull)
                .anyMatch(item -> Objects.equals(wrkNo, item.getTaskNo()));
    }

    private long resolvePendingDiscoverTimeoutMs() {
        return resolveMillis(pendingDiscoverTimeoutSeconds, DEFAULT_PENDING_DISCOVER_TIMEOUT_MS);
    }

    private long resolvePendingSessionProtectMs() {
        return resolveMillis(pendingSessionProtectSeconds, DEFAULT_PENDING_SESSION_PROTECT_MS);
    }

    private long resolveRecentDispatchProtectMs() {
        return resolveMillis(recentDispatchProtectSeconds, DEFAULT_RECENT_DISPATCH_PROTECT_MS);
    }

    /** Converts configured seconds to milliseconds; non-positive values fall back to {@code defaultMs}. */
    private long resolveMillis(long seconds, long defaultMs) {
        if (seconds <= 0L) {
            return defaultMs;
        }
        return seconds * 1000L;
    }

    /**
     * Reports a caught exception with the entry point and work number it belongs to.
     * NOTE(review): the stack trace is still printed to stderr because no logging call that
     * accepts a Throwable is visible in this file — replace once one is confirmed.
     */
    private void reportFailure(String scene, Integer wrkNo, Exception e) {
        News.warn("输送站点出库派发处理异常。scene={},工作号={},异常={}", scene, wrkNo, String.valueOf(e));
        e.printStackTrace();
    }

    /** Immutable outcome of the timed-out-marker evaluation: keep or release, plus a reason code. */
    private static final class PendingDispatchGuardResult {

        private final boolean keepPending;
        private final String reason;

        private PendingDispatchGuardResult(boolean keepPending, String reason) {
            this.keepPending = keepPending;
            this.reason = reason;
        }

        public static PendingDispatchGuardResult keep(String reason) {
            return new PendingDispatchGuardResult(true, reason);
        }

        public static PendingDispatchGuardResult release(String reason) {
            return new PendingDispatchGuardResult(false, reason);
        }

        public boolean keepPending() {
            return keepPending;
        }

        public String reason() {
            return reason;
        }
    }
}