| | |
| | | |
| | | @Service |
| | | public class StoreInTaskGenerationService { |
| | | private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5; |
| | | |
| | | @Autowired |
| | | private BasDevpService basDevpService; |
| | |
| | | @Autowired |
| | | private CommonService commonService; |
| | | |
| | | /** |
| | | * Number of generation attempts that have reserved capacity but not yet released it. |
| | | * Keeps the existing per-station-lane concurrency while using a simple counter so that |
| | | * concurrent generation cannot push the station task count past the configured limit. |
| | | * In the visible code it is read and written only inside the synchronized methods |
| | | * tryReserveGenerateCapacity and releaseGenerateCapacity. |
| | | */ |
| | | private int inFlightGenerateCount = 0; |
| | | |
| | | public void generate(StoreInTaskPolicy policy) { |
| | | try { |
| | | if (!policy.isEnabled()) { |
| | | return; |
| | | } |
| | | |
| | | Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (systemConfigMapObj == null) { |
| | | HashMap<String, String> systemConfigMap = getSystemConfigMap(); |
| | | if (systemConfigMap == null) { |
| | | return; |
| | | } |
| | | HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj; |
| | | |
| | | int conveyorStationTaskLimit = 30; |
| | | String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit"); |
| | | if (conveyorStationTaskLimitStr != null) { |
| | | conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr); |
| | | } |
| | | int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); |
| | | if (currentStationTaskCount > conveyorStationTaskLimit) { |
| | | News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount); |
| | | if (!hasAvailableStationTaskCapacity(systemConfigMap)) { |
| | | return; |
| | | } |
| | | |
| | | List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>()); |
| | | for (BasDevp basDevp : basDevps) { |
| | | StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo()); |
| | | if (stationThread == null) { |
| | | continue; |
| | | } |
| | | |
| | | Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap(); |
| | | List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp); |
| | | for (StationObjModel stationObjModel : barcodeStations) { |
| | | Integer stationId = stationObjModel.getStationId(); |
| | | if (!stationMap.containsKey(stationId)) { |
| | | continue; |
| | | } |
| | | |
| | | StationProtocol stationProtocol = stationMap.get(stationId); |
| | | if (stationProtocol == null) { |
| | | continue; |
| | | } |
| | | |
| | | StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel, |
| | | stationProtocol); |
| | | if (!policy.matchCandidate(context)) { |
| | | continue; |
| | | } |
| | | if (!policy.beforeApply(context)) { |
| | | continue; |
| | | } |
| | | |
| | | List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>() |
| | | .eq("barcode", stationProtocol.getBarcode())); |
| | | if (!wrkMasts.isEmpty()) { |
| | | continue; |
| | | } |
| | | |
| | | String generateLockKey = policy.getGenerateLockKey(context); |
| | | Object lock = redisUtil.get(generateLockKey); |
| | | if (lock != null) { |
| | | continue; |
| | | } |
| | | |
| | | policy.onRequestPermitGranted(context); |
| | | |
| | | InTaskApplyRequest request = policy.buildApplyRequest(context); |
| | | AsyncInTaskResult result = wmsOperateUtils.queryAsyncInTaskResponse(request); |
| | | if (result != null) { |
| | | handleApplyResult(policy, context, request, result); |
| | | continue; |
| | | } |
| | | |
| | | if (wmsOperateUtils.isAsyncRequestInProgress(request)) { |
| | | continue; |
| | | } |
| | | |
| | | News.info("发起异步WMS入库请求,barcode={},stationId={}", request.getBarcode(), |
| | | request.getSourceStaNo()); |
| | | wmsOperateUtils.applyInTaskAsync(request); |
| | | redisUtil.set(generateLockKey, "lock", policy.getSubmitLockSeconds(context)); |
| | | policy.onApplySubmitted(context); |
| | | generateByStation(policy, basDevp, stationObjModel, systemConfigMap); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | |
| | | } |
| | | } |
| | | |
| | | private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, |
| | | AsyncInTaskResult result) { |
| | | if (result.isSuccess()) { |
| | | handleApplySuccess(policy, context, request, result); |
| | | return; |
| | | } |
| | | public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) { |
| | | try { |
| | | if (!policy.isEnabled()) { |
| | | return; |
| | | } |
| | | |
| | | if (result.isRetryableFailure()) { |
| | | News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}", |
| | | request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result)); |
| | | wmsOperateUtils.clearAsyncInTaskResponse(request); |
| | | wmsOperateUtils.applyInTaskAsync(request); |
| | | redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context)); |
| | | policy.onApplyFailed(context, result); |
| | | return; |
| | | } |
| | | HashMap<String, String> systemConfigMap = getSystemConfigMap(); |
| | | if (systemConfigMap == null) { |
| | | return; |
| | | } |
| | | if (!hasAvailableStationTaskCapacity(systemConfigMap)) { |
| | | return; |
| | | } |
| | | |
| | | policy.onApplyFailed(context, result); |
| | | generateByStation(policy, basDevp, stationObjModel, systemConfigMap); |
| | | } catch (Exception e) { |
| | | Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId(); |
| | | News.error("生成入库任务异常,policy={},stationId={}", policy.getPolicyName(), stationId, e); |
| | | } |
| | | } |
| | | |
| | | private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, |
| | | AsyncInTaskResult result) { |
| | | private void generateByStation(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel, |
| | | HashMap<String, String> systemConfigMap) { |
| | | StoreInTaskContext context = buildContext(basDevp, stationObjModel); |
| | | if (context == null) { |
| | | return; |
| | | } |
| | | if (!policy.matchCandidate(context)) { |
| | | return; |
| | | } |
| | | if (!policy.beforeApply(context)) { |
| | | return; |
| | | } |
| | | |
| | | if (hasCreatedTask(context.getStationProtocol().getBarcode())) { |
| | | return; |
| | | } |
| | | |
| | | if (redisUtil.get(policy.getGenerateLockKey(context)) != null) { |
| | | return; |
| | | } |
| | | |
| | | if (!tryReserveGenerateCapacity(systemConfigMap)) { |
| | | return; |
| | | } |
| | | |
| | | InTaskApplyRequest request = policy.buildApplyRequest(context); |
| | | try { |
| | | JSONObject jsonObject = JSON.parseObject(result.getResponse()); |
| | | policy.onRequestPermitGranted(context); |
| | | policy.setSystemWarning(context, "请求WMS中"); |
| | | News.info("发起同步WMS入库请求,barcode={},stationId={},timeout={}s", |
| | | request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS); |
| | | String response = wmsOperateUtils.applyInTask(request); |
| | | handleSyncApplyResponse(policy, context, request, response); |
| | | } finally { |
| | | releaseGenerateCapacity(); |
| | | } |
| | | } |
| | | |
| | | private StoreInTaskContext buildContext(BasDevp basDevp, StationObjModel stationObjModel) { |
| | | if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) { |
| | | return null; |
| | | } |
| | | |
| | | StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo()); |
| | | if (stationThread == null) { |
| | | return null; |
| | | } |
| | | |
| | | Integer stationId = stationObjModel.getStationId(); |
| | | Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap(); |
| | | if (stationMap == null || !stationMap.containsKey(stationId)) { |
| | | return null; |
| | | } |
| | | |
| | | StationProtocol stationProtocol = stationMap.get(stationId); |
| | | if (stationProtocol == null) { |
| | | return null; |
| | | } |
| | | |
| | | return new StoreInTaskContext(basDevp, stationThread, stationObjModel, stationProtocol); |
| | | } |
| | | |
| | | private boolean hasCreatedTask(String barcode) { |
| | | List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>().eq("barcode", barcode)); |
| | | return !wrkMasts.isEmpty(); |
| | | } |
| | | |
| | | private void handleSyncApplyResponse(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, |
| | | String response) { |
| | | if (Cools.isEmpty(response)) { |
| | | markApplyFailed(policy, context, request, null, "FAILED"); |
| | | return; |
| | | } |
| | | |
| | | try { |
| | | JSONObject jsonObject = JSON.parseObject(response); |
| | | if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { |
| | | AsyncInTaskResult failResult = new AsyncInTaskResult(); |
| | | failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | failResult.setResponse(result.getResponse()); |
| | | failResult.setMessage("WMS返回非200"); |
| | | handleApplyResult(policy, context, request, failResult); |
| | | markApplyFailed(policy, context, request, response, "WMS返回非200"); |
| | | return; |
| | | } |
| | | |
| | | StartupDto dto = jsonObject.getObject("data", StartupDto.class); |
| | | if (dto == null) { |
| | | AsyncInTaskResult failResult = new AsyncInTaskResult(); |
| | | failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | failResult.setResponse(result.getResponse()); |
| | | failResult.setMessage("WMS返回data为空"); |
| | | handleApplyResult(policy, context, request, failResult); |
| | | markApplyFailed(policy, context, request, response, "WMS返回data为空"); |
| | | return; |
| | | } |
| | | |
| | | CreateInTaskParam taskParam = policy.buildCreateInTaskParam(context, dto); |
| | | WrkMast wrkMast = commonService.createInTask(taskParam); |
| | | policy.afterTaskCreated(context, wrkMast); |
| | | context.getStationProtocol().setSystemWarning(""); |
| | | wmsOperateUtils.clearAsyncInTaskResponse(request); |
| | | policy.clearSystemWarning(context); |
| | | } catch (Exception e) { |
| | | News.error("处理WMS入库成功响应失败,barcode={},stationId={}", request.getBarcode(), |
| | | News.error("处理WMS入库响应异常,barcode={},stationId={}", request.getBarcode(), |
| | | request.getSourceStaNo(), e); |
| | | markApplyFailed(policy, context, request, response, e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | private void markApplyFailed(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, |
| | | String response, String message) { |
| | | InTaskApplyResult result = new InTaskApplyResult(); |
| | | result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL); |
| | | result.setResponse(response); |
| | | result.setMessage(message); |
| | | |
| | | News.error("WMS入库请求失败,barcode={},stationId={},response={}", |
| | | request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result)); |
| | | redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context)); |
| | | policy.onApplyFailed(context, result); |
| | | } |
| | | |
| | | private HashMap<String, String> getSystemConfigMap() { |
| | | Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (systemConfigMapObj == null) { |
| | | return null; |
| | | } |
| | | return (HashMap<String, String>) systemConfigMapObj; |
| | | } |
| | | |
| | | private boolean hasAvailableStationTaskCapacity(HashMap<String, String> systemConfigMap) { |
| | | int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap); |
| | | int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); |
| | | if (currentStationTaskCount > conveyorStationTaskLimit) { |
| | | News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | private synchronized boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) { |
| | | int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap); |
| | | int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); |
| | | if (currentStationTaskCount + inFlightGenerateCount >= conveyorStationTaskLimit) { |
| | | News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}", |
| | | conveyorStationTaskLimit, currentStationTaskCount, inFlightGenerateCount); |
| | | return false; |
| | | } |
| | | inFlightGenerateCount++; |
| | | return true; |
| | | } |
| | | |
| | | private synchronized void releaseGenerateCapacity() { |
| | | if (inFlightGenerateCount > 0) { |
| | | inFlightGenerateCount--; |
| | | } |
| | | } |
| | | |
| | | private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) { |
| | | int conveyorStationTaskLimit = 30; |
| | | String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit"); |
| | | if (conveyorStationTaskLimitStr != null) { |
| | | conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr); |
| | | } |
| | | return conveyorStationTaskLimit; |
| | | } |
| | | |
| | | } |