package com.zy.core.plugin.store; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.core.common.Cools; import com.zy.asrs.domain.param.CreateInTaskParam; import com.zy.asrs.entity.BasDevp; import com.zy.asrs.entity.WrkMast; import com.zy.asrs.service.BasDevpService; import com.zy.asrs.service.WrkMastService; import com.zy.common.model.StartupDto; import com.zy.common.service.CommonService; import com.zy.common.utils.RedisUtil; import com.zy.core.News; import com.zy.core.cache.SlaveConnection; import com.zy.core.enums.RedisKeyType; import com.zy.core.enums.SlaveType; import com.zy.core.model.StationObjModel; import com.zy.core.model.protocol.StationProtocol; import com.zy.core.thread.StationThread; import com.zy.core.utils.StationOperateProcessUtils; import com.zy.core.utils.WmsOperateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; @Service public class StoreInTaskGenerationService { private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5; @Autowired private BasDevpService basDevpService; @Autowired private WrkMastService wrkMastService; @Autowired private StationOperateProcessUtils stationOperateProcessUtils; @Autowired private RedisUtil redisUtil; @Autowired private WmsOperateUtils wmsOperateUtils; @Autowired private CommonService commonService; /** * 保留当前按站点 lane 并发的能力,同时用一个简单计数避免并发生成把站点任务数顶穿上限。 */ private int inFlightGenerateCount = 0; public void generate(StoreInTaskPolicy policy) { try { if (!policy.isEnabled()) { return; } HashMap systemConfigMap = getSystemConfigMap(); if (systemConfigMap == null) { return; } if (!hasAvailableStationTaskCapacity(systemConfigMap)) { return; } List basDevps = basDevpService.list(new QueryWrapper<>()); for (BasDevp basDevp : basDevps) { List barcodeStations = policy.getBarcodeStations(basDevp); for (StationObjModel stationObjModel : barcodeStations) { generateByStation(policy, basDevp, stationObjModel, systemConfigMap); } } } catch (Exception e) { News.error("生成入库任务异常,policy={}", policy.getPolicyName(), e); } } public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) { try { if (!policy.isEnabled()) { return; } HashMap systemConfigMap = getSystemConfigMap(); if (systemConfigMap == null) { return; } if (!hasAvailableStationTaskCapacity(systemConfigMap)) { return; } generateByStation(policy, basDevp, stationObjModel, systemConfigMap); } catch (Exception e) { Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId(); News.error("生成入库任务异常,policy={},stationId={}", policy.getPolicyName(), stationId, e); } } private void generateByStation(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel, HashMap systemConfigMap) { StoreInTaskContext context = buildContext(basDevp, stationObjModel); if (context == null) { return; } if (!policy.matchCandidate(context)) { return; } if (!policy.beforeApply(context)) { return; } if (hasCreatedTask(context.getStationProtocol().getBarcode())) { return; } if (redisUtil.get(policy.getGenerateLockKey(context)) != null) { return; } if (!tryReserveGenerateCapacity(systemConfigMap)) { return; } InTaskApplyRequest request = policy.buildApplyRequest(context); try { policy.onRequestPermitGranted(context); policy.setSystemWarning(context, "请求WMS中"); News.info("发起同步WMS入库请求,barcode={},stationId={},timeout={}s", request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS); String response = wmsOperateUtils.applyInTask(request); handleSyncApplyResponse(policy, context, request, response); } finally { releaseGenerateCapacity(); } } private StoreInTaskContext buildContext(BasDevp basDevp, StationObjModel stationObjModel) { if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) { return null; } StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo()); if (stationThread == null) { return null; } Integer stationId = stationObjModel.getStationId(); Map stationMap = stationThread.getStatusMap(); if (stationMap == null || !stationMap.containsKey(stationId)) { return null; } StationProtocol stationProtocol = stationMap.get(stationId); if (stationProtocol == null) { return null; } return new StoreInTaskContext(basDevp, stationThread, stationObjModel, stationProtocol); } private boolean hasCreatedTask(String barcode) { List wrkMasts = wrkMastService.list(new QueryWrapper().eq("barcode", barcode)); return !wrkMasts.isEmpty(); } private void handleSyncApplyResponse(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, String response) { if (Cools.isEmpty(response)) { markApplyFailed(policy, context, request, null, "FAILED"); return; } try { JSONObject jsonObject = JSON.parseObject(response); if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { markApplyFailed(policy, context, request, response, "WMS返回非200"); return; } StartupDto dto = jsonObject.getObject("data", StartupDto.class); if (dto == null) { markApplyFailed(policy, context, request, response, "WMS返回data为空"); return; } CreateInTaskParam taskParam = policy.buildCreateInTaskParam(context, dto); WrkMast wrkMast = commonService.createInTask(taskParam); policy.afterTaskCreated(context, wrkMast); policy.clearSystemWarning(context); } catch (Exception e) { News.error("处理WMS入库响应异常,barcode={},stationId={}", request.getBarcode(), request.getSourceStaNo(), e); markApplyFailed(policy, context, request, response, e.getMessage()); } } private void markApplyFailed(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, String response, String message) { InTaskApplyResult result = new InTaskApplyResult(); result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL); result.setResponse(response); result.setMessage(message); News.error("WMS入库请求失败,barcode={},stationId={},response={}", request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result)); redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context)); policy.onApplyFailed(context, result); } private HashMap getSystemConfigMap() { Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); if (systemConfigMapObj == null) { return null; } return (HashMap) systemConfigMapObj; } private boolean hasAvailableStationTaskCapacity(HashMap systemConfigMap) { int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap); int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); if (currentStationTaskCount > conveyorStationTaskLimit) { News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount); return false; } return true; } private synchronized boolean tryReserveGenerateCapacity(HashMap systemConfigMap) { int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap); int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); if (currentStationTaskCount + inFlightGenerateCount >= conveyorStationTaskLimit) { News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}", conveyorStationTaskLimit, currentStationTaskCount, inFlightGenerateCount); return false; } inFlightGenerateCount++; return true; } private synchronized void releaseGenerateCapacity() { if (inFlightGenerateCount > 0) { inFlightGenerateCount--; } } private int getConveyorStationTaskLimit(HashMap systemConfigMap) { int conveyorStationTaskLimit = 30; String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit"); if (conveyorStationTaskLimitStr != null) { conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr); } return conveyorStationTaskLimit; } }