Junjie
3 天以前 3e793a6d2173889f4d006f2c8174f3eec4992745
src/main/java/com/zy/core/utils/StationOperateProcessUtils.java
@@ -1,19 +1,24 @@
package com.zy.core.utils;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zy.asrs.entity.BasCrnp;
import com.zy.asrs.entity.BasDevp;
import com.zy.asrs.entity.BasStation;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.service.BasDevpService;
import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.WrkAnalysisService;
import com.zy.asrs.service.WrkMastService;
import com.zy.asrs.utils.Utils;
import com.zy.asrs.service.*;
import com.zy.common.service.CommonService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.News;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.dispatch.StationCommandDispatcher;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.StationCommandType;
import com.zy.core.enums.WrkIoType;
import com.zy.core.enums.WrkStsType;
import com.zy.core.model.StationObjModel;
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.task.MainProcessLane;
import com.zy.core.task.MainProcessTaskSubmitter;
@@ -27,11 +32,15 @@
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Component
public class StationOperateProcessUtils {
    private static final String STATION_COMMAND_SOURCE = "station-operate-process";
    @Autowired
    private WrkMastService wrkMastService;
    @Autowired
@@ -52,10 +61,45 @@
    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
    @Autowired
    private StationOutboundDecisionSupport stationOutboundDecisionSupport;
    @Autowired
    private BasCrnpService basCrnpService;
    @Autowired
    private CommonService commonService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private StationCommandDispatcher stationCommandDispatcher;
    //执行输送站点入库任务
    public synchronized void stationInExecute() {
        stationRegularDispatchProcessor.stationInExecute();
    public void submitStationEnableInTasks(long minIntervalMs) {
        submitStationEnableInTasks(MainProcessLane.STATION_ENABLE_IN, minIntervalMs);
    }
    public void submitStationEnableInTasks(MainProcessLane lane,
                                           long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
            if (stationMap == null || stationMap.isEmpty()) {
                continue;
            }
            for (StationObjModel stationObjModel : basDevp.getInStationList$()) {
                Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
                if (stationId == null || !stationMap.containsKey(stationId)) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "stationEnableInExecute",
                        minIntervalMs,
                        () -> stationEnableInExecute(basDevp, stationObjModel)
                );
            }
        }
    }
    // 执行单个站点的入库任务下发
@@ -63,9 +107,62 @@
        stationRegularDispatchProcessor.stationInExecute(basDevp, stationObjModel);
    }
    //执行堆垛机输送站点出库任务
    public synchronized void crnStationOutExecute() {
        stationOutboundDispatchProcessor.crnStationOutExecute();
    // 执行单个站点的启动入库下发
    public void stationEnableInExecute(BasDevp basDevp, StationObjModel stationObjModel) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return;
        }
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (stationMap == null || stationMap.isEmpty()) {
            return;
        }
        Integer stationId = stationObjModel.getStationId();
        if (!stationMap.containsKey(stationId)) {
            return;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return;
        }
        Object lock = redisUtil.get(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId);
        if (lock != null) {
            return;
        }
        if (!stationProtocol.isAutoing()
                || !stationProtocol.isLoading()
                || stationProtocol.getTaskNo() != 0
                || !stationProtocol.isEnableIn()) {
            return;
        }
        Integer barcodeStationId = stationObjModel.getBarcodeStation() == null ? null : stationObjModel.getBarcodeStation().getStationId();
        if (barcodeStationId == null) {
            return;
        }
        StationCommand command = stationThread.getCommand(
                StationCommandType.MOVE,
                commonService.getWorkNo(WrkIoType.ENABLE_IN.id),
                stationId,
                barcodeStationId,
                0
        );
        stationCommandDispatcher.dispatch(basDevp.getDevpNo(), command, STATION_COMMAND_SOURCE, "enable-in");
        Utils.precomputeInTaskEnableRow(barcodeStationId);
        redisUtil.set(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId, "lock", 15);
        // 启动入库时删除退回控制key,允许后续异常时再次生成退回命令
        redisUtil.del(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + barcodeStationId);
        News.info("{}站点启动入库成功,数据包:{}", stationId, JSON.toJSONString(command));
    }
    // 执行单个出库任务对应的输送站点下发
@@ -73,88 +170,56 @@
        stationOutboundDispatchProcessor.crnStationOutExecute(wrkMast);
    }
    //执行双工位堆垛机输送站点出库任务
    public synchronized void dualCrnStationOutExecute() {
        stationOutboundDispatchProcessor.dualCrnStationOutExecute();
    }
    //检测输送站点出库任务执行完成
    public synchronized void stationOutExecuteFinish() {
        stationRegularDispatchProcessor.stationOutExecuteFinish();
    // 执行单个双工位出库任务对应的输送站点下发
    public void dualCrnStationOutExecute(WrkMast wrkMast) {
        stationOutboundDispatchProcessor.dualCrnStationOutExecute(wrkMast);
    }
    // 检测单个出库任务是否到达目标站台
    public void stationOutExecuteFinish(WrkMast wrkMast) {
        stationRegularDispatchProcessor.stationOutExecuteFinish(wrkMast);
    }
    // 检测入库任务是否到达站台并转为站台运行完成
    public synchronized void scanInboundStationArrival() {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("io_type", 1)
                .eq("wrk_sts", WrkStsType.INBOUND_STATION_RUN.sts)
                .isNotNull("sta_no"));
        for (WrkMast wrkMast : wrkMasts) {
            if (wrkMast == null || wrkMast.getWrkNo() == null || wrkMast.getStaNo() == null) {
                continue;
            }
            BasStation basStation = basStationService.getOne(new QueryWrapper<BasStation>()
                    .eq("station_id", wrkMast.getStaNo())
                    .last("limit 1"));
            if (basStation == null || basStation.getDeviceNo() == null) {
                continue;
            }
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basStation.getDeviceNo());
            if (stationThread == null) {
                continue;
            }
            Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
            StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
            boolean arrived = stationProtocol != null
                    && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
                    && stationProtocol.isLoading();
            if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
                continue;
            }
            boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
            if (updated) {
                News.info("入库站点到达扫描命中,工作号={},目标站={}", wrkMast.getWrkNo(), wrkMast.getStaNo());
            }
        }
    public void stationOutExecuteFinish(StationObjModel stationObjModel) {
        stationRegularDispatchProcessor.stationOutExecuteFinish(stationObjModel);
    }
    // 检测单个入库任务是否到达目标站台
    public void scanInboundStationArrival(WrkMast wrkMast) {
        if (wrkMast == null || wrkMast.getWrkNo() == null || wrkMast.getStaNo() == null) {
    public void scanInboundStationArrival(StationObjModel stationObjModel) {
        if (stationObjModel == null) {
            return;
        }
        BasStation basStation = basStationService.getOne(new QueryWrapper<BasStation>()
                .eq("station_id", wrkMast.getStaNo())
                .last("limit 1"));
        if (basStation == null || basStation.getDeviceNo() == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basStation.getDeviceNo());
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, stationObjModel.getDeviceNo());
        if (stationThread == null) {
            return;
        }
        Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
        StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
        boolean arrived = stationProtocol != null
                && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
                && stationProtocol.isLoading();
        if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
        StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(stationObjModel.getStationId());
        if (stationProtocol == null) {
            return;
        }
        if (stationProtocol.getTaskNo() <= 0) {
            return;
        }
        WrkMast wrkMast = wrkMastService.selectByWorkNo(stationProtocol.getTaskNo());
        if (wrkMast == null) {
            return;
        }
        if (!Objects.equals(wrkMast.getStaNo(), stationObjModel.getStationId())) {
            News.info("入库站点到达扫描忽略,工作号={},扫描站点={},任务目标站={},原因=target_mismatch",
                    wrkMast.getWrkNo(), stationObjModel.getStationId(), wrkMast.getStaNo());
            return;
        }
        if (!Objects.equals(wrkMast.getWrkSts(), WrkStsType.INBOUND_STATION_RUN.sts)) {
            News.info("入库站点到达扫描忽略,工作号={},扫描站点={},任务状态={},原因=wrk_sts_mismatch",
                    wrkMast.getWrkNo(), stationObjModel.getStationId(), wrkMast.getWrkSts());
            return;
        }
        News.info("入库站点到达扫描命中,工作号={},扫描站点={},目标站={},站点taskNo={},准备转状态3",
                wrkMast.getWrkNo(), stationObjModel.getStationId(), wrkMast.getStaNo(), stationProtocol.getTaskNo());
        boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
        if (updated) {
            News.info("入库站点到达扫描命中,工作号={},目标站={}", wrkMast.getWrkNo(), wrkMast.getStaNo());
            News.info("入库站点到达扫描完成,工作号={},目标站={},结果=updated_to_3", wrkMast.getWrkNo(), wrkMast.getStaNo());
        }
    }
    // 检测任务转完成
    public void checkTaskToComplete() {
        stationRegularDispatchProcessor.checkTaskToComplete();
        else {
            News.info("入库站点到达扫描结束,工作号={},目标站={},结果=skip_update", wrkMast.getWrkNo(), wrkMast.getStaNo());
        }
    }
    // 检测单个出库任务是否可以转完成
@@ -162,24 +227,9 @@
        stationRegularDispatchProcessor.checkTaskToComplete(wrkMast);
    }
    //检测输送站点是否运行堵塞
    public void checkStationRunBlock() {
        stationRerouteProcessor.checkStationRunBlock();
    }
    // 检测单个站点是否运行堵塞
    public void checkStationRunBlock(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.checkStationRunBlock(basDevp, stationId);
    }
    //检测输送站点任务停留超时后重新计算路径
    public void checkStationIdleRecover() {
        stationRerouteProcessor.checkStationIdleRecover();
    }
    // 检测单个站点任务停留超时后的恢复处理
    public void checkStationIdleRecover(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.checkStationIdleRecover(basDevp, stationId);
    }
    //获取输送线任务数量
@@ -200,24 +250,9 @@
                        WrkStsType.STATION_RUN.sts));
    }
    // 检测出库排序
    public synchronized void checkStationOutOrder() {
        stationRerouteProcessor.checkStationOutOrder();
    }
    // 检测单个站点的出库排序
    public void checkStationOutOrder(BasDevp basDevp, StationObjModel stationObjModel) {
        stationRerouteProcessor.checkStationOutOrder(basDevp, stationObjModel);
    }
    // 监控绕圈站点
    public synchronized void watchCircleStation() {
        stationRerouteProcessor.watchCircleStation();
    }
    // 监控单个绕圈站点
    public void watchCircleStation(BasDevp basDevp, Integer stationId) {
        stationRerouteProcessor.watchCircleStation(basDevp, stationId);
    }
    public void submitStationInTasks(long minIntervalMs) {
@@ -244,7 +279,9 @@
                if (stationProtocol == null
                        || !stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0) {
                        || stationProtocol.getTaskNo() <= 0
                        || !stationProtocol.isInEnable()
                ) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
@@ -265,7 +302,51 @@
    public void submitCrnStationOutTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
                .isNotNull("crn_no"));
                .isNotNull("crn_no")
                .orderByAsc("io_time", "wrk_no"));
        MainProcessLane pendingConfirmLane = resolveStationOutPendingConfirmLane(lane);
        LinkedHashSet<Integer> sourceStationIdSet = new LinkedHashSet<>();
        for (WrkMast wrkMast : wrkMasts) {
            if (wrkMast == null || wrkMast.getWrkNo() == null) {
                continue;
            }
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    pendingConfirmLane,
                    wrkMast.getWrkNo(),
                    "confirmPendingCrnStationOutDispatch",
                    minIntervalMs,
                    () -> stationOutboundDispatchProcessor.confirmPendingCrnStationOutDispatch(wrkMast)
            );
            if (wrkMast.getSourceStaNo() != null) {
                sourceStationIdSet.add(wrkMast.getSourceStaNo());
            }
        }
        for (Integer sourceStationId : sourceStationIdSet) {
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    sourceStationId,
                    "dispatchNextCrnStationOutTask",
                    minIntervalMs,
                    () -> stationOutboundDispatchProcessor.dispatchNextCrnStationOutTask(sourceStationId)
            );
        }
    }
    private MainProcessLane resolveStationOutPendingConfirmLane(MainProcessLane lane) {
        if (lane == MainProcessLane.FAKE_STATION_OUT) {
            return MainProcessLane.FAKE_STATION_OUT_PENDING_CONFIRM;
        }
        return MainProcessLane.STATION_OUT_PENDING_CONFIRM;
    }
    public void submitDualCrnStationOutTasks(long minIntervalMs) {
        submitDualCrnStationOutTasks(MainProcessLane.DUAL_STATION_OUT, minIntervalMs);
    }
    public void submitDualCrnStationOutTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("wrk_sts", WrkStsType.OUTBOUND_RUN_COMPLETE.sts)
                .isNotNull("dual_crn_no"));
        for (WrkMast wrkMast : wrkMasts) {
            Integer laneKey = wrkMast == null ? null : wrkMast.getSourceStaNo();
            if (laneKey == null) {
@@ -274,9 +355,9 @@
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    laneKey,
                    "crnStationOutExecute",
                    "dualCrnStationOutExecute",
                    minIntervalMs,
                    () -> crnStationOutExecute(wrkMast)
                    () -> dualCrnStationOutExecute(wrkMast)
            );
        }
    }
@@ -286,17 +367,17 @@
    }
    public void submitStationOutExecuteFinishTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("wrk_sts", WrkStsType.STATION_RUN.sts)
                .isNotNull("sta_no"));
        for (WrkMast wrkMast : wrkMasts) {
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    wrkMast.getStaNo(),
                    "stationOutExecuteFinish",
                    minIntervalMs,
                    () -> stationOutExecuteFinish(wrkMast)
            );
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            for (StationObjModel stationObjModel : basDevp.getOutStationList$()) {
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationObjModel.getStationId(),
                        "stationOutExecuteFinish",
                        minIntervalMs,
                        () -> stationOutExecuteFinish(stationObjModel)
                );
            }
        }
    }
@@ -305,18 +386,22 @@
    }
    public void submitInboundStationArrivalTasks(MainProcessLane lane, long minIntervalMs) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("io_type", 1)
                .eq("wrk_sts", WrkStsType.INBOUND_STATION_RUN.sts)
                .isNotNull("sta_no"));
        for (WrkMast wrkMast : wrkMasts) {
            mainProcessTaskSubmitter.submitKeyedSerialTask(
                    lane,
                    wrkMast.getStaNo(),
                    "scanInboundStationArrival",
                    minIntervalMs,
                    () -> scanInboundStationArrival(wrkMast)
            );
        List<BasCrnp> basCrnps = basCrnpService.list(new QueryWrapper<>());
        for (BasCrnp basCrnp : basCrnps) {
            Integer crnNo = basCrnp == null ? null : basCrnp.getCrnNo();
            if (crnNo == null) {
                continue;
            }
            for (StationObjModel stationObjModel : basCrnp.getInStationList$()) {
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationObjModel.getStationId(),
                        "scanInboundStationArrival",
                        minIntervalMs,
                        () -> scanInboundStationArrival(stationObjModel)
                );
            }
        }
    }
@@ -377,40 +462,6 @@
        }
    }
    public void submitWatchCircleStationTasks(long minIntervalMs) {
        submitWatchCircleStationTasks(MainProcessLane.STATION_WATCH_CIRCLE, minIntervalMs);
    }
    public void submitWatchCircleStationTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationProtocol stationProtocol : stationThread.getStatus()) {
                Integer stationId = stationProtocol == null ? null : stationProtocol.getStationId();
                if (stationId == null) {
                    continue;
                }
                if (!stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0
                        || !stationOutboundDecisionSupport.isWatchingCircleArrival(stationProtocol.getTaskNo(), stationProtocol.getStationId())) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "watchCircleStation",
                        minIntervalMs,
                        () -> watchCircleStation(basDevp, stationId)
                );
            }
        }
    }
    public void submitCheckStationRunBlockTasks(long minIntervalMs) {
        submitCheckStationRunBlockTasks(MainProcessLane.STATION_RUN_BLOCK, minIntervalMs);
    }
@@ -444,39 +495,6 @@
        }
    }
    public void submitCheckStationIdleRecoverTasks(long minIntervalMs) {
        submitCheckStationIdleRecoverTasks(MainProcessLane.STATION_IDLE_RECOVER, minIntervalMs);
    }
    public void submitCheckStationIdleRecoverTasks(MainProcessLane lane, long minIntervalMs) {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
            if (stationThread == null) {
                continue;
            }
            for (StationProtocol stationProtocol : stationThread.getStatus()) {
                Integer stationId = stationProtocol == null ? null : stationProtocol.getStationId();
                if (stationId == null) {
                    continue;
                }
                if (!stationProtocol.isAutoing()
                        || !stationProtocol.isLoading()
                        || stationProtocol.getTaskNo() <= 0
                        || stationProtocol.isRunBlock()) {
                    continue;
                }
                mainProcessTaskSubmitter.submitKeyedSerialTask(
                        lane,
                        stationId,
                        "checkStationIdleRecover",
                        minIntervalMs,
                        () -> checkStationIdleRecover(basDevp, stationId)
                );
            }
        }
    }
    RerouteCommandPlan buildRerouteCommandPlan(RerouteContext context,
                                               RerouteDecision decision) {
        return stationRerouteProcessor.buildRerouteCommandPlan(context, decision);
@@ -497,7 +515,7 @@
        return stationRerouteProcessor.shouldUseRunBlockDirectReassign(wrkMast, stationId, runBlockReassignLocStationList);
    }
    private boolean shouldSkipIdleRecoverForRecentDispatch(Integer taskNo, Integer stationId) {
        return stationRerouteProcessor.shouldSkipIdleRecoverForRecentDispatch(taskNo, stationId);
    public void attemptClearTaskPath(StationThread stationThread, Integer taskNo) {
        stationRegularDispatchProcessor.attemptClearTaskPath(stationThread, taskNo);
    }
}