| | |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | @Service |
| | | public class StoreInTaskGenerationService { |
// Timeout value, in seconds, reported for a synchronous WMS store-in request. In the visible code it is only
// interpolated into the log line written just before applySyncInTask(...) is called.
// NOTE(review): confirm where (or whether) this timeout is actually enforced on the WMS call.
| | | private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5; |
| | | |
// Injected via @Autowired; generate(StoreInTaskPolicy) uses it to list the BasDevp records to scan.
| | | @Autowired |
| | | private BasDevpService basDevpService; |
| | |
// Injected via @Autowired; handleApplySuccess calls createInTask(...) on it to create the work task.
| | | @Autowired |
| | | private CommonService commonService; |
| | | |
// Number of generation attempts that currently hold a reserved capacity slot: incremented by
// tryReserveGenerateCapacity (compare-and-set) and decremented, never below zero, by releaseGenerateCapacity.
| | | private final AtomicInteger inFlightGenerateCount = new AtomicInteger(0); |
| | | |
| | | public void generate(StoreInTaskPolicy policy) { |
| | | try { |
| | | if (!policy.isEnabled()) { |
| | | return; |
| | | } |
| | | |
| | | Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (systemConfigMapObj == null) { |
| | | HashMap<String, String> systemConfigMap = getSystemConfigMap(); |
| | | if (systemConfigMap == null) { |
| | | return; |
| | | } |
| | | HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj; |
| | | |
| | | int conveyorStationTaskLimit = 30; |
| | | String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit"); |
| | | if (conveyorStationTaskLimitStr != null) { |
| | | conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr); |
| | | } |
| | | int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); |
| | | if (currentStationTaskCount > conveyorStationTaskLimit) { |
| | | News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount); |
| | | if (!hasAvailableStationTaskCapacity(systemConfigMap)) { |
| | | return; |
| | | } |
| | | |
| | | List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>()); |
| | | for (BasDevp basDevp : basDevps) { |
| | | StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo()); |
| | | if (stationThread == null) { |
| | | continue; |
| | | } |
| | | |
| | | Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap(); |
| | | List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp); |
| | | for (StationObjModel stationObjModel : barcodeStations) { |
| | | Integer stationId = stationObjModel.getStationId(); |
| | | if (!stationMap.containsKey(stationId)) { |
| | | continue; |
| | | } |
| | | |
| | | StationProtocol stationProtocol = stationMap.get(stationId); |
| | | if (stationProtocol == null) { |
| | | continue; |
| | | } |
| | | |
| | | StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel, |
| | | stationProtocol); |
| | | if (!policy.matchCandidate(context)) { |
| | | continue; |
| | | } |
| | | if (!policy.beforeApply(context)) { |
| | | continue; |
| | | } |
| | | |
| | | List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>() |
| | | .eq("barcode", stationProtocol.getBarcode())); |
| | | if (!wrkMasts.isEmpty()) { |
| | | continue; |
| | | } |
| | | |
| | | String generateLockKey = policy.getGenerateLockKey(context); |
| | | Object lock = redisUtil.get(generateLockKey); |
| | | if (lock != null) { |
| | | continue; |
| | | } |
| | | |
| | | policy.onRequestPermitGranted(context); |
| | | |
| | | InTaskApplyRequest request = policy.buildApplyRequest(context); |
| | | AsyncInTaskResult result = wmsOperateUtils.queryAsyncInTaskResponse(request); |
| | | if (result != null) { |
| | | handleApplyResult(policy, context, request, result); |
| | | continue; |
| | | } |
| | | |
| | | if (wmsOperateUtils.isAsyncRequestInProgress(request)) { |
| | | continue; |
| | | } |
| | | |
| | | News.info("发起异步WMS入库请求,barcode={},stationId={}", request.getBarcode(), |
| | | request.getSourceStaNo()); |
| | | wmsOperateUtils.applyInTaskAsync(request); |
| | | redisUtil.set(generateLockKey, "lock", policy.getSubmitLockSeconds(context)); |
| | | // policy.onApplySubmitted(context); |
| | | generate(policy, basDevp, stationObjModel, systemConfigMap); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | |
| | | } |
| | | } |
| | | |
| | | public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) { |
| | | try { |
| | | if (!policy.isEnabled()) { |
| | | return; |
| | | } |
| | | |
| | | HashMap<String, String> systemConfigMap = getSystemConfigMap(); |
| | | if (systemConfigMap == null) { |
| | | return; |
| | | } |
| | | if (!hasAvailableStationTaskCapacity(systemConfigMap)) { |
| | | return; |
| | | } |
| | | |
| | | generate(policy, basDevp, stationObjModel, systemConfigMap); |
| | | } catch (Exception e) { |
| | | Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId(); |
| | | News.error("生成入库任务异常,policy={},stationId={}", policy.getPolicyName(), stationId, e); |
| | | } |
| | | } |
| | | |
| | | private void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel, |
| | | HashMap<String, String> systemConfigMap) { |
| | | if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) { |
| | | return; |
| | | } |
| | | |
| | | StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo()); |
| | | if (stationThread == null) { |
| | | return; |
| | | } |
| | | |
| | | Integer stationId = stationObjModel.getStationId(); |
| | | Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap(); |
| | | if (!stationMap.containsKey(stationId)) { |
| | | return; |
| | | } |
| | | |
| | | StationProtocol stationProtocol = stationMap.get(stationId); |
| | | if (stationProtocol == null) { |
| | | return; |
| | | } |
| | | |
| | | StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel, |
| | | stationProtocol); |
| | | if (!policy.matchCandidate(context)) { |
| | | return; |
| | | } |
| | | if (!policy.beforeApply(context)) { |
| | | return; |
| | | } |
| | | |
| | | List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>() |
| | | .eq("barcode", stationProtocol.getBarcode())); |
| | | if (!wrkMasts.isEmpty()) { |
| | | return; |
| | | } |
| | | |
| | | String generateLockKey = policy.getGenerateLockKey(context); |
| | | Object lock = redisUtil.get(generateLockKey); |
| | | if (lock != null) { |
| | | return; |
| | | } |
| | | |
| | | if (!tryReserveGenerateCapacity(systemConfigMap)) { |
| | | return; |
| | | } |
| | | |
| | | try { |
| | | policy.onRequestPermitGranted(context); |
| | | |
| | | InTaskApplyRequest request = policy.buildApplyRequest(context); |
| | | News.info("发起同步WMS入库请求,barcode={},stationId={},timeout={}s", |
| | | request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS); |
| | | InTaskApplyResult result = applySyncInTask(request); |
| | | handleApplyResult(policy, context, request, result); |
| | | } finally { |
| | | releaseGenerateCapacity(); |
| | | } |
| | | } |
| | | |
// Dispatches on the outcome of a WMS store-in request.
//  - result.isSuccess(): forwards to handleApplySuccess and returns.
//  - result.isRetryableFailure(): logs the failure with the policy-built message, sets the policy's generate
//    lock key in Redis for policy.getRetryLockSeconds(context), calls policy.onApplyFailed and returns.
// NOTE(review): this span looks like an unresolved diff rendering rather than compilable code. The last
// parameter line and the News.error(...) opener each appear twice (an AsyncInTaskResult variant and an
// InTaskApplyResult variant), the async clear/re-apply calls sit next to the lock-only handling, and the row
// after `return;` is empty where the closing brace of the `if` would be expected. Confirm against version
// control which variant is current and whether rows are missing before relying on this text.
| | | private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, |
| | | AsyncInTaskResult result) { |
| | | InTaskApplyResult result) { |
| | | if (result.isSuccess()) { |
| | | handleApplySuccess(policy, context, request, result); |
| | | return; |
| | | } |
| | | |
| | | if (result.isRetryableFailure()) { |
| | | News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}", |
| | | News.error("WMS入库请求失败,barcode={},stationId={},response={}", |
| | | request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result)); |
| | | wmsOperateUtils.clearAsyncInTaskResponse(request); |
| | | wmsOperateUtils.applyInTaskAsync(request); |
| | | redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context)); |
| | | policy.onApplyFailed(context, result); |
| | | return; |
| | |
| | | } |
| | | |
// Handles a WMS store-in response that was flagged as successful.
// Visible flow: parse result.getResponse() as JSON; when the object is null or its "code" is not 200, or when
// "data" cannot be read as a StartupDto, build a RETRYABLE_FAIL result carrying the raw response and route it
// back through handleApplyResult. Otherwise create the work task with commonService.createInTask, notify
// policy.afterTaskCreated, reset the station protocol's system warning to "" and clear the async response.
// Any exception is logged through News.error with the request's barcode and source station.
// NOTE(review): this span looks like an unresolved diff rendering rather than compilable code. `failResult`
// is declared twice in each failure branch (AsyncInTaskResult and InTaskApplyResult variants), the failure
// branches are never closed or returned from, and `taskParam` is used without a visible definition — the
// empty rows after each handleApplyResult(...) call are presumably where those lines were lost. Confirm
// against version control before relying on this text.
| | | private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request, |
| | | AsyncInTaskResult result) { |
| | | InTaskApplyResult result) { |
| | | try { |
| | | JSONObject jsonObject = JSON.parseObject(result.getResponse()); |
| | | if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { |
| | | AsyncInTaskResult failResult = new AsyncInTaskResult(); |
| | | failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | InTaskApplyResult failResult = new InTaskApplyResult(); |
| | | failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL); |
| | | failResult.setResponse(result.getResponse()); |
| | | failResult.setMessage("WMS返回非200"); |
| | | handleApplyResult(policy, context, request, failResult); |
| | |
| | | |
| | | StartupDto dto = jsonObject.getObject("data", StartupDto.class); |
| | | if (dto == null) { |
| | | AsyncInTaskResult failResult = new AsyncInTaskResult(); |
| | | failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | InTaskApplyResult failResult = new InTaskApplyResult(); |
| | | failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL); |
| | | failResult.setResponse(result.getResponse()); |
| | | failResult.setMessage("WMS返回data为空"); |
| | | handleApplyResult(policy, context, request, failResult); |
| | |
| | | WrkMast wrkMast = commonService.createInTask(taskParam); |
| | | policy.afterTaskCreated(context, wrkMast); |
| | | context.getStationProtocol().setSystemWarning(""); |
| | | wmsOperateUtils.clearAsyncInTaskResponse(request); |
| | | } catch (Exception e) { |
| | | News.error("处理WMS入库成功响应失败,barcode={},stationId={}", request.getBarcode(), |
| | | request.getSourceStaNo(), e); |
| | | } |
| | | } |
| | | |
| | | private InTaskApplyResult applySyncInTask(InTaskApplyRequest request) { |
| | | InTaskApplyResult result = new InTaskApplyResult(); |
| | | result.setBizKey(request.getBizKey()); |
| | | |
| | | String response = wmsOperateUtils.applyInTask(request); |
| | | result.setResponse(response); |
| | | if (Cools.isEmpty(response)) { |
| | | result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL); |
| | | result.setMessage("FAILED"); |
| | | return result; |
| | | } |
| | | |
| | | try { |
| | | JSONObject jsonObject = JSON.parseObject(response); |
| | | if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { |
| | | result.setStatus(InTaskApplyStatus.SUCCESS); |
| | | return result; |
| | | } |
| | | } catch (Exception ignored) { |
| | | } |
| | | |
| | | result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL); |
| | | result.setMessage(response); |
| | | return result; |
| | | } |
| | | |
| | | private HashMap<String, String> getSystemConfigMap() { |
| | | Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (systemConfigMapObj == null) { |
| | | return null; |
| | | } |
| | | return (HashMap<String, String>) systemConfigMapObj; |
| | | } |
| | | |
| | | private boolean hasAvailableStationTaskCapacity(HashMap<String, String> systemConfigMap) { |
| | | int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap); |
| | | int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); |
| | | if (currentStationTaskCount > conveyorStationTaskLimit) { |
| | | News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | private boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) { |
| | | int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap); |
| | | while (true) { |
| | | int reservedCount = inFlightGenerateCount.get(); |
| | | int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount(); |
| | | if (currentStationTaskCount + reservedCount >= conveyorStationTaskLimit) { |
| | | News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}", |
| | | conveyorStationTaskLimit, currentStationTaskCount, reservedCount); |
| | | return false; |
| | | } |
| | | if (inFlightGenerateCount.compareAndSet(reservedCount, reservedCount + 1)) { |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void releaseGenerateCapacity() { |
| | | inFlightGenerateCount.updateAndGet(current -> current > 0 ? current - 1 : 0); |
| | | } |
| | | |
| | | private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) { |
| | | int conveyorStationTaskLimit = 30; |
| | | String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit"); |
| | | if (conveyorStationTaskLimitStr != null) { |
| | | conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr); |
| | | } |
| | | return conveyorStationTaskLimit; |
| | | } |
| | | |
| | | } |