From 14cc8925be94a6c07e8e48278afc8f2d4aa284f1 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 19 三月 2026 20:45:42 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/utils/WmsOperateUtils.java |  142 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/src/main/java/com/zy/core/utils/WmsOperateUtils.java b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
index e08a79b..a99d80a 100644
--- a/src/main/java/com/zy/core/utils/WmsOperateUtils.java
+++ b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -2,7 +2,7 @@
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.mapper.EntityWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.core.common.Cools;
 import com.core.exception.CoolException;
 import com.zy.asrs.entity.BasCrnp;
@@ -32,10 +32,36 @@
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Component
 public class WmsOperateUtils {
+
+    private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
+            Math.min(4, Runtime.getRuntime().availableProcessors()));
+    private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
+    private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
+    private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
+    private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
+    private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
+            APPLY_IN_TASK_ASYNC_THREADS,
+            APPLY_IN_TASK_ASYNC_THREADS,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
+            runnable -> {
+                Thread thread = new Thread(runnable);
+                thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
+                thread.setDaemon(true);
+                return thread;
+            }
+    );
 
     @Autowired
     private ConfigService configService;
@@ -54,8 +80,21 @@
     @Autowired
     private RedisUtil redisUtil;
 
+    private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
+
+    private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) {
+        StringBuilder keyBuilder = new StringBuilder(prefix)
+                .append(barcode)
+                .append("_")
+                .append(stationId);
+        if (taskNo != null && taskNo > 0) {
+            keyBuilder.append("_").append(taskNo);
+        }
+        return keyBuilder.toString();
+    }
+
     // 鐢宠鍏ュ簱浠诲姟
-    public synchronized String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
+    public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
         Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
         if (systemConfigMapObj == null) {
             News.error("绯荤粺Config缂撳瓨澶辨晥");
@@ -80,7 +119,7 @@
         int result = 0;
         try {
             BasStation basStation = basStationService
-                    .selectOne(new EntityWrapper<BasStation>().eq("station_id", sourceStaNo));
+                    .getOne(new QueryWrapper<BasStation>().eq("station_id", sourceStaNo));
             if (basStation == null) {
                 News.error("绔欑偣{}涓嶅瓨鍦�", sourceStaNo);
                 return null;
@@ -127,7 +166,7 @@
             httpRequestLog.setResponse(response);
             httpRequestLog.setCreateTime(new Date());
             httpRequestLog.setResult(result);
-            httpRequestLogService.insert(httpRequestLog);
+            httpRequestLogService.save(httpRequestLog);
         }
         return response;
     }
@@ -141,39 +180,56 @@
      * @param locType1    鎵樼洏楂樺害
      */
     public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) {
-        String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + sourceStaNo;
-        String responseKey = RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key + barcode + "_" + sourceStaNo;
+        applyInTaskAsync(barcode, sourceStaNo, null, locType1);
+    }
+
+    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) {
+        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, barcode, sourceStaNo, taskNo);
+        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, barcode, sourceStaNo, taskNo);
+
+        if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
+            return;
+        }
 
         // 妫�鏌ユ槸鍚﹀凡鏈夎姹傚湪杩涜涓�
         Object existingRequest = redisUtil.get(requestKey);
         if (existingRequest != null) {
+            asyncInTaskInflight.remove(requestKey);
             return; // 宸叉湁璇锋眰鍦ㄨ繘琛屼腑锛岃烦杩�
         }
 
-        // 鏍囪璇锋眰杩涜涓紝璁剧疆60绉掕秴鏃�
-        redisUtil.set(requestKey, "processing", 60);
+        // 鏍囪璇锋眰杩涜涓紝閬垮厤璇锋眰鍦ㄧ嚎绋嬫睜鎺掗槦鏃惰閲嶅鎻愪氦
+        redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
 
-        // 鎻愪氦寮傛浠诲姟
-        new Thread(() -> {
-            try {
-                String response = applyInTask(barcode, sourceStaNo, locType1);
-                if (response != null) {
-                    // 瀛樺偍鍝嶅簲缁撴灉锛岃缃�60绉掕秴鏃�
-                    redisUtil.set(responseKey, response, 60);
-                    News.info("寮傛WMS鍏ュ簱璇锋眰瀹屾垚锛宐arcode={}锛宻tationId={}锛宺esponse={}", barcode, sourceStaNo, response);
-                } else {
-                    // 璇锋眰澶辫触锛屽瓨鍌ㄥけ璐ユ爣璁�
-                    redisUtil.set(responseKey, "FAILED", 10);
-                    News.error("寮傛WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}", barcode, sourceStaNo);
+        try {
+            APPLY_IN_TASK_EXECUTOR.execute(() -> {
+                try {
+                    String response = applyInTask(barcode, sourceStaNo, locType1);
+                    if (response != null) {
+                        redisUtil.set(responseKey, response, APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
+                        News.info("寮傛WMS鍏ュ簱璇锋眰瀹屾垚锛宐arcode={}锛宻tationId={}锛宼askNo={}锛宺esponse={}",
+                                barcode, sourceStaNo, taskNo, response);
+                    } else {
+                        redisUtil.set(responseKey, "FAILED", 10);
+                        News.error("寮傛WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}锛宼askNo={}",
+                                barcode, sourceStaNo, taskNo);
+                    }
+                } catch (Exception e) {
+                    News.error("寮傛WMS鍏ュ簱璇锋眰寮傚父锛宐arcode={}锛宻tationId={}锛宼askNo={}锛宔rror={}",
+                            barcode, sourceStaNo, taskNo, e.getMessage());
+                    redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
+                } finally {
+                    asyncInTaskInflight.remove(requestKey);
+                    redisUtil.del(requestKey);
                 }
-            } catch (Exception e) {
-                News.error("寮傛WMS鍏ュ簱璇锋眰寮傚父锛宐arcode={}锛宻tationId={}锛宔rror={}", barcode, sourceStaNo, e.getMessage());
-                redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
-            } finally {
-                // 娓呴櫎璇锋眰杩涜涓爣璁�
-                redisUtil.del(requestKey);
-            }
-        }).start();
+            });
+        } catch (RejectedExecutionException e) {
+            asyncInTaskInflight.remove(requestKey);
+            redisUtil.del(requestKey);
+            redisUtil.set(responseKey, "ERROR:ASYNC_QUEUE_FULL", 10);
+            News.error("寮傛WMS鍏ュ簱璇锋眰琚嫆缁濓紝绾跨▼姹犲凡婊★紝barcode={}锛宻tationId={}锛宼askNo={}",
+                    barcode, sourceStaNo, taskNo);
+        }
     }
 
     /**
@@ -184,7 +240,11 @@
      * @return 鍝嶅簲缁撴灉锛宯ull琛ㄧず杩樻湭瀹屾垚鎴栨湭鎵惧埌
      */
     public String queryAsyncInTaskResponse(String barcode, Integer stationId) {
-        String responseKey = RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key + barcode + "_" + stationId;
+        return queryAsyncInTaskResponse(barcode, stationId, null);
+    }
+
+    public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) {
+        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, barcode, stationId, taskNo);
         Object response = redisUtil.get(responseKey);
         if (response != null) {
             // 鑾峰彇鍚庡垹闄わ紝閬垮厤閲嶅澶勭悊
@@ -202,14 +262,18 @@
      * @return true琛ㄧず姝e湪璇锋眰涓�
      */
     public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
-        String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + stationId;
-        return redisUtil.get(requestKey) != null;
+        return isAsyncRequestInProgress(barcode, stationId, null);
+    }
+
+    public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) {
+        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, barcode, stationId, taskNo);
+        return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
     }
 
     // 鐢宠浠诲姟閲嶆柊鍒嗛厤搴撲綅
     public synchronized String applyReassignTaskLocNo(Integer taskNo, Integer stationId) {
         String wmsUrl = null;
-        Config wmsSystemUriConfig = configService.selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemUri"));
+        Config wmsSystemUriConfig = configService.getOne(new QueryWrapper<Config>().eq("code", "wmsSystemUri"));
         if (wmsSystemUriConfig != null) {
             wmsUrl = wmsSystemUriConfig.getValue();
         }
@@ -221,7 +285,7 @@
 
         String wmsSystemReassignInTaskUrl = null;
         Config wmsSystemReassignInTaskUrlConfig = configService
-                .selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemReassignInTaskUrl"));
+                .getOne(new QueryWrapper<Config>().eq("code", "wmsSystemReassignInTaskUrl"));
         if (wmsSystemReassignInTaskUrlConfig != null) {
             wmsSystemReassignInTaskUrl = wmsSystemReassignInTaskUrlConfig.getValue();
         }
@@ -284,7 +348,7 @@
             httpRequestLog.setResponse(response);
             httpRequestLog.setCreateTime(new Date());
             httpRequestLog.setResult(result);
-            httpRequestLogService.insert(httpRequestLog);
+            httpRequestLogService.save(httpRequestLog);
         }
         return response;
     }
@@ -292,7 +356,7 @@
     // 鐢宠鍦ㄥ簱搴撲綅鏇存崲搴撲綅
     public synchronized String applyChangeLocNo(String locNo) {
         String wmsUrl = null;
-        Config wmsSystemUriConfig = configService.selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemUri"));
+        Config wmsSystemUriConfig = configService.getOne(new QueryWrapper<Config>().eq("code", "wmsSystemUri"));
         if (wmsSystemUriConfig != null) {
             wmsUrl = wmsSystemUriConfig.getValue();
         }
@@ -304,7 +368,7 @@
 
         String wmsSystemChangeLocNoUrl = null;
         Config wmsSystemChangeLocNoUrlConfig = configService
-                .selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemChangeLocNoUrl"));
+                .getOne(new QueryWrapper<Config>().eq("code", "wmsSystemChangeLocNoUrl"));
         if (wmsSystemChangeLocNoUrlConfig != null) {
             wmsSystemChangeLocNoUrl = wmsSystemChangeLocNoUrlConfig.getValue();
         }
@@ -322,7 +386,7 @@
         List<Integer> crnRows = new ArrayList<>();
 
         if (findCrnNoResult.getCrnType().equals(SlaveType.Crn)) {
-            BasCrnp basCrnp = basCrnpService.selectOne(new EntityWrapper<BasCrnp>().eq("crn_no", crnNo));
+            BasCrnp basCrnp = basCrnpService.getOne(new QueryWrapper<BasCrnp>().eq("crn_no", crnNo));
             if (basCrnp == null) {
                 return null;
             }
@@ -332,7 +396,7 @@
             }
         } else if (findCrnNoResult.getCrnType().equals(SlaveType.DualCrn)) {
             BasDualCrnp basDualCrnp = basDualCrnpService
-                    .selectOne(new EntityWrapper<BasDualCrnp>().eq("crn_no", crnNo));
+                    .getOne(new QueryWrapper<BasDualCrnp>().eq("crn_no", crnNo));
             if (basDualCrnp == null) {
                 return null;
             }
@@ -383,7 +447,7 @@
             httpRequestLog.setResponse(response);
             httpRequestLog.setCreateTime(new Date());
             httpRequestLog.setResult(result);
-            httpRequestLogService.insert(httpRequestLog);
+            httpRequestLogService.save(httpRequestLog);
         }
         return response;
     }

--
Gitblit v1.9.1