From 904bdc4a777cd67f05754890b00bde54f280e750 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 31 三月 2026 23:48:47 +0800
Subject: [PATCH] #入库同步串行请求

---
 src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java |  246 +++++++++++++++++++++++++++++++++---------------
 1 files changed, 169 insertions(+), 77 deletions(-)

diff --git a/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java b/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
index 972621c..9997796 100644
--- a/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
+++ b/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -27,9 +27,11 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Service
 public class StoreInTaskGenerationService {
+    private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
 
     @Autowired
     private BasDevpService basDevpService;
@@ -44,88 +46,27 @@
     @Autowired
     private CommonService commonService;
 
+    private final AtomicInteger inFlightGenerateCount = new AtomicInteger(0);
+
     public void generate(StoreInTaskPolicy policy) {
         try {
             if (!policy.isEnabled()) {
                 return;
             }
 
-            Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
-            if (systemConfigMapObj == null) {
+            HashMap<String, String> systemConfigMap = getSystemConfigMap();
+            if (systemConfigMap == null) {
                 return;
             }
-            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
-
-            int conveyorStationTaskLimit = 30;
-            String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
-            if (conveyorStationTaskLimitStr != null) {
-                conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
-            }
-            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
-            if (currentStationTaskCount > conveyorStationTaskLimit) {
-                News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}", conveyorStationTaskLimit, currentStationTaskCount);
+            if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
                 return;
             }
 
             List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
             for (BasDevp basDevp : basDevps) {
-                StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
-                if (stationThread == null) {
-                    continue;
-                }
-
-                Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
                 List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp);
                 for (StationObjModel stationObjModel : barcodeStations) {
-                    Integer stationId = stationObjModel.getStationId();
-                    if (!stationMap.containsKey(stationId)) {
-                        continue;
-                    }
-
-                    StationProtocol stationProtocol = stationMap.get(stationId);
-                    if (stationProtocol == null) {
-                        continue;
-                    }
-
-                    StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
-                            stationProtocol);
-                    if (!policy.matchCandidate(context)) {
-                        continue;
-                    }
-                    if (!policy.beforeApply(context)) {
-                        continue;
-                    }
-
-                    List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
-                            .eq("barcode", stationProtocol.getBarcode()));
-                    if (!wrkMasts.isEmpty()) {
-                        continue;
-                    }
-
-                    String generateLockKey = policy.getGenerateLockKey(context);
-                    Object lock = redisUtil.get(generateLockKey);
-                    if (lock != null) {
-                        continue;
-                    }
-
-                    policy.onRequestPermitGranted(context);
-
-                    InTaskApplyRequest request = policy.buildApplyRequest(context);
-                    AsyncInTaskResult result = wmsOperateUtils.queryAsyncInTaskResponse(request);
-                    if (result != null) {
-                        handleApplyResult(policy, context, request, result);
-                        continue;
-                    }
-
-                    if (wmsOperateUtils.isAsyncRequestInProgress(request)) {
-                        continue;
-                    }
-
-                    News.info("鍙戣捣寮傛WMS鍏ュ簱璇锋眰锛宐arcode={}锛宻tationId={}", request.getBarcode(),
-                            request.getSourceStaNo());
-                    wmsOperateUtils.applyInTaskAsync(request);
-                    redisUtil.set(generateLockKey, "lock", policy.getSubmitLockSeconds(context));
-//                    policy.onApplySubmitted(context);
+                    generate(policy, basDevp, stationObjModel, systemConfigMap);
                 }
             }
         } catch (Exception e) {
@@ -133,18 +74,97 @@
         }
     }
 
+    public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) {
+        try {
+            if (!policy.isEnabled()) {
+                return;
+            }
+
+            HashMap<String, String> systemConfigMap = getSystemConfigMap();
+            if (systemConfigMap == null) {
+                return;
+            }
+            if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
+                return;
+            }
+
+            generate(policy, basDevp, stationObjModel, systemConfigMap);
+        } catch (Exception e) {
+            Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
+            News.error("鐢熸垚鍏ュ簱浠诲姟寮傚父锛宲olicy={}锛宻tationId={}", policy.getPolicyName(), stationId, e);
+        }
+    }
+
+    private void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
+                          HashMap<String, String> systemConfigMap) {
+        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
+            return;
+        }
+
+        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
+        if (stationThread == null) {
+            return;
+        }
+
+        Integer stationId = stationObjModel.getStationId();
+        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
+        if (!stationMap.containsKey(stationId)) {
+            return;
+        }
+
+        StationProtocol stationProtocol = stationMap.get(stationId);
+        if (stationProtocol == null) {
+            return;
+        }
+
+        StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
+                stationProtocol);
+        if (!policy.matchCandidate(context)) {
+            return;
+        }
+        if (!policy.beforeApply(context)) {
+            return;
+        }
+
+        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
+                .eq("barcode", stationProtocol.getBarcode()));
+        if (!wrkMasts.isEmpty()) {
+            return;
+        }
+
+        String generateLockKey = policy.getGenerateLockKey(context);
+        Object lock = redisUtil.get(generateLockKey);
+        if (lock != null) {
+            return;
+        }
+
+        if (!tryReserveGenerateCapacity(systemConfigMap)) {
+            return;
+        }
+
+        try {
+            policy.onRequestPermitGranted(context);
+
+            InTaskApplyRequest request = policy.buildApplyRequest(context);
+            News.info("鍙戣捣鍚屾WMS鍏ュ簱璇锋眰锛宐arcode={}锛宻tationId={}锛宼imeout={}s",
+                    request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS);
+            InTaskApplyResult result = applySyncInTask(request);
+            handleApplyResult(policy, context, request, result);
+        } finally {
+            releaseGenerateCapacity();
+        }
+    }
+
     private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
-                                   AsyncInTaskResult result) {
+                                   InTaskApplyResult result) {
         if (result.isSuccess()) {
             handleApplySuccess(policy, context, request, result);
             return;
         }
 
         if (result.isRetryableFailure()) {
-            News.error("WMS鍏ュ簱璇锋眰澶辫触锛岄噸鏂板彂璧疯姹傦紝barcode={}锛宻tationId={}锛宺esponse={}",
+            News.error("WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}锛宺esponse={}",
                     request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
-            wmsOperateUtils.clearAsyncInTaskResponse(request);
-            wmsOperateUtils.applyInTaskAsync(request);
             redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
             policy.onApplyFailed(context, result);
             return;
@@ -154,12 +174,12 @@
     }
 
     private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
-                                    AsyncInTaskResult result) {
+                                    InTaskApplyResult result) {
         try {
             JSONObject jsonObject = JSON.parseObject(result.getResponse());
             if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
-                AsyncInTaskResult failResult = new AsyncInTaskResult();
-                failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
+                InTaskApplyResult failResult = new InTaskApplyResult();
+                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
                 failResult.setResponse(result.getResponse());
                 failResult.setMessage("WMS杩斿洖闈�200");
                 handleApplyResult(policy, context, request, failResult);
@@ -168,8 +188,8 @@
 
             StartupDto dto = jsonObject.getObject("data", StartupDto.class);
             if (dto == null) {
-                AsyncInTaskResult failResult = new AsyncInTaskResult();
-                failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
+                InTaskApplyResult failResult = new InTaskApplyResult();
+                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
                 failResult.setResponse(result.getResponse());
                 failResult.setMessage("WMS杩斿洖data涓虹┖");
                 handleApplyResult(policy, context, request, failResult);
@@ -180,11 +200,83 @@
             WrkMast wrkMast = commonService.createInTask(taskParam);
             policy.afterTaskCreated(context, wrkMast);
             context.getStationProtocol().setSystemWarning("");
-            wmsOperateUtils.clearAsyncInTaskResponse(request);
         } catch (Exception e) {
             News.error("澶勭悊WMS鍏ュ簱鎴愬姛鍝嶅簲澶辫触锛宐arcode={}锛宻tationId={}", request.getBarcode(),
                     request.getSourceStaNo(), e);
         }
     }
 
+    private InTaskApplyResult applySyncInTask(InTaskApplyRequest request) {
+        InTaskApplyResult result = new InTaskApplyResult();
+        result.setBizKey(request.getBizKey());
+
+        String response = wmsOperateUtils.applyInTask(request);
+        result.setResponse(response);
+        if (Cools.isEmpty(response)) {
+            result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
+            result.setMessage("FAILED");
+            return result;
+        }
+
+        try {
+            JSONObject jsonObject = JSON.parseObject(response);
+            if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
+                result.setStatus(InTaskApplyStatus.SUCCESS);
+                return result;
+            }
+        } catch (Exception ignored) {
+        }
+
+        result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
+        result.setMessage(response);
+        return result;
+    }
+
+    private HashMap<String, String> getSystemConfigMap() {
+        Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
+        if (systemConfigMapObj == null) {
+            return null;
+        }
+        return (HashMap<String, String>) systemConfigMapObj;
+    }
+
+    private boolean hasAvailableStationTaskCapacity(HashMap<String, String> systemConfigMap) {
+        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
+        int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
+        if (currentStationTaskCount > conveyorStationTaskLimit) {
+            News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}", conveyorStationTaskLimit, currentStationTaskCount);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
+        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
+        while (true) {
+            int reservedCount = inFlightGenerateCount.get();
+            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
+            if (currentStationTaskCount + reservedCount >= conveyorStationTaskLimit) {
+                News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}锛岀敓鎴愪腑浠诲姟鏁帮細{}",
+                        conveyorStationTaskLimit, currentStationTaskCount, reservedCount);
+                return false;
+            }
+            if (inFlightGenerateCount.compareAndSet(reservedCount, reservedCount + 1)) {
+                return true;
+            }
+        }
+    }
+
+    private void releaseGenerateCapacity() {
+        inFlightGenerateCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
+    }
+
+    private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) {
+        int conveyorStationTaskLimit = 30;
+        String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
+        if (conveyorStationTaskLimitStr != null) {
+            conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
+        }
+        return conveyorStationTaskLimit;
+    }
+
 }

--
Gitblit v1.9.1