lsh
2026-04-21 720e0926fa1c94b952c26e111206c5d6e1ed5ba2
src/main/java/com/zy/asrs/task/WrkMastScheduler.java
@@ -1,21 +1,12 @@
package com.zy.asrs.task;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zy.asrs.domain.enums.NotifyMsgType;
import com.zy.asrs.entity.BasStation;
import com.zy.asrs.entity.LocMast;
import com.zy.asrs.entity.WrkMastLog;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.service.*;
import com.zy.asrs.utils.NotifyUtils;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.enums.LocStsType;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.WrkIoType;
import com.zy.asrs.service.LocMastService;
import com.zy.asrs.service.WrkMastService;
import com.zy.core.enums.WrkStsType;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.StationOperateProcessUtils;
import com.zy.core.task.MainProcessTaskSubmitter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -24,301 +15,61 @@
import java.util.Date;
import java.util.List;
@Component
@Slf4j
public class WrkMastScheduler {
    private static final long MIN_SUBMIT_INTERVAL_MS = 0L;
    private static final String WRK_MAST_FINALIZE_LANE_PREFIX = "wrk-mast-finalize-";
    private final WrkMastService wrkMastService;
    private final WrkMastLogService wrkMastLogService;
    private final WrkAnalysisService wrkAnalysisService;
    private final LocMastService locMastService;
    private final NotifyUtils notifyUtils;
    private final StationOperateProcessUtils stationOperateProcessUtils;
    private final BasStationService basStationService;
    private final MainProcessTaskSubmitter mainProcessTaskSubmitter;
    private final WrkMastFinalizeProcessor wrkMastFinalizeProcessor;
    public WrkMastScheduler(WrkMastService wrkMastService,
                            WrkMastLogService wrkMastLogService,
                            WrkAnalysisService wrkAnalysisService,
                            LocMastService locMastService,
                            NotifyUtils notifyUtils,
                            StationOperateProcessUtils stationOperateProcessUtils,
                            BasStationService basStationService
    ) {
                            MainProcessTaskSubmitter mainProcessTaskSubmitter,
                            WrkMastFinalizeProcessor wrkMastFinalizeProcessor) {
        this.wrkMastService = wrkMastService;
        this.wrkMastLogService = wrkMastLogService;
        this.wrkAnalysisService = wrkAnalysisService;
        this.locMastService = locMastService;
        this.notifyUtils = notifyUtils;
        this.stationOperateProcessUtils = stationOperateProcessUtils;
        this.basStationService = basStationService;
        this.mainProcessTaskSubmitter = mainProcessTaskSubmitter;
        this.wrkMastFinalizeProcessor = wrkMastFinalizeProcessor;
    }
    @Scheduled(cron = "0/1 * * * * ? ")
    @Transactional
    public void executeIn(){
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_INBOUND.sts));
        if (wrkMasts.isEmpty()) {
            return;
        }
        for (WrkMast wrkMast : wrkMasts) {
            Integer taskNo = wrkMast.getWrkNo();
            Integer sourceStaNo = wrkMast.getSourceStaNo();
            Integer staNo = wrkMast.getStaNo();
            String locNo = wrkMast.getLocNo();
            LocMast locMast = locMastService.queryByLoc(locNo);
            if (locMast == null) {
                log.info("[workNo={}]库位不存在", wrkMast.getWrkNo());
                continue;
            }
            if (!locMast.getLocSts().equals("S")) {
                log.info("[workNo={}]库位状态不处于S", wrkMast.getWrkNo());
                continue;
            }
            locMast.setLocSts("F");
            locMast.setBarcode(wrkMast.getBarcode());
            locMast.setModiTime(new Date());
            boolean result = locMastService.updateById(locMast);
            if (!result) {
                log.info("[workNo={}]库位状态F更新失败", wrkMast.getWrkNo());
                continue;
            }
            // 保存工作主档历史档
            WrkMastLog wrkMastLog = wrkMastLogService.saveRecord(wrkMast.getWrkNo());
            if (wrkMastLog == null) {
                log.info("保存工作历史档[workNo={}]失败", wrkMast.getWrkNo());
            } else {
                wrkAnalysisService.finishTask(wrkMast, resolveFinishTime(wrkMast), wrkMastLog.getId());
            }
            // 删除工作主档
            if (!wrkMastService.removeById(wrkMast.getWrkNo())) {
                log.info("删除工作主档[workNo={}]失败", wrkMast.getWrkNo());
            }
            //上报
            notifyUtils.notify("task", 1, String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(), NotifyMsgType.TASK_COMPLETE, JSON.toJSONString(wrkMast));
            //清理路径
            List<BasStation> basStations = basStationService.list(new QueryWrapper<BasStation>().in("station_id", sourceStaNo, staNo));
            if (!basStations.isEmpty()) {
                for (BasStation basStation : basStations) {
                    StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basStation.getDeviceNo());
                    if (stationThread != null) {
                        stationOperateProcessUtils.attemptClearTaskPath(stationThread, taskNo);
                    }
                }
            }
        }
    public void executeIn() {
        submitByWrkNo(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_INBOUND.sts),
                "executeIn",
                wrkMastFinalizeProcessor::processCompleteInbound);
    }
    @Scheduled(cron = "0/1 * * * * ? ")
    @Transactional
    public void executeOut(){
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_OUTBOUND.sts));
        if (wrkMasts.isEmpty()) {
            return;
        }
        for (WrkMast wrkMast : wrkMasts) {
            Integer taskNo = wrkMast.getWrkNo();
            Integer sourceStaNo = wrkMast.getSourceStaNo();
            Integer staNo = wrkMast.getStaNo();
            String locNo = wrkMast.getSourceLocNo();
            LocMast locMast = locMastService.queryByLoc(locNo);
            if (locMast == null) {
                log.info("[workNo={}]库位不存在", wrkMast.getWrkNo());
                continue;
            }
            if (!(locMast.getLocSts().equals("R") || locMast.getLocSts().equals("O"))) {
                log.info("[workNo={}]库位状态不处于R or O", wrkMast.getWrkNo());
                continue;
            }
            locMast.setLocSts("O");
            locMast.setBarcode("");
            locMast.setModiTime(new Date());
            boolean result = locMastService.updateById(locMast);
            if (!result) {
                log.info("[workNo={}]库位状态O更新失败", wrkMast.getWrkNo());
                continue;
            }
            // 保存工作主档历史档
            WrkMastLog wrkMastLog = wrkMastLogService.saveRecord(wrkMast.getWrkNo());
            if (wrkMastLog == null) {
                log.info("保存工作历史档[workNo={}]失败", wrkMast.getWrkNo());
            } else {
                wrkAnalysisService.finishTask(wrkMast, resolveFinishTime(wrkMast), wrkMastLog.getId());
            }
            // 删除工作主档
            if (!wrkMastService.removeById(wrkMast.getWrkNo())) {
                log.info("删除工作主档[workNo={}]失败", wrkMast.getWrkNo());
            }
            //上报
            notifyUtils.notify("task", 1, String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(), NotifyMsgType.TASK_COMPLETE, JSON.toJSONString(wrkMast));
            //清理路径
            List<BasStation> basStations = basStationService.list(new QueryWrapper<BasStation>().in("station_id", sourceStaNo, staNo));
            if (!basStations.isEmpty()) {
                for (BasStation basStation : basStations) {
                    StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basStation.getDeviceNo());
                    if (stationThread != null) {
                        stationOperateProcessUtils.attemptClearTaskPath(stationThread, taskNo);
                    }
                }
            }
        }
    public void executeOut() {
        submitByWrkNo(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_OUTBOUND.sts),
                "executeOut",
                wrkMastFinalizeProcessor::processCompleteOutbound);
    }
    @Scheduled(cron = "0/1 * * * * ? ")
    @Transactional
    public void executeLocMove(){
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_LOC_MOVE.sts));
        if (wrkMasts.isEmpty()) {
            return;
        }
        for (WrkMast wrkMast : wrkMasts) {
            String sourceLocNo = wrkMast.getSourceLocNo();
            String locNo = wrkMast.getLocNo();
            LocMast locMast = locMastService.queryByLoc(locNo);
            if (locMast == null) {
                log.info("[workNo={}]库位不存在", wrkMast.getWrkNo());
                continue;
            }
            if (!locMast.getLocSts().equals("S")) {
                log.info("[workNo={}]库位状态不处于S", wrkMast.getWrkNo());
                continue;
            }
            LocMast sourceLocMast = locMastService.queryByLoc(sourceLocNo);
            if (sourceLocMast == null) {
                log.info("[workNo={}]库位不存在", wrkMast.getWrkNo());
                continue;
            }
            if (!sourceLocMast.getLocSts().equals("R")) {
                log.info("[workNo={}]库位状态不处于R", wrkMast.getWrkNo());
                continue;
            }
            locMast.setLocSts("F");
            locMast.setBarcode(wrkMast.getBarcode());
            locMast.setModiTime(new Date());
            boolean result = locMastService.updateById(locMast);
            if (!result) {
                log.info("[workNo={}]库位状态F更新失败", wrkMast.getWrkNo());
                continue;
            }
            sourceLocMast.setLocSts("O");
            sourceLocMast.setBarcode("");
            sourceLocMast.setModiTime(new Date());
            boolean result2 = locMastService.updateById(sourceLocMast);
            if (!result2) {
                log.info("[workNo={}]库位状态O更新失败", wrkMast.getWrkNo());
                continue;
            }
            // 保存工作主档历史档
            WrkMastLog wrkMastLog = wrkMastLogService.saveRecord(wrkMast.getWrkNo());
            if (wrkMastLog == null) {
                log.info("保存工作历史档[workNo={}]失败", wrkMast.getWrkNo());
            } else {
                wrkAnalysisService.finishTask(wrkMast, resolveFinishTime(wrkMast), wrkMastLog.getId());
            }
            // 删除工作主档
            if (!wrkMastService.removeById(wrkMast.getWrkNo())) {
                log.info("删除工作主档[workNo={}]失败", wrkMast.getWrkNo());
            }
            //上报
            notifyUtils.notify("task", 1, String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(), NotifyMsgType.TASK_COMPLETE, JSON.toJSONString(wrkMast));
        }
    public void executeLocMove() {
        submitByWrkNo(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_LOC_MOVE.sts),
                "executeLocMove",
                wrkMastFinalizeProcessor::processCompleteLocMove);
    }
    @Scheduled(cron = "0/1 * * * * ? ")
    @Transactional
    public void executeCrnMove(){
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_CRN_MOVE.sts));
        if (wrkMasts.isEmpty()) {
            return;
        }
        for (WrkMast wrkMast : wrkMasts) {
            WrkMastLog wrkMastLog = wrkMastLogService.saveRecord(wrkMast.getWrkNo());
            if (wrkMastLog == null) {
                log.info("保存工作历史档[workNo={}]失败", wrkMast.getWrkNo());
            } else {
                wrkAnalysisService.finishTask(wrkMast, resolveFinishTime(wrkMast), wrkMastLog.getId());
            }
            if (!wrkMastService.removeById(wrkMast.getWrkNo())) {
                log.info("删除工作主档[workNo={}]失败", wrkMast.getWrkNo());
            }
        }
    public void executeCrnMove() {
        submitByWrkNo(new QueryWrapper<WrkMast>().eq("wrk_sts", WrkStsType.COMPLETE_CRN_MOVE.sts),
                "executeCrnMove",
                wrkMastFinalizeProcessor::processCompleteCrnMove);
    }
    @Scheduled(cron = "0/1 * * * * ? ")
    @Transactional
    public void executeCancelTask(){
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .in("mk", "taskCancel", "taskForceCancel"));
        if (wrkMasts.isEmpty()) {
            return;
        }
        for (WrkMast wrkMast : wrkMasts) {
            // 保存工作主档历史档
            WrkMastLog wrkMastLog = wrkMastLogService.saveRecord(wrkMast.getWrkNo());
            if (wrkMastLog == null) {
                log.info("保存工作历史档[workNo={}]失败", wrkMast.getWrkNo());
            } else {
                wrkAnalysisService.finishTask(wrkMast, resolveFinishTime(wrkMast), wrkMastLog.getId());
            }
            // 删除工作主档
            if (!wrkMastService.removeById(wrkMast.getWrkNo())) {
                log.info("删除工作主档[workNo={}]失败", wrkMast.getWrkNo());
            }
            if (wrkMast.getIoType() == WrkIoType.IN.id) {
                LocMast locMast = locMastService.queryByLoc(wrkMast.getLocNo());
                locMast.setLocSts(String.valueOf(LocStsType.O));
                locMast.setModiTime(new Date());
                locMastService.updateById(locMast);
            } else if (wrkMast.getIoType() == WrkIoType.OUT.id) {
                LocMast locMast = locMastService.queryByLoc(wrkMast.getSourceLocNo());
                locMast.setLocSts(String.valueOf(LocStsType.F));
                locMast.setModiTime(new Date());
                locMastService.updateById(locMast);
            } else if (wrkMast.getIoType() == WrkIoType.LOC_MOVE.id) {
                LocMast sourceLocMast = locMastService.queryByLoc(wrkMast.getSourceLocNo());
                LocMast locMast = locMastService.queryByLoc(wrkMast.getLocNo());
                if (sourceLocMast.getLocSts().equals(String.valueOf(LocStsType.R))) {
                    sourceLocMast.setLocSts(String.valueOf(LocStsType.F));
                    sourceLocMast.setModiTime(new Date());
                    locMastService.updateById(sourceLocMast);
                }
                if (locMast.getLocSts().equals(String.valueOf(LocStsType.S))) {
                    locMast.setLocSts(String.valueOf(LocStsType.O));
                    locMast.setModiTime(new Date());
                    locMastService.updateById(locMast);
                }
            }
            //上报
            notifyUtils.notify("task", 1, String.valueOf(wrkMast.getWrkNo()), wrkMast.getWmsWrkNo(), NotifyMsgType.TASK_CANCEL, JSON.toJSONString(wrkMast));
        }
    public void executeCancelTask() {
        submitByWrkNo(new QueryWrapper<WrkMast>().in("mk", "taskCancel", "taskForceCancel"),
                "executeCancelTask",
                wrkMastFinalizeProcessor::processCancelTask);
    }
    @Scheduled(cron = "0/1 * * * * ? ")
@@ -352,22 +103,37 @@
            boolean result = locMastService.updateById(locMast);
            if (!result) {
                log.info("[workNo={}]库位状态O更新失败", wrkMast.getWrkNo());
                continue;
            }
        }
    }
    private Date resolveFinishTime(WrkMast wrkMast) {
        if (wrkMast == null) {
            return new Date();
    private void submitByWrkNo(QueryWrapper<WrkMast> queryWrapper, String taskNamePrefix, WrkNoHandler handler) {
        List<WrkMast> wrkMasts = wrkMastService.list(queryWrapper);
        if (wrkMasts.isEmpty()) {
            return;
        }
        if (wrkMast.getModiTime() != null) {
            return wrkMast.getModiTime();
        for (WrkMast wrkMast : wrkMasts) {
            if (wrkMast == null || wrkMast.getWrkNo() == null || wrkMast.getWrkNo() <= 0) {
                log.error("WrkMastScheduler提交任务跳过,工作档为空或工作号非法。taskNamePrefix={}, wrkMast={}", taskNamePrefix, wrkMast);
                continue;
            }
            Integer wrkNo = wrkMast.getWrkNo();
            boolean submitted = mainProcessTaskSubmitter.submitKeyedSerialTask(
                    WRK_MAST_FINALIZE_LANE_PREFIX,
                    wrkNo,
                    taskNamePrefix + "-" + wrkNo,
                    MIN_SUBMIT_INTERVAL_MS,
                    () -> handler.handle(wrkNo)
            );
            if (!submitted) {
                log.error("WrkMastScheduler提交单任务处理失败。taskNamePrefix={}, wrkNo={}, lanePrefix={}",
                        taskNamePrefix, wrkNo, WRK_MAST_FINALIZE_LANE_PREFIX);
            }
        }
        if (wrkMast.getIoTime() != null) {
            return wrkMast.getIoTime();
        }
        return new Date();
    }
    @FunctionalInterface
    private interface WrkNoHandler {
        void handle(Integer wrkNo);
    }
}