Junjie
2026-04-15 fa2d5c5b4fc5cda69fb9a8534dc8dedcda479a76
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -6,18 +6,26 @@
import com.core.common.Cools;
import com.zy.asrs.domain.param.CreateInTaskParam;
import com.zy.asrs.entity.BasDevp;
import com.zy.asrs.entity.WrkLastno;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.service.BasDevpService;
import com.zy.asrs.service.WrkLastnoService;
import com.zy.asrs.service.WrkMastService;
import com.zy.common.model.StartupDto;
import com.zy.common.service.CommonService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.News;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.dispatch.StationCommandDispatchResult;
import com.zy.core.dispatch.StationCommandDispatcher;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.StationCommandType;
import com.zy.core.enums.WrkIoType;
import com.zy.core.model.StationObjModel;
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.task.MainProcessLane;
import com.zy.core.task.MainProcessTaskSubmitter;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.StationOperateProcessUtils;
import com.zy.core.utils.WmsOperateUtils;
@@ -30,9 +38,9 @@
/**
 * Spring service that generates store-in work tasks for conveyor barcode stations.
 *
 * <p>Within the visible code it asks WMS for a store-in task, creates the work task through
 * {@code commonService.createInTask}, and dispatches "station back" MOVE commands when a station
 * reports a barcode error or when the WMS request fails.
 */
@Service
public class StoreInTaskGenerationService {
    // Timeout value (seconds) printed in the log line emitted before the synchronous WMS request.
    // NOTE(review): only the log message uses it in the visible code - confirm where the timeout is enforced.
    private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
    // Third argument of redisUtil.set for the station-back lock written after a failed WMS apply
    // (presumably the lock expiry in seconds - confirm against RedisUtil).
    private static final int APPLY_FAIL_STATION_BACK_LOCK_SECONDS = 30;
    @Autowired
    private BasDevpService basDevpService;
    @Autowired
    private WrkMastService wrkMastService;
    @Autowired
@@ -43,148 +51,424 @@
    // Sends the store-in application to WMS (applyInTask).
    private WmsOperateUtils wmsOperateUtils;
    @Autowired
    // Allocates work numbers (getWorkNo) and creates the store-in work task (createInTask).
    private CommonService commonService;
    @Autowired
    // Submits the generation task as a keyed serial task on a main-process lane.
    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
    @Autowired
    // Dispatches station commands (used here for the "station back" MOVE command).
    private StationCommandDispatcher stationCommandDispatcher;
    @Autowired
    // Looked up by WrkIoType.STATION_BACK.id to obtain the station-back work-number range.
    private WrkLastnoService wrkLastnoService;
    public void generate(StoreInTaskPolicy policy) {
    /**
     * Keeps the existing per-station lane concurrency while using a simple counter so that
     * concurrent generation cannot push the station task count past the configured limit.
     *
     * <p>Incremented in {@code tryReserveGenerateCapacity} and decremented in
     * {@code releaseGenerateCapacity}; both methods are {@code synchronized}.
     */
    private int inFlightGenerateCount = 0;
    /**
     * Runs one store-in generation pass for a single barcode station.
     *
     * <p>NOTE(review): this span appears to interleave statements from two revisions of the method
     * ({@code systemConfigMap} is declared twice, and the loops redeclare the {@code basDevp} and
     * {@code stationObjModel} parameters), so it does not compile as shown. The notes below describe
     * only what the visible statements do.
     *
     * <p>Statements that match this signature: return when the policy is disabled; build a context
     * with {@code buildContext} and stop unless {@code handleErrorStationBack} returns true; load the
     * system config map; stop when {@code hasAvailableStationTaskCapacity} is false; otherwise
     * delegate to {@code generateByStation}. Any exception is caught and logged via {@code News.error}.
     *
     * @param policy          policy that enables/disables generation and supplies callbacks
     * @param basDevp         conveyor device the station belongs to
     * @param stationObjModel barcode station to process
     */
    public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) {
        try {
            if (!policy.isEnabled()) {
                return;
            }
            // NOTE(review): inline config/limit lookup; the same logic also exists in
            // getSystemConfigMap and hasAvailableStationTaskCapacity further down the file.
            Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
            if (systemConfigMapObj == null) {
                return;
            }
            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
            int conveyorStationTaskLimit = 30;
            String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
            if (conveyorStationTaskLimitStr != null) {
                conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
            }
            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
            if (currentStationTaskCount > conveyorStationTaskLimit) {
                News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount);
            // Barcode-error handling runs first; a false result ends this pass.
            StoreInTaskContext earlyContext = buildContext(basDevp, stationObjModel);
            if (earlyContext == null || !handleErrorStationBack(policy, earlyContext)) {
                return;
            }
            // NOTE(review): the loop below scans every device and barcode station and uses the async
            // WMS calls; it looks like the earlier all-stations implementation - confirm.
            List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
            for (BasDevp basDevp : basDevps) {
                StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
                if (stationThread == null) {
                    continue;
                }
                Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
                List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp);
                for (StationObjModel stationObjModel : barcodeStations) {
                    Integer stationId = stationObjModel.getStationId();
                    if (!stationMap.containsKey(stationId)) {
                        continue;
                    }
                    StationProtocol stationProtocol = stationMap.get(stationId);
                    if (stationProtocol == null) {
                        continue;
                    }
                    StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
                            stationProtocol);
                    if (!policy.matchCandidate(context)) {
                        continue;
                    }
                    if (!policy.beforeApply(context)) {
                        continue;
                    }
                    List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                            .eq("barcode", stationProtocol.getBarcode()));
                    if (!wrkMasts.isEmpty()) {
                        continue;
                    }
                    String generateLockKey = policy.getGenerateLockKey(context);
                    Object lock = redisUtil.get(generateLockKey);
                    if (lock != null) {
                        continue;
                    }
                    policy.onRequestPermitGranted(context);
                    InTaskApplyRequest request = policy.buildApplyRequest(context);
                    AsyncInTaskResult result = wmsOperateUtils.queryAsyncInTaskResponse(request);
                    if (result != null) {
                        handleApplyResult(policy, context, request, result);
                        continue;
                    }
                    if (wmsOperateUtils.isAsyncRequestInProgress(request)) {
                        continue;
                    }
                    News.info("发起异步WMS入库请求,barcode={},stationId={}", request.getBarcode(),
                            request.getSourceStaNo());
                    wmsOperateUtils.applyInTaskAsync(request);
                    redisUtil.set(generateLockKey, "lock", policy.getSubmitLockSeconds(context));
//                    policy.onApplySubmitted(context);
                }
            // Per-station path: load config, check the station-task limit, then process this station.
            HashMap<String, String> systemConfigMap = getSystemConfigMap();
            if (systemConfigMap == null) {
                return;
            }
            if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
                return;
            }
            generateByStation(policy, basDevp, stationObjModel, systemConfigMap);
        } catch (Exception e) {
            News.error("生成入库任务异常,policy={}", policy.getPolicyName(), e);
            // stationObjModel may be null, so the station id is resolved defensively for the log.
            Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
            News.error("生成入库任务异常,policy={},stationId={}", policy.getPolicyName(), stationId, e);
        }
    }
    /**
     * Routes an asynchronous WMS store-in result.
     *
     * <p>A successful result goes to {@code handleApplySuccess}. A retryable failure is logged, the
     * cached async response is cleared, the request is re-submitted asynchronously, the generate lock
     * is written with the policy's retry lock seconds, and {@code policy.onApplyFailed} is invoked.
     * Any other result only invokes {@code policy.onApplyFailed}.
     *
     * <p>NOTE(review): the closing brace of this method is not present in this view, and
     * {@code handleApplySuccess} has no body here - this looks like a leftover of an earlier revision.
     */
    private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                   AsyncInTaskResult result) {
        if (result.isSuccess()) {
            handleApplySuccess(policy, context, request, result);
            return;
        }
        if (result.isRetryableFailure()) {
            News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}",
                    request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
            // Drop the stored response before re-applying, then lock the station for the retry interval.
            wmsOperateUtils.clearAsyncInTaskResponse(request);
            wmsOperateUtils.applyInTaskAsync(request);
            redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
            policy.onApplyFailed(context, result);
            return;
        }
        policy.onApplyFailed(context, result);
    public void submitGenerateStoreTask(StoreInTaskPolicy policy,
                                        BasDevp basDevp,
                                        StationObjModel stationObjModel,
                                        long minIntervalMs,
                                        Runnable task) {
        submitGenerateStoreTask(policy, basDevp, stationObjModel, MainProcessLane.GENERATE_STORE, minIntervalMs, task);
    }
    private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                    AsyncInTaskResult result) {
    public void submitGenerateStoreTask(StoreInTaskPolicy policy,
                                        BasDevp basDevp,
                                        StationObjModel stationObjModel,
                                        MainProcessLane lane,
                                        long minIntervalMs,
                                        Runnable task) {
        Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
        mainProcessTaskSubmitter.submitKeyedSerialTask(
                lane,
                stationId,
                "generateStoreWrkFile",
                minIntervalMs,
                task
        );
    }
    private void generateByStation(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
                                   HashMap<String, String> systemConfigMap) {
        StoreInTaskContext context = buildContext(basDevp, stationObjModel);
        if (context == null) {
            return;
        }
        StationProtocol stationProtocol = context.getStationProtocol();
        if (stationProtocol == null) {
            return;
        }
        if (!stationProtocol.isAutoing()) {
            return;
        }
        if (!stationProtocol.isLoading()) {
            return;
        }
        if (!stationProtocol.isInEnable()) {
            return;
        }
        if (stationProtocol.getTaskNo() == 0) {
            return;
        }
        if (Cools.isEmpty(stationProtocol.getBarcode())) {
            return;
        }
        if (stationProtocol.getError() > 0) {
            return;
        }
        if (stationProtocol.isInBarcodeError()) {
            return;
        }
        if (!stationProtocol.getIoMode().equals(1)) {
            policy.setSystemWarning(context, "当前站点不处于入库模式");
            return;
        }
        String barcode = context.getStationProtocol().getBarcode();
        long count = wrkMastService.count(new QueryWrapper<WrkMast>().eq("barcode", barcode));
        if (count > 0) {
            Object tipsLimit = redisUtil.get(RedisKeyType.GENERATE_IN_TASK_SUCCESS_REPEAT_WARNING_TIPS_LIMIT.key + barcode);
            if (tipsLimit == null) {
                policy.setSystemWarning(context, "系统任务已存在");
            }
            return;
        }
        if (redisUtil.get(policy.getGenerateLockKey(context)) != null) {
            return;
        }
        if (!tryReserveGenerateCapacity(systemConfigMap)) {
            return;
        }
        InTaskApplyRequest request = policy.buildApplyRequest(context);
        try {
            JSONObject jsonObject = JSON.parseObject(result.getResponse());
            policy.onRequestPermitGranted(context);
            policy.setSystemWarning(context, "请求WMS中");
            News.info("发起同步WMS入库请求,barcode={},stationId={},timeout={}s",
                    request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS);
            String response = wmsOperateUtils.applyInTask(request);
            handleSyncApplyResponse(policy, context, request, response);
        } finally {
            releaseGenerateCapacity();
        }
    }
    private boolean handleErrorStationBack(StoreInTaskPolicy policy, StoreInTaskContext context) {
        StationProtocol stationProtocol = context.getStationProtocol();
        if (stationProtocol == null) {
            return false;
        }
        if (!stationProtocol.isAutoing()) {
            return false;
        }
        if (!stationProtocol.isLoading()) {
            return false;
        }
        if (stationProtocol.getError() <= 0) {
            return true;
        }
        if (!stationProtocol.isInBarcodeError()) {
            return true;
        }
        WrkLastno stationBackTaskRange = wrkLastnoService.getById(WrkIoType.STATION_BACK.id);
        Integer currentTaskNo = stationProtocol.getTaskNo();
        if (currentTaskNo != null
                && currentTaskNo > 0
                && stationBackTaskRange != null
                && stationBackTaskRange.getsNo() != null
                && stationBackTaskRange.geteNo() != null
                && currentTaskNo >= stationBackTaskRange.getsNo()
                && currentTaskNo <= stationBackTaskRange.geteNo()) {
            News.info("条码站已处于退回工作号范围,跳过重复生成退回命令。stationId={},taskNo={},range=[{}, {}]",
                    stationProtocol.getStationId(),
                    currentTaskNo,
                    stationBackTaskRange.getsNo(),
                    stationBackTaskRange.geteNo());
            return false;
        }
        StationObjModel backStation = context.getStationObjModel().getBackStation();
        if (backStation == null || backStation.getStationId() == null) {
            News.warn("条码站退回失败,退回站未配置。deviceNo={},stationId={}",
                    context.getBasDevp() == null ? null : context.getBasDevp().getDevpNo(),
                    stationProtocol.getStationId());
            return false;
        }
        if (stationProtocol.getTaskNo() != null
                && stationProtocol.getTaskNo() > 0
                && backStation.getStationId().equals(stationProtocol.getTargetStaNo())) {
            return false;
        }
        Object lock = redisUtil.get(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + stationProtocol.getStationId());
        if (lock != null) {
            return false;
        }
        StationCommand command = context.getStationThread().getCommand(StationCommandType.MOVE,
                commonService.getWorkNo(WrkIoType.STATION_BACK.id),
                context.getStationObjModel().getStationId(),
                backStation.getStationId(), 0);
        if (command == null) {
            News.taskInfo(stationProtocol.getTaskNo(), "{}工作,获取输送线命令失败", stationProtocol.getTaskNo());
            return false;
        }
        stationCommandDispatcher.dispatch(context.getBasDevp().getDevpNo(), command, "store-in-task", "station-back");
        String errorMsg = Cools.isEmpty(stationProtocol.getErrorMsg()) ? "未知异常" : stationProtocol.getErrorMsg();
        String warning = "条码站异常退回,报警信息:" + errorMsg;
        if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
            warning = stationProtocol.getSystemWarning() + ";" + warning;
        }
        policy.setSystemWarning(context, warning);
        News.info("{}扫码站异常,已退回至{},条码站状态:{}", stationProtocol.getTaskNo(),
                backStation.getStationId(), JSON.toJSONString(stationProtocol));
        redisUtil.set(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + stationProtocol.getStationId(),
                "lock", 60 * 60);
        return false;
    }
    private StoreInTaskContext buildContext(BasDevp basDevp, StationObjModel stationObjModel) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return null;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return null;
        }
        Integer stationId = stationObjModel.getStationId();
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (stationMap == null || !stationMap.containsKey(stationId)) {
            return null;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return null;
        }
        return new StoreInTaskContext(basDevp, stationThread, stationObjModel, stationProtocol);
    }
    /**
     * Processes the response string of a synchronous WMS store-in request.
     *
     * <p>An empty response is marked as failed. Otherwise the response is parsed as JSON; a code other
     * than 200 or a missing {@code data} object is marked as failed. On success the policy builds the
     * create-task parameters, {@code commonService.createInTask} creates the work task, the policy is
     * notified, the system warning is cleared, and a repeat-warning tips-limit key for the barcode is
     * written for 30 (third argument of {@code redisUtil.set}). Exceptions are logged and marked as failed.
     *
     * <p>NOTE(review): this span appears to interleave statements from two revisions: the
     * {@code AsyncInTaskResult}/{@code handleApplyResult} lines reference a {@code result} variable
     * that is not declared in this method, and the catch block opens {@code News.error(} twice. It
     * does not compile as shown; confirm which statements belong to the current revision.
     *
     * @param policy   policy providing parameter building and callbacks
     * @param context  station context of the request
     * @param request  request that was sent to WMS
     * @param response raw WMS response; may be empty
     */
    private void handleSyncApplyResponse(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                         String response) {
        if (Cools.isEmpty(response)) {
            markApplyFailed(policy, context, request, null, "FAILED");
            return;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(response);
            if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                // NOTE(review): `result` is undeclared here - the next five lines look like earlier-revision leftovers.
                AsyncInTaskResult failResult = new AsyncInTaskResult();
                failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
                failResult.setResponse(result.getResponse());
                failResult.setMessage("WMS返回非200");
                handleApplyResult(policy, context, request, failResult);
                markApplyFailed(policy, context, request, response, "WMS返回非200");
                return;
            }
            StartupDto dto = jsonObject.getObject("data", StartupDto.class);
            if (dto == null) {
                // NOTE(review): same undeclared `result` pattern as above.
                AsyncInTaskResult failResult = new AsyncInTaskResult();
                failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
                failResult.setResponse(result.getResponse());
                failResult.setMessage("WMS返回data为空");
                handleApplyResult(policy, context, request, failResult);
                markApplyFailed(policy, context, request, response, "WMS返回data为空");
                return;
            }
            CreateInTaskParam taskParam = policy.buildCreateInTaskParam(context, dto);
            WrkMast wrkMast = commonService.createInTask(taskParam);
            policy.afterTaskCreated(context, wrkMast);
            context.getStationProtocol().setSystemWarning("");
            wmsOperateUtils.clearAsyncInTaskResponse(request);
            policy.clearSystemWarning(context);
            // While this key exists, generateByStation does not raise the "task already exists" warning for the barcode.
            redisUtil.set(RedisKeyType.GENERATE_IN_TASK_SUCCESS_REPEAT_WARNING_TIPS_LIMIT.key + wrkMast.getBarcode(), "lock", 30);
        } catch (Exception e) {
            News.error("处理WMS入库成功响应失败,barcode={},stationId={}", request.getBarcode(),
            News.error("处理WMS入库响应异常,barcode={},stationId={}", request.getBarcode(),
                    request.getSourceStaNo(), e);
            markApplyFailed(policy, context, request, response, e.getMessage());
        }
    }
    private void markApplyFailed(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                 String response, String message) {
        InTaskApplyResult result = new InTaskApplyResult();
        result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
        result.setResponse(response);
        result.setMessage(message);
        News.error("WMS入库请求失败,barcode={},stationId={},response={},WCS响应={}",
                request.getBarcode(), request.getSourceStaNo(), result.getResponse(), result.getMessage());
        redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
        policy.onApplyFailed(context, result);
        triggerStationBackOnApplyFailed(policy, context, request, result);
    }
    /**
     * WMS 申请入库失败后,货物仍停留在扫码站,此时补发退回到入库站的输送命令,避免货物长期滞留。
     */
    private void triggerStationBackOnApplyFailed(StoreInTaskPolicy policy, StoreInTaskContext context,
                                                 InTaskApplyRequest request,
                                                 InTaskApplyResult result) {
        if (context == null || context.getStationThread() == null || context.getStationObjModel() == null) {
            return;
        }
        StationProtocol stationProtocol = context.getStationProtocol();
        if (stationProtocol == null || stationProtocol.getStationId() == null) {
            return;
        }
        StationObjModel backStation = context.getStationObjModel().getBackStation();
        if (backStation == null || backStation.getStationId() == null) {
            News.warn("WMS入库失败后无法退回入库站,退回站未配置。barcode={},stationId={}",
                    request == null ? null : request.getBarcode(), stationProtocol.getStationId());
            return;
        }
        Integer currentStationId = stationProtocol.getStationId();
        Integer backStationId = backStation.getStationId();
        if (backStationId.equals(currentStationId)) {
            return;
        }
        if (stationProtocol.getTaskNo() != null
                && stationProtocol.getTaskNo() > 0
                && backStationId.equals(stationProtocol.getTargetStaNo())) {
            return;
        }
        String lockKey = RedisKeyType.GENERATE_STATION_BACK_LIMIT.key
                + currentStationId + "_" + stationProtocol.getTaskNo();
        if (redisUtil.get(lockKey) != null) {
            return;
        }
        Integer stationBackTaskNo = commonService.getWorkNo(WrkIoType.STATION_BACK.id);
        StationCommand command = context.getStationThread().getCommand(
                StationCommandType.MOVE,
                stationBackTaskNo,
                currentStationId,
                backStationId,
                0
        );
        if (command == null) {
            News.warn("WMS入库失败后生成退回入库站命令失败。barcode={},stationId={},backStationId={},warning={}",
                    request == null ? null : request.getBarcode(),
                    currentStationId,
                    backStationId,
                    result == null ? null : result.getMessage());
            return;
        }
        StationCommandDispatchResult dispatchResult = stationCommandDispatcher.dispatch(
                context.getBasDevp().getDevpNo(),
                command,
                "store-in-task-generation",
                "apply-failed-station-back"
        );
        if (!dispatchResult.isAccepted()) {
            News.warn("WMS入库失败后退回入库站命令入队失败。barcode={},stationId={},backStationId={},reason={},warning={}",
                    request == null ? null : request.getBarcode(),
                    currentStationId,
                    backStationId,
                    dispatchResult.getReason(),
                    result == null ? null : result.getMessage());
            return;
        }
        redisUtil.set(lockKey, "lock", APPLY_FAIL_STATION_BACK_LOCK_SECONDS);
        String currentWarning = stationProtocol.getSystemWarning();
        String backWarning = "WMS入库失败,已退回入库站";
        if (!Cools.isEmpty(currentWarning)) {
            backWarning = currentWarning + ";" + backWarning;
        }
        policy.setSystemWarning(context, backWarning);
        News.warn("WMS入库失败,已触发货物退回入库站。barcode={},stationId={},backStationId={},warning={}",
                request == null ? null : request.getBarcode(),
                currentStationId,
                backStationId,
                result == null ? null : result.getMessage());
    }
    private HashMap<String, String> getSystemConfigMap() {
        Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
        if (systemConfigMapObj == null) {
            return null;
        }
        return (HashMap<String, String>) systemConfigMapObj;
    }
    private boolean hasAvailableStationTaskCapacity(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
        int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
        if (currentStationTaskCount > conveyorStationTaskLimit) {
            News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount);
            return false;
        }
        return true;
    }
    private synchronized boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
        int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
        if (currentStationTaskCount + inFlightGenerateCount >= conveyorStationTaskLimit) {
            News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}",
                    conveyorStationTaskLimit, currentStationTaskCount, inFlightGenerateCount);
            return false;
        }
        inFlightGenerateCount++;
        return true;
    }
    private synchronized void releaseGenerateCapacity() {
        if (inFlightGenerateCount > 0) {
            inFlightGenerateCount--;
        }
    }
    private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = 30;
        String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
        if (conveyorStationTaskLimitStr != null) {
            conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
        }
        return conveyorStationTaskLimit;
    }
}