From 0a9c9bf4fa908d9d0bbdd910b70fcd0650e0df1d Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期一, 06 四月 2026 22:46:30 +0800
Subject: [PATCH] #出库输送执行
---
src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java | 327 +++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 273 insertions(+), 54 deletions(-)
diff --git a/src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java b/src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java
index b5d654e..b86e1f5 100644
--- a/src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java
+++ b/src/main/java/com/zy/core/utils/station/StationOutboundDispatchProcessor.java
@@ -2,8 +2,10 @@
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.zy.asrs.entity.BasStation;
import com.zy.asrs.domain.enums.NotifyMsgType;
import com.zy.asrs.entity.WrkMast;
+import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.WrkAnalysisService;
import com.zy.asrs.service.WrkMastService;
import com.zy.asrs.utils.NotifyUtils;
@@ -19,6 +21,7 @@
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
+import com.zy.core.move.StationMoveSession;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.station.model.DispatchLimitConfig;
import com.zy.core.utils.station.model.LoadGuardState;
@@ -26,16 +29,25 @@
import com.zy.core.utils.station.model.OutOrderDispatchDecision;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import org.springframework.beans.factory.annotation.Value;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
@Component
public class StationOutboundDispatchProcessor {
+ private static final int PENDING_DISPATCH_EXPIRE_SECONDS = 60 * 10;
+ private static final long DEFAULT_PENDING_DISCOVER_TIMEOUT_MS = 60_000L;
+ private static final long DEFAULT_PENDING_SESSION_PROTECT_MS = 90_000L;
+ private static final long DEFAULT_RECENT_DISPATCH_PROTECT_MS = 60_000L;
+
@Autowired
private WrkMastService wrkMastService;
+ @Autowired
+ private BasStationService basStationService;
@Autowired
private WrkAnalysisService wrkAnalysisService;
@Autowired
@@ -50,6 +62,15 @@
private StationDispatchLoadSupport stationDispatchLoadSupport;
@Autowired
private StationOutboundDecisionSupport stationOutboundDecisionSupport;
+ @Autowired
+ private StationDispatchRuntimeStateSupport stationDispatchRuntimeStateSupport;
+
+ @Value("${station.outbound.pending-discover-timeout-seconds:60}")
+ private long pendingDiscoverTimeoutSeconds;
+ @Value("${station.outbound.pending-session-protect-seconds:90}")
+ private long pendingSessionProtectSeconds;
+ @Value("${station.outbound.recent-dispatch-protect-seconds:60}")
+ private long recentDispatchProtectSeconds;
public void crnStationOutExecute() {
try {
@@ -69,14 +90,68 @@
if (wrkMast == null || wrkMast.getWrkNo() == null) {
return;
}
+ Object pendingObj = redisUtil.get(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkMast.getWrkNo());
+ if (pendingObj != null) {
+ if (!Objects.equals(wrkMast.getWrkSts(), WrkStsType.OUTBOUND_RUN_COMPLETE.sts)) {
+ clearPendingDispatch(wrkMast.getWrkNo());
+ return;
+ }
- Object infoObj = redisUtil.get(RedisKeyType.CRN_OUT_TASK_COMPLETE_STATION_INFO.key + wrkMast.getWrkNo());
- if (infoObj == null) {
- News.info("鍑哄簱浠诲姟{}鏁版嵁缂撳瓨涓嶅瓨鍦�", wrkMast.getWrkNo());
- return;
+ StationObjModel pendingStationObjModel = getOutboundSourceStation(wrkMast);
+ if (pendingStationObjModel == null
+ || pendingStationObjModel.getDeviceNo() == null
+ || pendingStationObjModel.getStationId() == null) {
+ clearPendingDispatch(wrkMast.getWrkNo());
+ return;
+ }
+
+ StationThread pendingStationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, pendingStationObjModel.getDeviceNo());
+ if (pendingStationThread != null) {
+ List<Integer> taskNoList = pendingStationThread.getAllTaskNoList();
+ if (taskNoList != null && taskNoList.contains(wrkMast.getWrkNo())) {
+ Date now = new Date();
+ wrkMast.setWrkSts(WrkStsType.STATION_RUN.sts);
+ wrkMast.setSystemMsg("");
+ wrkMast.setIoTime(now);
+ wrkMast.setModiTime(now);
+ if (wrkMastService.updateById(wrkMast)) {
+ wrkAnalysisService.markOutboundStationStart(wrkMast, now);
+ notifyUtils.notify(String.valueOf(SlaveType.Devp), pendingStationObjModel.getDeviceNo(),
+ String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(),
+ NotifyMsgType.STATION_OUT_TASK_RUN, null);
+ clearPendingDispatch(wrkMast.getWrkNo());
+ News.info("杈撻�佽澶囧凡鍙戠幇浠诲姟鍙凤紝浠诲姟杞繍琛屼腑銆俤eviceNo={}锛屾簮绔�={}锛屽伐浣滃彿={}",
+ pendingStationObjModel.getDeviceNo(), pendingStationObjModel.getStationId(), wrkMast.getWrkNo());
+ }
+ return;
+ }
+ }
+
+ long createdAt;
+ try {
+ createdAt = Long.parseLong(String.valueOf(pendingObj));
+ } catch (Exception ignore) {
+ createdAt = System.currentTimeMillis();
+ }
+ long pendingDiscoverTimeoutMs = resolvePendingDiscoverTimeoutMs();
+ if (System.currentTimeMillis() - createdAt < pendingDiscoverTimeoutMs) {
+ return;
+ }
+
+ PendingDispatchGuardResult guardResult = evaluatePendingDispatchGuard(wrkMast, pendingStationObjModel, pendingStationThread);
+ if (guardResult.keepPending()) {
+ News.info("杈撻�佺珯鐐圭瓑寰呮簮绔欐墽琛岃秴鏃跺悗缁х画淇濇寔绛夊緟銆傚伐浣滃彿={}锛屾簮绔�={}锛屽師鍥�={}",
+ wrkMast.getWrkNo(),
+ pendingStationObjModel.getStationId(),
+ guardResult.reason());
+ markPendingDispatch(wrkMast.getWrkNo());
+ return;
+ }
+ clearPendingDispatch(wrkMast.getWrkNo());
+ News.warn("杈撻�佺珯鐐规墽琛岀‘璁よ秴鏃讹紝涓旀湭鍙戠幇鏈夋晥娲诲姩閾捐矾锛屽凡閲婃斁閲嶈瘯璧勬牸銆傚伐浣滃彿={}", wrkMast.getWrkNo());
}
- StationObjModel stationObjModel = JSON.parseObject(infoObj.toString(), StationObjModel.class);
+ StationObjModel stationObjModel = getOutboundSourceStation(wrkMast);
if (stationObjModel == null || stationObjModel.getDeviceNo() == null || stationObjModel.getStationId() == null) {
return;
}
@@ -150,30 +225,25 @@
return;
}
- Date now = new Date();
- wrkMast.setWrkSts(WrkStsType.STATION_RUN.sts);
- wrkMast.setSystemMsg("");
- wrkMast.setIoTime(now);
- wrkMast.setModiTime(now);
- if (wrkMastService.updateById(wrkMast)) {
- wrkAnalysisService.markOutboundStationStart(wrkMast, now);
- boolean offered = offerDevpCommandWithDedup(stationObjModel.getDeviceNo(), command, "crnStationOutExecute");
- if (offered && stationMoveCoordinator != null) {
- stationMoveCoordinator.recordDispatch(
- wrkMast.getWrkNo(),
- stationProtocol.getStationId(),
- "crnStationOutExecute",
- command,
- false
- );
- }
- News.info("杈撻�佺珯鐐瑰嚭搴撳懡浠や笅鍙戞垚鍔燂紝绔欑偣鍙�={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}",
- stationProtocol.getStationId(), wrkMast.getWrkNo(), JSON.toJSONString(command));
- redisUtil.set(RedisKeyType.STATION_OUT_EXECUTE_LIMIT.key + stationProtocol.getStationId(), "lock", 5);
- redisUtil.del(RedisKeyType.CRN_OUT_TASK_COMPLETE_STATION_INFO.key + wrkMast.getWrkNo());
- loadGuardState.reserveLoopTask(loopHitResult.getLoopNo());
- stationDispatchLoadSupport.saveLoopLoadReserve(wrkMast.getWrkNo(), loopHitResult);
+ boolean offered = offerDevpCommandWithDedup(stationObjModel.getDeviceNo(), command, "crnStationOutExecute");
+ if (!offered) {
+ return;
}
+ if (stationMoveCoordinator != null) {
+ stationMoveCoordinator.recordDispatch(
+ wrkMast.getWrkNo(),
+ stationProtocol.getStationId(),
+ "crnStationOutExecute",
+ command,
+ false
+ );
+ }
+ markPendingDispatch(wrkMast.getWrkNo());
+ News.info("杈撻�佺珯鐐瑰嚭搴撳懡浠ゅ凡鍏ヨ澶囨墽琛岄摼璺紝绛夊緟婧愮珯鎵ц銆傜珯鐐瑰彿={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}",
+ stationProtocol.getStationId(), wrkMast.getWrkNo(), JSON.toJSONString(command));
+ redisUtil.set(RedisKeyType.STATION_OUT_EXECUTE_LIMIT.key + stationProtocol.getStationId(), "lock", 5);
+ loadGuardState.reserveLoopTask(loopHitResult.getLoopNo());
+ stationDispatchLoadSupport.saveLoopLoadReserve(wrkMast.getWrkNo(), loopHitResult);
} catch (Exception e) {
e.printStackTrace();
}
@@ -185,13 +255,13 @@
.eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
.isNotNull("dual_crn_no"));
for (WrkMast wrkMast : wrkMasts) {
- Object infoObj = redisUtil.get(RedisKeyType.DUAL_CRN_OUT_TASK_STATION_INFO.key + wrkMast.getWrkNo());
- if (infoObj == null) {
- News.info("鍑哄簱浠诲姟{}鏁版嵁缂撳瓨涓嶅瓨鍦�", wrkMast.getWrkNo());
+ if (hasPendingDispatch(wrkMast.getWrkNo())) {
continue;
}
-
- StationObjModel stationObjModel = JSON.parseObject(infoObj.toString(), StationObjModel.class);
+ StationObjModel stationObjModel = getOutboundSourceStation(wrkMast);
+ if (stationObjModel == null || stationObjModel.getDeviceNo() == null || stationObjModel.getStationId() == null) {
+ continue;
+ }
StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, stationObjModel.getDeviceNo());
if (stationThread == null) {
continue;
@@ -224,28 +294,23 @@
continue;
}
- wrkMast.setWrkSts(WrkStsType.STATION_RUN.sts);
- wrkMast.setSystemMsg("");
- wrkMast.setIoTime(new Date());
- if (wrkMastService.updateById(wrkMast)) {
- boolean offered = offerDevpCommandWithDedup(stationObjModel.getDeviceNo(), command, "dualCrnStationOutExecute");
- if (offered && stationMoveCoordinator != null) {
- stationMoveCoordinator.recordDispatch(
- wrkMast.getWrkNo(),
- stationProtocol.getStationId(),
- "dualCrnStationOutExecute",
- command,
- false
- );
- }
- notifyUtils.notify(String.valueOf(SlaveType.Devp), stationObjModel.getDeviceNo(),
- String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(),
- NotifyMsgType.STATION_OUT_TASK_RUN, null);
- News.info("杈撻�佺珯鐐瑰嚭搴撳懡浠や笅鍙戞垚鍔燂紝绔欑偣鍙�={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}",
- stationProtocol.getStationId(), wrkMast.getWrkNo(), JSON.toJSONString(command));
- redisUtil.set(RedisKeyType.STATION_OUT_EXECUTE_LIMIT.key + stationProtocol.getStationId(), "lock", 5);
- redisUtil.del(RedisKeyType.DUAL_CRN_OUT_TASK_STATION_INFO.key + wrkMast.getWrkNo());
+ boolean offered = offerDevpCommandWithDedup(stationObjModel.getDeviceNo(), command, "dualCrnStationOutExecute");
+ if (!offered) {
+ continue;
}
+ if (stationMoveCoordinator != null) {
+ stationMoveCoordinator.recordDispatch(
+ wrkMast.getWrkNo(),
+ stationProtocol.getStationId(),
+ "dualCrnStationOutExecute",
+ command,
+ false
+ );
+ }
+ markPendingDispatch(wrkMast.getWrkNo());
+ News.info("杈撻�佺珯鐐瑰嚭搴撳懡浠ゅ凡鍏ヨ澶囨墽琛岄摼璺紝绛夊緟婧愮珯鎺ュ崟銆傜珯鐐瑰彿={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}",
+ stationProtocol.getStationId(), wrkMast.getWrkNo(), JSON.toJSONString(command));
+ redisUtil.set(RedisKeyType.STATION_OUT_EXECUTE_LIMIT.key + stationProtocol.getStationId(), "lock", 5);
}
}
} catch (Exception e) {
@@ -258,4 +323,158 @@
.dispatch(deviceNo, command, "station-operate-process", scene);
return dispatchResult.isAccepted();
}
+
+ private StationObjModel getOutboundSourceStation(WrkMast wrkMast) {
+ if (wrkMast == null || wrkMast.getSourceStaNo() == null) {
+ return null;
+ }
+ BasStation basStation = basStationService.getById(wrkMast.getSourceStaNo());
+ if (basStation == null || basStation.getDeviceNo() == null) {
+ return null;
+ }
+ StationObjModel stationObjModel = new StationObjModel();
+ stationObjModel.setStationId(wrkMast.getSourceStaNo());
+ stationObjModel.setDeviceNo(basStation.getDeviceNo());
+ return stationObjModel;
+ }
+
+ private boolean hasPendingDispatch(Integer wrkNo) {
+ return wrkNo != null && redisUtil.get(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo) != null;
+ }
+
+ private void markPendingDispatch(Integer wrkNo) {
+ if (wrkNo == null) {
+ return;
+ }
+ redisUtil.set(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo, String.valueOf(System.currentTimeMillis()), PENDING_DISPATCH_EXPIRE_SECONDS);
+ }
+
+ private void clearPendingDispatch(Integer wrkNo) {
+ if (wrkNo == null) {
+ return;
+ }
+ redisUtil.del(RedisKeyType.STATION_OUT_PENDING_DISPATCH_.key + wrkNo);
+ }
+
+ private PendingDispatchGuardResult evaluatePendingDispatchGuard(WrkMast wrkMast,
+ StationObjModel stationObjModel,
+ StationThread stationThread) {
+ if (wrkMast == null || wrkMast.getWrkNo() == null || stationObjModel == null || stationObjModel.getStationId() == null) {
+ return PendingDispatchGuardResult.release("missing-runtime-context");
+ }
+ if (hasActiveMoveSession(wrkMast.getWrkNo(), stationObjModel.getStationId())) {
+ return PendingDispatchGuardResult.keep("active-session");
+ }
+ if (hasRecentSuccessfulDispatch(wrkMast.getWrkNo(), stationObjModel.getStationId())) {
+ return PendingDispatchGuardResult.keep("recent-successful-dispatch");
+ }
+ if (taskExistsInBuffer(stationThread, wrkMast.getWrkNo())) {
+ return PendingDispatchGuardResult.keep("task-buffer-detected");
+ }
+ return PendingDispatchGuardResult.release("no-active-chain");
+ }
+
+ private boolean hasActiveMoveSession(Integer wrkNo, Integer sourceStationId) {
+ if (wrkNo == null || wrkNo <= 0 || stationMoveCoordinator == null || sourceStationId == null) {
+ return false;
+ }
+ StationMoveSession session = stationMoveCoordinator.loadSession(wrkNo);
+ if (session == null || !session.isActive()) {
+ return false;
+ }
+ if (!Objects.equals(sourceStationId, session.getDispatchStationId())) {
+ return false;
+ }
+ long now = System.currentTimeMillis();
+ Long lastIssuedAt = session.getLastIssuedAt();
+ if (lastIssuedAt != null && now - lastIssuedAt <= resolvePendingSessionProtectMs()) {
+ return true;
+ }
+ Long updatedAt = session.getUpdatedAt();
+ return updatedAt != null && now - updatedAt <= resolvePendingSessionProtectMs();
+ }
+
+ private boolean hasRecentSuccessfulDispatch(Integer wrkNo, Integer sourceStationId) {
+ if (stationDispatchRuntimeStateSupport == null) {
+ return false;
+ }
+ return stationDispatchRuntimeStateSupport.hasRecentIssuedMoveCommand(
+ wrkNo,
+ sourceStationId,
+ resolveRecentDispatchProtectMs()
+ );
+ }
+
+ private boolean taskExistsInBuffer(StationThread stationThread, Integer wrkNo) {
+ if (stationThread == null || wrkNo == null || wrkNo <= 0) {
+ return false;
+ }
+ Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
+ if (statusMap == null || statusMap.isEmpty()) {
+ return false;
+ }
+ for (StationProtocol stationProtocol : statusMap.values()) {
+ if (stationProtocol == null || stationProtocol.getTaskBufferItems() == null) {
+ continue;
+ }
+ if (containsTaskNo(stationProtocol, wrkNo)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean containsTaskNo(StationProtocol stationProtocol, Integer wrkNo) {
+ if (stationProtocol == null || stationProtocol.getTaskBufferItems() == null || wrkNo == null) {
+ return false;
+ }
+ return stationProtocol.getTaskBufferItems().stream()
+ .filter(Objects::nonNull)
+ .anyMatch(item -> Objects.equals(wrkNo, item.getTaskNo()));
+ }
+
+ private long resolvePendingDiscoverTimeoutMs() {
+ return resolveMillis(pendingDiscoverTimeoutSeconds, DEFAULT_PENDING_DISCOVER_TIMEOUT_MS);
+ }
+
+ private long resolvePendingSessionProtectMs() {
+ return resolveMillis(pendingSessionProtectSeconds, DEFAULT_PENDING_SESSION_PROTECT_MS);
+ }
+
+ private long resolveRecentDispatchProtectMs() {
+ return resolveMillis(recentDispatchProtectSeconds, DEFAULT_RECENT_DISPATCH_PROTECT_MS);
+ }
+
+ private long resolveMillis(long seconds, long defaultMs) {
+ if (seconds <= 0L) {
+ return defaultMs;
+ }
+ return seconds * 1000L;
+ }
+
+ private static class PendingDispatchGuardResult {
+ private final boolean keepPending;
+ private final String reason;
+
+ private PendingDispatchGuardResult(boolean keepPending, String reason) {
+ this.keepPending = keepPending;
+ this.reason = reason;
+ }
+
+ public static PendingDispatchGuardResult keep(String reason) {
+ return new PendingDispatchGuardResult(true, reason);
+ }
+
+ public static PendingDispatchGuardResult release(String reason) {
+ return new PendingDispatchGuardResult(false, reason);
+ }
+
+ public boolean keepPending() {
+ return keepPending;
+ }
+
+ public String reason() {
+ return reason;
+ }
+ }
}
--
Gitblit v1.9.1