From 41aeff86351d1dd94fe2408175f96475f227c1b9 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 02 四月 2026 17:15:27 +0800
Subject: [PATCH] #执行优化

---
 src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java |  194 ++++++++++++++++++++++++------------------------
 1 files changed, 96 insertions(+), 98 deletions(-)

diff --git a/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java b/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
index a1a685e..32116ca 100644
--- a/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
+++ b/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -18,6 +18,8 @@
 import com.zy.core.enums.SlaveType;
 import com.zy.core.model.StationObjModel;
 import com.zy.core.model.protocol.StationProtocol;
+import com.zy.core.task.MainProcessLane;
+import com.zy.core.task.MainProcessTaskSubmitter;
 import com.zy.core.thread.StationThread;
 import com.zy.core.utils.StationOperateProcessUtils;
 import com.zy.core.utils.WmsOperateUtils;
@@ -27,7 +29,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @Service
 public class StoreInTaskGenerationService {
@@ -45,8 +46,13 @@
     private WmsOperateUtils wmsOperateUtils;
     @Autowired
     private CommonService commonService;
+    @Autowired
+    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
 
-    private final AtomicInteger inFlightGenerateCount = new AtomicInteger(0);
+    /**
+     * 淇濈暀褰撳墠鎸夌珯鐐� lane 骞跺彂鐨勮兘鍔涳紝鍚屾椂鐢ㄤ竴涓畝鍗曡鏁伴伩鍏嶅苟鍙戠敓鎴愭妸绔欑偣浠诲姟鏁伴《绌夸笂闄愩��
+     */
+    private int inFlightGenerateCount = 0;
 
     public void generate(StoreInTaskPolicy policy) {
         try {
@@ -66,7 +72,7 @@
             for (BasDevp basDevp : basDevps) {
                 List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp);
                 for (StationObjModel stationObjModel : barcodeStations) {
-                    generate(policy, basDevp, stationObjModel, systemConfigMap);
+                    generateByStation(policy, basDevp, stationObjModel, systemConfigMap);
                 }
             }
         } catch (Exception e) {
@@ -88,37 +94,43 @@
                 return;
             }
 
-            generate(policy, basDevp, stationObjModel, systemConfigMap);
+            generateByStation(policy, basDevp, stationObjModel, systemConfigMap);
         } catch (Exception e) {
             Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
             News.error("鐢熸垚鍏ュ簱浠诲姟寮傚父锛宲olicy={}锛宻tationId={}", policy.getPolicyName(), stationId, e);
         }
     }
 
-    private void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
-                          HashMap<String, String> systemConfigMap) {
-        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
+    public void submitGenerateStoreTask(StoreInTaskPolicy policy,
+                                        BasDevp basDevp,
+                                        StationObjModel stationObjModel,
+                                        long minIntervalMs,
+                                        Runnable task) {
+        submitGenerateStoreTask(policy, basDevp, stationObjModel, MainProcessLane.GENERATE_STORE, minIntervalMs, task);
+    }
+
+    public void submitGenerateStoreTask(StoreInTaskPolicy policy,
+                                        BasDevp basDevp,
+                                        StationObjModel stationObjModel,
+                                        MainProcessLane lane,
+                                        long minIntervalMs,
+                                        Runnable task) {
+        Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
+        mainProcessTaskSubmitter.submitKeyedSerialTask(
+                lane,
+                stationId,
+                "generateStoreWrkFile",
+                minIntervalMs,
+                task
+        );
+    }
+
+    private void generateByStation(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
+                                   HashMap<String, String> systemConfigMap) {
+        StoreInTaskContext context = buildContext(basDevp, stationObjModel);
+        if (context == null) {
             return;
         }
-
-        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
-        if (stationThread == null) {
-            return;
-        }
-
-        Integer stationId = stationObjModel.getStationId();
-        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
-        if (!stationMap.containsKey(stationId)) {
-            return;
-        }
-
-        StationProtocol stationProtocol = stationMap.get(stationId);
-        if (stationProtocol == null) {
-            return;
-        }
-
-        StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
-                stationProtocol);
         if (!policy.matchCandidate(context)) {
             return;
         }
@@ -126,15 +138,13 @@
             return;
         }
 
-        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
-                .eq("barcode", stationProtocol.getBarcode()));
-        if (!wrkMasts.isEmpty()) {
+        long count = wrkMastService.count(new QueryWrapper<WrkMast>().eq("barcode", context.getStationProtocol().getBarcode()));
+        if (count > 0) {
+            policy.setSystemWarning(context, "绯荤粺浠诲姟宸插瓨鍦�");
             return;
         }
 
-        String generateLockKey = policy.getGenerateLockKey(context);
-        Object lock = redisUtil.get(generateLockKey);
-        if (lock != null) {
+        if (redisUtil.get(policy.getGenerateLockKey(context)) != null) {
             return;
         }
 
@@ -142,58 +152,60 @@
             return;
         }
 
+        InTaskApplyRequest request = policy.buildApplyRequest(context);
         try {
             policy.onRequestPermitGranted(context);
-
-            InTaskApplyRequest request = policy.buildApplyRequest(context);
             policy.setSystemWarning(context, "璇锋眰WMS涓�");
             News.info("鍙戣捣鍚屾WMS鍏ュ簱璇锋眰锛宐arcode={}锛宻tationId={}锛宼imeout={}s",
                     request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS);
-            InTaskApplyResult result = applySyncInTask(request);
-            handleApplyResult(policy, context, request, result);
+            String response = wmsOperateUtils.applyInTask(request);
+            handleSyncApplyResponse(policy, context, request, response);
         } finally {
             releaseGenerateCapacity();
         }
     }
 
-    private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
-                                   InTaskApplyResult result) {
-        if (result.isSuccess()) {
-            handleApplySuccess(policy, context, request, result);
-            return;
+    private StoreInTaskContext buildContext(BasDevp basDevp, StationObjModel stationObjModel) {
+        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
+            return null;
         }
 
-        if (result.isRetryableFailure()) {
-            News.error("WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}锛宺esponse={}",
-                    request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
-            redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
-            policy.onApplyFailed(context, result);
-            return;
+        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
+        if (stationThread == null) {
+            return null;
         }
 
-        policy.onApplyFailed(context, result);
+        Integer stationId = stationObjModel.getStationId();
+        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
+        if (stationMap == null || !stationMap.containsKey(stationId)) {
+            return null;
+        }
+
+        StationProtocol stationProtocol = stationMap.get(stationId);
+        if (stationProtocol == null) {
+            return null;
+        }
+
+        return new StoreInTaskContext(basDevp, stationThread, stationObjModel, stationProtocol);
     }
 
-    private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
-                                    InTaskApplyResult result) {
+    private void handleSyncApplyResponse(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
+                                         String response) {
+        if (Cools.isEmpty(response)) {
+            markApplyFailed(policy, context, request, null, "FAILED");
+            return;
+        }
+
         try {
-            JSONObject jsonObject = JSON.parseObject(result.getResponse());
+            JSONObject jsonObject = JSON.parseObject(response);
             if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
-                InTaskApplyResult failResult = new InTaskApplyResult();
-                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
-                failResult.setResponse(result.getResponse());
-                failResult.setMessage("WMS杩斿洖闈�200");
-                handleApplyResult(policy, context, request, failResult);
+                markApplyFailed(policy, context, request, response, "WMS杩斿洖闈�200");
                 return;
             }
 
             StartupDto dto = jsonObject.getObject("data", StartupDto.class);
             if (dto == null) {
-                InTaskApplyResult failResult = new InTaskApplyResult();
-                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
-                failResult.setResponse(result.getResponse());
-                failResult.setMessage("WMS杩斿洖data涓虹┖");
-                handleApplyResult(policy, context, request, failResult);
+                markApplyFailed(policy, context, request, response, "WMS杩斿洖data涓虹┖");
                 return;
             }
 
@@ -202,35 +214,23 @@
             policy.afterTaskCreated(context, wrkMast);
             policy.clearSystemWarning(context);
         } catch (Exception e) {
-            News.error("澶勭悊WMS鍏ュ簱鎴愬姛鍝嶅簲澶辫触锛宐arcode={}锛宻tationId={}", request.getBarcode(),
+            News.error("澶勭悊WMS鍏ュ簱鍝嶅簲寮傚父锛宐arcode={}锛宻tationId={}", request.getBarcode(),
                     request.getSourceStaNo(), e);
+            markApplyFailed(policy, context, request, response, e.getMessage());
         }
     }
 
-    private InTaskApplyResult applySyncInTask(InTaskApplyRequest request) {
+    private void markApplyFailed(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
+                                 String response, String message) {
         InTaskApplyResult result = new InTaskApplyResult();
-        result.setBizKey(request.getBizKey());
-
-        String response = wmsOperateUtils.applyInTask(request);
-        result.setResponse(response);
-        if (Cools.isEmpty(response)) {
-            result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
-            result.setMessage("FAILED");
-            return result;
-        }
-
-        try {
-            JSONObject jsonObject = JSON.parseObject(response);
-            if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
-                result.setStatus(InTaskApplyStatus.SUCCESS);
-                return result;
-            }
-        } catch (Exception ignored) {
-        }
-
         result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
-        result.setMessage(response);
-        return result;
+        result.setResponse(response);
+        result.setMessage(message);
+
+        News.error("WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}锛宺esponse={}",
+                request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
+        redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
+        policy.onApplyFailed(context, result);
     }
 
     private HashMap<String, String> getSystemConfigMap() {
@@ -251,24 +251,22 @@
         return true;
     }
 
-    private boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
+    private synchronized boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
         int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
-        while (true) {
-            int reservedCount = inFlightGenerateCount.get();
-            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
-            if (currentStationTaskCount + reservedCount >= conveyorStationTaskLimit) {
-                News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}锛岀敓鎴愪腑浠诲姟鏁帮細{}",
-                        conveyorStationTaskLimit, currentStationTaskCount, reservedCount);
-                return false;
-            }
-            if (inFlightGenerateCount.compareAndSet(reservedCount, reservedCount + 1)) {
-                return true;
-            }
+        int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
+        if (currentStationTaskCount + inFlightGenerateCount >= conveyorStationTaskLimit) {
+            News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}锛岀敓鎴愪腑浠诲姟鏁帮細{}",
+                    conveyorStationTaskLimit, currentStationTaskCount, inFlightGenerateCount);
+            return false;
         }
+        inFlightGenerateCount++;
+        return true;
     }
 
-    private void releaseGenerateCapacity() {
-        inFlightGenerateCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
+    private synchronized void releaseGenerateCapacity() {
+        if (inFlightGenerateCount > 0) {
+            inFlightGenerateCount--;
+        }
     }
 
     private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) {

--
Gitblit v1.9.1