Junjie
2026-04-13 b5cefd0727da0040ad7130c72f18df07643490cd
src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
@@ -1,27 +1,29 @@
package com.zy.core.utils;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zy.asrs.entity.BasCrnp;
import com.zy.asrs.entity.BasDevp;
import com.zy.asrs.entity.BasStation;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.service.BasDevpService;
import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.WrkAnalysisService;
import com.zy.asrs.service.WrkMastService;
import com.zy.asrs.utils.Utils;
import com.zy.asrs.service.*;
import com.zy.common.service.CommonService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.News;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.dispatch.StationCommandDispatcher;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.StationCommandType;
import com.zy.core.enums.WrkIoType;
import com.zy.core.enums.WrkStsType;
import com.zy.core.model.StationObjModel;
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.task.MainProcessLane;
import com.zy.core.task.MainProcessTaskSubmitter;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.station.StationDispatchLoadSupport;
import com.zy.core.utils.station.StationOutboundDispatchProcessor;
import com.zy.core.utils.station.StationRegularDispatchProcessor;
import com.zy.core.utils.station.StationRerouteProcessor;
import com.zy.core.utils.station.*;
import com.zy.core.utils.station.model.RerouteCommandPlan;
import com.zy.core.utils.station.model.RerouteContext;
import com.zy.core.utils.station.model.RerouteDecision;
@@ -35,6 +37,8 @@
@Component
public class StationOperateProcessUtils {
    private static final String STATION_COMMAND_SOURCE = "station-operate-process";
    @Autowired
    private WrkMastService wrkMastService;
    @Autowired
@@ -53,10 +57,47 @@
    private StationRerouteProcessor stationRerouteProcessor;
    @Autowired
    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
    @Autowired
    private StationOutboundDecisionSupport stationOutboundDecisionSupport;
    @Autowired
    private BasCrnpService basCrnpService;
    @Autowired
    private CommonService commonService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private StationCommandDispatcher stationCommandDispatcher;
    //执行输送站点入库任务
    public synchronized void stationInExecute() {
        stationRegularDispatchProcessor.stationInExecute();
    /**
     * Submits the per-station "enable inbound" tasks on the default
     * {@link MainProcessLane#STATION_ENABLE_IN} lane.
     *
     * @param minIntervalMs forwarded unchanged to the lane-aware overload
     */
    public void submitStationEnableInTasks(long minIntervalMs) {
        submitStationEnableInTasks(MainProcessLane.STATION_ENABLE_IN, minIntervalMs);
    }
    public void submitStationEnableInTasks(MainProcessLane lane,
                                           long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
            if (stationMap == null || stationMap.isEmpty()) {
                continue;
            }
            for (StationObjModel stationObjModel : basDevp.getInStationList$()) {
                Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
                if (stationId == null || !stationMap.containsKey(stationId)) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "stationEnableInExecute",
                        minIntervalMs,
                        () -> stationEnableInExecute(basDevp, stationObjModel)
                );
            }
        }
    }
    // 执行单个站点的入库任务下发
@@ -64,9 +105,62 @@
        stationRegularDispatchProcessor.stationInExecute(basDevp, stationObjModel);
    }
    //执行堆垛机输送站点出库任务
    public synchronized void crnStationOutExecute() {
        stationOutboundDispatchProcessor.crnStationOutExecute();
    // 执行单个站点的启动入库下发
    public void stationEnableInExecute(BasDevp basDevp, StationObjModel stationObjModel) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return;
        }
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (stationMap == null || stationMap.isEmpty()) {
            return;
        }
        Integer stationId = stationObjModel.getStationId();
        if (!stationMap.containsKey(stationId)) {
            return;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return;
        }
        Object lock = redisUtil.get(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId);
        if (lock != null) {
            return;
        }
        if (!stationProtocol.isAutoing()
                || !stationProtocol.isLoading()
                || stationProtocol.getTaskNo() != 0
                || !stationProtocol.isEnableIn()) {
            return;
        }
        Integer barcodeStationId = stationObjModel.getBarcodeStation() == null ? null : stationObjModel.getBarcodeStation().getStationId();
        if (barcodeStationId == null) {
            return;
        }
        StationCommand command = stationThread.getCommand(
                StationCommandType.MOVE,
                commonService.getWorkNo(WrkIoType.ENABLE_IN.id),
                stationId,
                barcodeStationId,
                0
        );
        stationCommandDispatcher.dispatch(basDevp.getDevpNo(), command, STATION_COMMAND_SOURCE, "enable-in");
        Utils.precomputeInTaskEnableRow(barcodeStationId);
        redisUtil.set(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId, "lock", 15);
        // 启动入库时删除退回控制key,允许后续异常时再次生成退回命令
        redisUtil.del(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + barcodeStationId);
        News.info("{}站点启动入库成功,数据包:{}", stationId, JSON.toJSONString(command));
    }
    // 执行单个出库任务对应的输送站点下发
@@ -74,77 +168,35 @@
        stationOutboundDispatchProcessor.crnStationOutExecute(wrkMast);
    }
    /**
     * Executes the conveyor-station outbound tasks for dual-station stacker cranes.
     * Pure delegation to {@code stationOutboundDispatchProcessor}; the method is
     * {@code synchronized} on this instance.
     */
    public synchronized void dualCrnStationOutExecute() {
        stationOutboundDispatchProcessor.dualCrnStationOutExecute();
    }
    //检测输送站点出库任务执行完成
    public synchronized void stationOutExecuteFinish() {
        stationRegularDispatchProcessor.stationOutExecuteFinish();
    /**
     * Dispatches the conveyor-station command for a single dual-station outbound task.
     * Pure delegation to {@code stationOutboundDispatchProcessor}.
     *
     * @param wrkMast work order handed to the processor unchanged
     */
    public void dualCrnStationOutExecute(WrkMast wrkMast) {
        stationOutboundDispatchProcessor.dualCrnStationOutExecute(wrkMast);
    }
    /**
     * Checks whether a single outbound task has arrived at its target station.
     * Pure delegation to {@code stationRegularDispatchProcessor}.
     *
     * @param wrkMast work order handed to the processor unchanged
     */
    public void stationOutExecuteFinish(WrkMast wrkMast) {
        stationRegularDispatchProcessor.stationOutExecuteFinish(wrkMast);
    }
    // 检测入库任务是否到达站台并转为站台运行完成
    public synchronized void scanInboundStationArrival() {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("io_type", 1)
                .eq("wrk_sts", WrkStsType.INBOUND_STATION_RUN.sts)
                .isNotNull("sta_no"));
        for (WrkMast wrkMast : wrkMasts) {
            if (wrkMast == null || wrkMast.getWrkNo() == null || wrkMast.getStaNo() == null) {
                continue;
            }
            BasStation basStation = basStationService.getOne(new QueryWrapper<BasStation>()
                    .eq("station_id", wrkMast.getStaNo())
                    .last("limit 1"));
            if (basStation == null || basStation.getDeviceNo() == null) {
                continue;
            }
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basStation.getDeviceNo());
            if (stationThread == null) {
                continue;
            }
            Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
            StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
            boolean arrived = stationProtocol != null
                    && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
                    && stationProtocol.isLoading();
            if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
                continue;
            }
            boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
            if (updated) {
                News.info("入库站点到达扫描命中,工作号={},目标站={}", wrkMast.getWrkNo(), wrkMast.getStaNo());
            }
        }
    /**
     * Station-scoped variant of the outbound-finish check: hands the given station to
     * {@code stationRegularDispatchProcessor.stationOutExecuteFinish}.
     *
     * @param stationObjModel station description handed to the processor unchanged
     */
    public void stationOutExecuteFinish(StationObjModel stationObjModel) {
        stationRegularDispatchProcessor.stationOutExecuteFinish(stationObjModel);
    }
    // 检测单个入库任务是否到达目标站台
    public void scanInboundStationArrival(WrkMast wrkMast) {
        if (wrkMast == null || wrkMast.getWrkNo() == null || wrkMast.getStaNo() == null) {
    public void scanInboundStationArrival(StationObjModel stationObjModel) {
        if (stationObjModel == null) {
            return;
        }
        BasStation basStation = basStationService.getOne(new QueryWrapper<BasStation>()
                .eq("station_id", wrkMast.getStaNo())
                .last("limit 1"));
        if (basStation == null || basStation.getDeviceNo() == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basStation.getDeviceNo());
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, stationObjModel.getDeviceNo());
        if (stationThread == null) {
            return;
        }
        Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
        StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
        boolean arrived = stationProtocol != null
                && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
                && stationProtocol.isLoading();
        if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
        StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(stationObjModel.getStationId());
        if (stationProtocol == null) {
            return;
        }
        if (stationProtocol.getTaskNo() <= 0) {
            return;
        }
        WrkMast wrkMast = wrkMastService.selectByWorkNo(stationProtocol.getTaskNo());
        if (wrkMast == null) {
            return;
        }
        boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
@@ -153,19 +205,9 @@
        }
    }
    /**
     * Checks for tasks that can be moved to the completed state.
     * Pure delegation to {@code stationRegularDispatchProcessor}; the method is
     * {@code synchronized} on this instance.
     */
    public synchronized void checkTaskToComplete() {
        stationRegularDispatchProcessor.checkTaskToComplete();
    }
    /**
     * Checks whether a single outbound task can be moved to the completed state.
     * Pure delegation to {@code stationRegularDispatchProcessor}.
     *
     * @param wrkMast work order handed to the processor unchanged
     */
    public void checkTaskToComplete(WrkMast wrkMast) {
        stationRegularDispatchProcessor.checkTaskToComplete(wrkMast);
    }
    /**
     * Checks whether conveyor stations are run-blocked.
     * Pure delegation to {@code stationRerouteProcessor}; the method is
     * {@code synchronized} on this instance.
     */
    public synchronized void checkStationRunBlock() {
        stationRerouteProcessor.checkStationRunBlock();
    }
    // 检测单个站点是否运行堵塞
@@ -173,22 +215,12 @@
        stationRerouteProcessor.checkStationRunBlock(basDevp, stationId);
    }
    /**
     * Checks for conveyor-station tasks that have stayed put past the timeout so their
     * route can be recalculated. Pure delegation to {@code stationRerouteProcessor};
     * the method is {@code synchronized} on this instance.
     */
    public synchronized void checkStationIdleRecover() {
        stationRerouteProcessor.checkStationIdleRecover();
    }
    /**
     * Runs the idle-timeout recovery handling for a single station.
     * Pure delegation to {@code stationRerouteProcessor}.
     *
     * @param basDevp   conveyor device handed to the processor unchanged
     * @param stationId station id handed to the processor unchanged
     */
    public void checkStationIdleRecover(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.checkStationIdleRecover(basDevp, stationId);
    }
    /**
     * Returns the number of tasks currently on the conveyor line.
     *
     * @return the value of {@code stationDispatchLoadSupport.countCurrentStationTask()}
     */
    public int getCurrentStationTaskCount() {
        return stationDispatchLoadSupport.countCurrentStationTask();
    }
    public synchronized int getCurrentOutboundTaskCountByTargetStation(Integer stationId) {
    public int getCurrentOutboundTaskCountByTargetStation(Integer stationId) {
        if (stationId == null) {
            return 0;
        }
@@ -201,24 +233,9 @@
                        WrkStsType.STATION_RUN.sts));
    }
    /**
     * Checks outbound ordering.
     * Pure delegation to {@code stationRerouteProcessor}; the method is
     * {@code synchronized} on this instance.
     */
    public synchronized void checkStationOutOrder() {
        stationRerouteProcessor.checkStationOutOrder();
    }
    /**
     * Checks outbound ordering for a single station.
     * Pure delegation to {@code stationRerouteProcessor}.
     *
     * @param basDevp         conveyor device handed to the processor unchanged
     * @param stationObjModel station description handed to the processor unchanged
     */
    public void checkStationOutOrder(BasDevp basDevp, StationObjModel stationObjModel) {
        stationRerouteProcessor.checkStationOutOrder(basDevp, stationObjModel);
    }
    /**
     * Monitors the circling (loop) stations.
     * Pure delegation to {@code stationRerouteProcessor}; the method is
     * {@code synchronized} on this instance.
     */
    public synchronized void watchCircleStation() {
        stationRerouteProcessor.watchCircleStation();
    }
    /**
     * Monitors a single circling (loop) station.
     * Pure delegation to {@code stationRerouteProcessor}.
     *
     * @param basDevp   conveyor device handed to the processor unchanged
     * @param stationId station id handed to the processor unchanged
     */
    public void watchCircleStation(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.watchCircleStation(basDevp, stationId);
    }
    public void submitStationInTasks(long minIntervalMs) {
@@ -245,7 +262,9 @@
                if (stationProtocol == null
                        || !stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0) {
                        || stationProtocol.getTaskNo() <= 0
                        || !stationProtocol.isInEnable()
                ) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
@@ -282,22 +301,45 @@
        }
    }
    /**
     * Submits the dual-station outbound dispatch tasks on the default
     * {@link MainProcessLane#DUAL_STATION_OUT} lane.
     *
     * @param minIntervalMs forwarded unchanged to the lane-aware overload
     */
    public void submitDualCrnStationOutTasks(long minIntervalMs) {
        submitDualCrnStationOutTasks(MainProcessLane.DUAL_STATION_OUT, minIntervalMs);
    }
    public void submitDualCrnStationOutTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
                .isNotNull("dual_crn_no"));
        for (WrkMast wrkMast : wrkMasts) {
            Integer laneKey = wrkMast == null ? null : wrkMast.getSourceStaNo();
            if (laneKey == null) {
                laneKey = wrkMast == null ? null : wrkMast.getWrkNo();
            }
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    laneKey,
                    "dualCrnStationOutExecute",
                    minIntervalMs,
                    () -> dualCrnStationOutExecute(wrkMast)
            );
        }
    }
    /**
     * Submits the outbound-finish check tasks on the default
     * {@link MainProcessLane#STATION_OUT_FINISH} lane.
     *
     * @param minIntervalMs forwarded unchanged to the lane-aware overload
     */
    public void submitStationOutExecuteFinishTasks(long minIntervalMs) {
        submitStationOutExecuteFinishTasks(MainProcessLane.STATION_OUT_FINISH, minIntervalMs);
    }
    public void submitStationOutExecuteFinishTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("wrk_sts", WrkStsType.STATION_RUN.sts)
                .isNotNull("sta_no"));
        for (WrkMast wrkMast : wrkMasts) {
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    wrkMast.getStaNo(),
                    "stationOutExecuteFinish",
                    minIntervalMs,
                    () -> stationOutExecuteFinish(wrkMast)
            );
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            for (StationObjModel stationObjModel : basDevp.getOutStationList$()) {
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationObjModel.getStationId(),
                        "stationOutExecuteFinish",
                        minIntervalMs,
                        () -> stationOutExecuteFinish(stationObjModel)
                );
            }
        }
    }
@@ -306,18 +348,22 @@
    }
    public void submitInboundStationArrivalTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("io_type", 1)
                .eq("wrk_sts", WrkStsType.INBOUND_STATION_RUN.sts)
                .isNotNull("sta_no"));
        for (WrkMast wrkMast : wrkMasts) {
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    wrkMast.getStaNo(),
                    "scanInboundStationArrival",
                    minIntervalMs,
                    () -> scanInboundStationArrival(wrkMast)
            );
        List<BasCrnp> basCrnps = basCrnpService.list(new QueryWrapper<>());
        for (BasCrnp basCrnp : basCrnps) {
            Integer crnNo = basCrnp == null ? null : basCrnp.getCrnNo();
            if (crnNo == null) {
                continue;
            }
            for (StationObjModel stationObjModel : basCrnp.getInStationList$()) {
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationObjModel.getStationId(),
                        "scanInboundStationArrival",
                        minIntervalMs,
                        () -> scanInboundStationArrival(stationObjModel)
                );
            }
        }
    }
@@ -347,9 +393,24 @@
    public void submitCheckStationOutOrderTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationObjModel stationObjModel : basDevp.getOutOrderList$()) {
                Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
                if (stationId == null) {
                    continue;
                }
                Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
                StationProtocol stationProtocol = statusMap.get(stationId);
                if (stationProtocol == null
                        || !stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0
                        || stationProtocol.isRunBlock()
                        || !stationProtocol.getStationId().equals(stationProtocol.getTargetStaNo())) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
@@ -358,33 +419,6 @@
                        "checkStationOutOrder",
                        minIntervalMs,
                        () -> checkStationOutOrder(basDevp, stationObjModel)
                );
            }
        }
    }
    /**
     * Submits the circling-station watch tasks on the default
     * {@link MainProcessLane#STATION_WATCH_CIRCLE} lane.
     *
     * @param minIntervalMs forwarded unchanged to the lane-aware overload
     */
    public void submitWatchCircleStationTasks(long minIntervalMs) {
        submitWatchCircleStationTasks(MainProcessLane.STATION_WATCH_CIRCLE, minIntervalMs);
    }
    public void submitWatchCircleStationTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationProtocol stationProtocol : stationThread.getStatus()) {
                Integer stationId = stationProtocol == null ? null : stationProtocol.getStationId();
                if (stationId == null) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "watchCircleStation",
                        minIntervalMs,
                        () -> watchCircleStation(basDevp, stationId)
                );
            }
        }
@@ -406,39 +440,18 @@
                if (stationId == null) {
                    continue;
                }
                if (!stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0
                        || !stationProtocol.isRunBlock()) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "checkStationRunBlock",
                        minIntervalMs,
                        () -> checkStationRunBlock(basDevp, stationId)
                );
            }
        }
    }
    /**
     * Submits the idle-recover check tasks on the default
     * {@link MainProcessLane#STATION_IDLE_RECOVER} lane.
     *
     * @param minIntervalMs forwarded unchanged to the lane-aware overload
     */
    public void submitCheckStationIdleRecoverTasks(long minIntervalMs) {
        submitCheckStationIdleRecoverTasks(MainProcessLane.STATION_IDLE_RECOVER, minIntervalMs);
    }
    public void submitCheckStationIdleRecoverTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationProtocol stationProtocol : stationThread.getStatus()) {
                Integer stationId = stationProtocol == null ? null : stationProtocol.getStationId();
                if (stationId == null) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "checkStationIdleRecover",
                        minIntervalMs,
                        () -> checkStationIdleRecover(basDevp, stationId)
                );
            }
        }
@@ -464,7 +477,7 @@
        return stationRerouteProcessor.shouldUseRunBlockDirectReassign(wrkMast, stationId, runBlockReassignLocStationList);
    }
    private boolean shouldSkipIdleRecoverForRecentDispatch(Integer taskNo, Integer stationId) {
        return stationRerouteProcessor.shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId);
    /**
     * Asks {@code stationRegularDispatchProcessor} to attempt clearing the path of a task.
     * What "clearing" does is defined by the processor and is not visible here.
     *
     * @param stationThread device thread handed to the processor unchanged
     * @param taskNo        task number handed to the processor unchanged
     */
    public void attemptClearTaskPath(StationThread stationThread, Integer taskNo) {
        stationRegularDispatchProcessor.attemptClearTaskPath(stationThread, taskNo);
    }
}