Junjie
2026-04-27 73a8fcbaaec22b972c7f52d9477271a246d17926
src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
@@ -1,16 +1,24 @@
package com.zy.core.utils;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zy.asrs.entity.BasCrnp;
import com.zy.asrs.entity.BasDevp;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.utils.Utils;
import com.zy.asrs.service.*;
import com.zy.common.service.CommonService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.News;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.dispatch.StationCommandDispatcher;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.StationCommandType;
import com.zy.core.enums.WrkIoType;
import com.zy.core.enums.WrkStsType;
import com.zy.core.model.StationObjModel;
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.task.MainProcessLane;
import com.zy.core.task.MainProcessTaskSubmitter;
@@ -26,9 +34,12 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Component
public class StationOperateProcessUtils {
    private static final String STATION_COMMAND_SOURCE = "station-operate-process";
    @Autowired
    private WrkMastService wrkMastService;
    @Autowired
@@ -51,10 +62,43 @@
    private StationOutboundDecisionSupport stationOutboundDecisionSupport;
    @Autowired
    private BasCrnpService basCrnpService;
    @Autowired
    private CommonService commonService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private StationCommandDispatcher stationCommandDispatcher;
    //执行输送站点入库任务
    public synchronized void stationInExecute() {
        stationRegularDispatchProcessor.stationInExecute();
    public void submitStationEnableInTasks(long minIntervalMs) {
        submitStationEnableInTasks(MainProcessLane.STATION_ENABLE_IN, minIntervalMs);
    }
    public void submitStationEnableInTasks(MainProcessLane lane,
                                           long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
            if (stationMap == null || stationMap.isEmpty()) {
                continue;
            }
            for (StationObjModel stationObjModel : basDevp.getInStationList$()) {
                Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
                if (stationId == null || !stationMap.containsKey(stationId)) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "stationEnableInExecute",
                        minIntervalMs,
                        () -> stationEnableInExecute(basDevp, stationObjModel)
                );
            }
        }
    }
    // 执行单个站点的入库任务下发
@@ -62,9 +106,62 @@
        stationRegularDispatchProcessor.stationInExecute(basDevp, stationObjModel);
    }
    //执行堆垛机输送站点出库任务
    public synchronized void crnStationOutExecute() {
        stationOutboundDispatchProcessor.crnStationOutExecute();
    // 执行单个站点的启动入库下发
    public void stationEnableInExecute(BasDevp basDevp, StationObjModel stationObjModel) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return;
        }
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (stationMap == null || stationMap.isEmpty()) {
            return;
        }
        Integer stationId = stationObjModel.getStationId();
        if (!stationMap.containsKey(stationId)) {
            return;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return;
        }
        Object lock = redisUtil.get(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId);
        if (lock != null) {
            return;
        }
        if (!stationProtocol.isAutoing()
                || !stationProtocol.isLoading()
                || stationProtocol.getTaskNo() != 0
                || !stationProtocol.isEnableIn()) {
            return;
        }
        Integer barcodeStationId = stationObjModel.getBarcodeStation() == null ? null : stationObjModel.getBarcodeStation().getStationId();
        if (barcodeStationId == null) {
            return;
        }
        StationCommand command = stationThread.getCommand(
                StationCommandType.MOVE,
                commonService.getWorkNo(WrkIoType.ENABLE_IN.id),
                stationId,
                barcodeStationId,
                0
        );
        stationCommandDispatcher.dispatch(basDevp.getDevpNo(), command, STATION_COMMAND_SOURCE, "enable-in");
        Utils.precomputeInTaskEnableRow(barcodeStationId);
        redisUtil.set(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId, "lock", 15);
        // 启动入库时删除退回控制key,允许后续异常时再次生成退回命令
        redisUtil.del(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + barcodeStationId);
        News.info("{}站点启动入库成功,数据包:{}", stationId, JSON.toJSONString(command));
    }
    // 执行单个出库任务对应的输送站点下发
@@ -72,9 +169,9 @@
        stationOutboundDispatchProcessor.crnStationOutExecute(wrkMast);
    }
    //执行双工位堆垛机输送站点出库任务
    public synchronized void dualCrnStationOutExecute() {
        stationOutboundDispatchProcessor.dualCrnStationOutExecute();
    // 执行单个双工位出库任务对应的输送站点下发
    public void dualCrnStationOutExecute(WrkMast wrkMast) {
        stationOutboundDispatchProcessor.dualCrnStationOutExecute(wrkMast);
    }
    // 检测单个出库任务是否到达目标站台
@@ -103,15 +200,25 @@
        if (wrkMast == null) {
            return;
        }
        if (!Objects.equals(wrkMast.getStaNo(), stationObjModel.getStationId())) {
            News.info("入库站点到达扫描忽略,工作号={},扫描站点={},任务目标站={},原因=target_mismatch",
                    wrkMast.getWrkNo(), stationObjModel.getStationId(), wrkMast.getStaNo());
            return;
        }
        if (!Objects.equals(wrkMast.getWrkSts(), WrkStsType.INBOUND_STATION_RUN.sts)) {
            News.info("入库站点到达扫描忽略,工作号={},扫描站点={},任务状态={},原因=wrk_sts_mismatch",
                    wrkMast.getWrkNo(), stationObjModel.getStationId(), wrkMast.getWrkSts());
            return;
        }
        News.info("入库站点到达扫描命中,工作号={},扫描站点={},目标站={},站点taskNo={},准备转状态3",
                wrkMast.getWrkNo(), stationObjModel.getStationId(), wrkMast.getStaNo(), stationProtocol.getTaskNo());
        boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
        if (updated) {
            News.info("入库站点到达扫描命中,工作号={},目标站={}", wrkMast.getWrkNo(), wrkMast.getStaNo());
            News.info("入库站点到达扫描完成,工作号={},目标站={},结果=updated_to_3", wrkMast.getWrkNo(), wrkMast.getStaNo());
        }
    }
    // 检测任务转完成
    public void checkTaskToComplete() {
        stationRegularDispatchProcessor.checkTaskToComplete();
        else {
            News.info("入库站点到达扫描结束,工作号={},目标站={},结果=skip_update", wrkMast.getWrkNo(), wrkMast.getStaNo());
        }
    }
    // 检测单个出库任务是否可以转完成
@@ -119,24 +226,9 @@
        stationRegularDispatchProcessor.checkTaskToComplete(wrkMast);
    }
    //检测输送站点是否运行堵塞
    public void checkStationRunBlock() {
        stationRerouteProcessor.checkStationRunBlock();
    }
    // 检测单个站点是否运行堵塞
    public void checkStationRunBlock(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.checkStationRunBlock(basDevp, stationId);
    }
    //检测输送站点任务停留超时后重新计算路径
    public void checkStationIdleRecover() {
        stationRerouteProcessor.checkStationIdleRecover();
    }
    // 检测单个站点任务停留超时后的恢复处理
    public void checkStationIdleRecover(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.checkStationIdleRecover(basDevp, stationId);
    }
    //获取输送线任务数量
@@ -157,24 +249,9 @@
                        WrkStsType.STATION_RUN.sts));
    }
    // 检测出库排序
    public synchronized void checkStationOutOrder() {
        stationRerouteProcessor.checkStationOutOrder();
    }
    // 检测单个站点的出库排序
    public void checkStationOutOrder(BasDevp basDevp, StationObjModel stationObjModel) {
        stationRerouteProcessor.checkStationOutOrder(basDevp, stationObjModel);
    }
    // 监控绕圈站点
    public synchronized void watchCircleStation() {
        stationRerouteProcessor.watchCircleStation();
    }
    // 监控单个绕圈站点
    public void watchCircleStation(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.watchCircleStation(basDevp, stationId);
    }
    public void submitStationInTasks(long minIntervalMs) {
@@ -201,7 +278,9 @@
                if (stationProtocol == null
                        || !stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0) {
                        || stationProtocol.getTaskNo() <= 0
                        || !stationProtocol.isInEnable()
                ) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
@@ -234,6 +313,29 @@
                    "crnStationOutExecute",
                    minIntervalMs,
                    () -> crnStationOutExecute(wrkMast)
            );
        }
    }
    public void submitDualCrnStationOutTasks(long minIntervalMs) {
        submitDualCrnStationOutTasks(MainProcessLane.DUAL_STATION_OUT, minIntervalMs);
    }
    public void submitDualCrnStationOutTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
                .isNotNull("dual_crn_no"));
        for (WrkMast wrkMast : wrkMasts) {
            Integer laneKey = wrkMast == null ? null : wrkMast.getSourceStaNo();
            if (laneKey == null) {
                laneKey = wrkMast == null ? null : wrkMast.getWrkNo();
            }
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    laneKey,
                    "dualCrnStationOutExecute",
                    minIntervalMs,
                    () -> dualCrnStationOutExecute(wrkMast)
            );
        }
    }
@@ -338,40 +440,6 @@
        }
    }
    public void submitWatchCircleStationTasks(long minIntervalMs) {
        submitWatchCircleStationTasks(MainProcessLane.STATION_WATCH_CIRCLE, minIntervalMs);
    }
    public void submitWatchCircleStationTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationProtocol stationProtocol : stationThread.getStatus()) {
                Integer stationId = stationProtocol == null ? null : stationProtocol.getStationId();
                if (stationId == null) {
                    continue;
                }
                if (!stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0
                        || !stationOutboundDecisionSupport.isWatchingCircleArrival(stationProtocol.getTaskNo(), stationProtocol.getStationId())) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "watchCircleStation",
                        minIntervalMs,
                        () -> watchCircleStation(basDevp, stationId)
                );
            }
        }
    }
    public void submitCheckStationRunBlockTasks(long minIntervalMs) {
        submitCheckStationRunBlockTasks(MainProcessLane.STATION_RUN_BLOCK, minIntervalMs);
    }
@@ -405,39 +473,6 @@
        }
    }
    public void submitCheckStationIdleRecoverTasks(long minIntervalMs) {
        submitCheckStationIdleRecoverTasks(MainProcessLane.STATION_IDLE_RECOVER, minIntervalMs);
    }
    public void submitCheckStationIdleRecoverTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationProtocol stationProtocol : stationThread.getStatus()) {
                Integer stationId = stationProtocol == null ? null : stationProtocol.getStationId();
                if (stationId == null) {
                    continue;
                }
                if (!stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0
                        || stationProtocol.isRunBlock()) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "checkStationIdleRecover",
                        minIntervalMs,
                        () -> checkStationIdleRecover(basDevp, stationId)
                );
            }
        }
    }
    RerouteCommandPlan buildRerouteCommandPlan(RerouteContext context,
                                               RerouteDecision decision) {
        return stationRerouteProcessor.buildRerouteCommandPlan(context, decision);
@@ -456,10 +491,6 @@
                                            Integer stationId,
                                            List<Integer> runBlockReassignLocStationList) {
        return stationRerouteProcessor.shouldUseRunBlockDirectReassign(wrkMast, stationId, runBlockReassignLocStationList);
    }
    private boolean shouldSkipIdleRecoverForRecentDispatch(Integer taskNo, Integer stationId) {
        return stationRerouteProcessor.shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId);
    }
    public void attemptClearTaskPath(StationThread stationThread, Integer taskNo) {