From 641bf75f1b6684ee5b6d13497ad1106b82c59043 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 31 三月 2026 23:13:59 +0800
Subject: [PATCH] #入库同步串行请求

---
 src/main/java/com/zy/core/utils/WmsOperateUtils.java |  166 +++++++------------------------------------------------
 1 files changed, 21 insertions(+), 145 deletions(-)

diff --git a/src/main/java/com/zy/core/utils/WmsOperateUtils.java b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
index a99d80a..4b81b92 100644
--- a/src/main/java/com/zy/core/utils/WmsOperateUtils.java
+++ b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -23,6 +23,7 @@
 import com.zy.core.News;
 import com.zy.core.enums.RedisKeyType;
 import com.zy.core.enums.SlaveType;
+import com.zy.core.plugin.store.InTaskApplyRequest;
 import com.zy.system.entity.Config;
 import com.zy.system.service.ConfigService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -31,37 +32,13 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @Component
 public class WmsOperateUtils {
-
-    private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
-            Math.min(4, Runtime.getRuntime().availableProcessors()));
-    private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
-    private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
-    private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
-    private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
-    private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
-            APPLY_IN_TASK_ASYNC_THREADS,
-            APPLY_IN_TASK_ASYNC_THREADS,
-            60L,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
-            runnable -> {
-                Thread thread = new Thread(runnable);
-                thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
-                thread.setDaemon(true);
-                return thread;
-            }
-    );
 
     @Autowired
     private ConfigService configService;
@@ -80,21 +57,16 @@
     @Autowired
     private RedisUtil redisUtil;
 
-    private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
-
-    private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) {
-        StringBuilder keyBuilder = new StringBuilder(prefix)
-                .append(barcode)
-                .append("_")
-                .append(stationId);
-        if (taskNo != null && taskNo > 0) {
-            keyBuilder.append("_").append(taskNo);
-        }
-        return keyBuilder.toString();
-    }
-
     // 鐢宠鍏ュ簱浠诲姟
     public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
+        InTaskApplyRequest request = new InTaskApplyRequest();
+        request.setBarcode(barcode);
+        request.setSourceStaNo(sourceStaNo);
+        request.setLocType1(locType1);
+        return applyInTask(request);
+    }
+
+    public String applyInTask(InTaskApplyRequest request) {
         Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
         if (systemConfigMapObj == null) {
             News.error("绯荤粺Config缂撳瓨澶辨晥");
@@ -114,26 +86,29 @@
             return null;
         }
 
-        HashMap<String, Object> requestParam = new HashMap<>();
+        Map<String, Object> requestParam = new LinkedHashMap<>();
         String response = null;
         int result = 0;
         try {
             BasStation basStation = basStationService
-                    .getOne(new QueryWrapper<BasStation>().eq("station_id", sourceStaNo));
+                    .getOne(new QueryWrapper<BasStation>().eq("station_id", request.getSourceStaNo()));
             if (basStation == null) {
-                News.error("绔欑偣{}涓嶅瓨鍦�", sourceStaNo);
+                News.error("绔欑偣{}涓嶅瓨鍦�", request.getSourceStaNo());
                 return null;
             }
 
-            String stationNo = String.valueOf(sourceStaNo);
+            String stationNo = String.valueOf(request.getSourceStaNo());
             if (!Cools.isEmpty(basStation.getStationAlias())) {
                 stationNo = basStation.getStationAlias();
             }
 
-            requestParam.put("barcode", barcode);
+            requestParam.put("barcode", request.getBarcode());
             requestParam.put("sourceStaNo", stationNo);
-            requestParam.put("locType1", locType1 == null ? 1 : locType1);
-            requestParam.put("row", Utils.getInTaskEnableRow(sourceStaNo));
+            requestParam.put("locType1", request.getLocType1() == null ? 1 : request.getLocType1());
+            requestParam.put("row", Utils.getInTaskEnableRow(request.getSourceStaNo()));
+            if (request.getExtraParams() != null && !request.getExtraParams().isEmpty()) {
+                requestParam.putAll(request.getExtraParams());
+            }
 
             response = new HttpHandler.Builder()
                     .setUri(wmsUrl)
@@ -169,105 +144,6 @@
             httpRequestLogService.save(httpRequestLog);
         }
         return response;
-    }
-
-    /**
-     * 寮傛鐢宠鍏ュ簱浠诲姟 - 闈為樆濉炵増鏈�
-     * 灏嗚姹傛彁浜ゅ埌绾跨▼姹犲紓姝ユ墽琛岋紝缁撴灉瀛樺偍鍒癛edis涓�
-     * 
-     * @param barcode     鎵樼洏鐮�
-     * @param sourceStaNo 绔欑偣缂栧彿
-     * @param locType1    鎵樼洏楂樺害
-     */
-    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) {
-        applyInTaskAsync(barcode, sourceStaNo, null, locType1);
-    }
-
-    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) {
-        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, barcode, sourceStaNo, taskNo);
-        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, barcode, sourceStaNo, taskNo);
-
-        if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
-            return;
-        }
-
-        // 妫�鏌ユ槸鍚﹀凡鏈夎姹傚湪杩涜涓�
-        Object existingRequest = redisUtil.get(requestKey);
-        if (existingRequest != null) {
-            asyncInTaskInflight.remove(requestKey);
-            return; // 宸叉湁璇锋眰鍦ㄨ繘琛屼腑锛岃烦杩�
-        }
-
-        // 鏍囪璇锋眰杩涜涓紝閬垮厤璇锋眰鍦ㄧ嚎绋嬫睜鎺掗槦鏃惰閲嶅鎻愪氦
-        redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
-
-        try {
-            APPLY_IN_TASK_EXECUTOR.execute(() -> {
-                try {
-                    String response = applyInTask(barcode, sourceStaNo, locType1);
-                    if (response != null) {
-                        redisUtil.set(responseKey, response, APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
-                        News.info("寮傛WMS鍏ュ簱璇锋眰瀹屾垚锛宐arcode={}锛宻tationId={}锛宼askNo={}锛宺esponse={}",
-                                barcode, sourceStaNo, taskNo, response);
-                    } else {
-                        redisUtil.set(responseKey, "FAILED", 10);
-                        News.error("寮傛WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}锛宼askNo={}",
-                                barcode, sourceStaNo, taskNo);
-                    }
-                } catch (Exception e) {
-                    News.error("寮傛WMS鍏ュ簱璇锋眰寮傚父锛宐arcode={}锛宻tationId={}锛宼askNo={}锛宔rror={}",
-                            barcode, sourceStaNo, taskNo, e.getMessage());
-                    redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
-                } finally {
-                    asyncInTaskInflight.remove(requestKey);
-                    redisUtil.del(requestKey);
-                }
-            });
-        } catch (RejectedExecutionException e) {
-            asyncInTaskInflight.remove(requestKey);
-            redisUtil.del(requestKey);
-            redisUtil.set(responseKey, "ERROR:ASYNC_QUEUE_FULL", 10);
-            News.error("寮傛WMS鍏ュ簱璇锋眰琚嫆缁濓紝绾跨▼姹犲凡婊★紝barcode={}锛宻tationId={}锛宼askNo={}",
-                    barcode, sourceStaNo, taskNo);
-        }
-    }
-
-    /**
-     * 鏌ヨ寮傛鍏ュ簱浠诲姟璇锋眰缁撴灉
-     * 
-     * @param barcode   鎵樼洏鐮�
-     * @param stationId 绔欑偣缂栧彿
-     * @return 鍝嶅簲缁撴灉锛宯ull琛ㄧず杩樻湭瀹屾垚鎴栨湭鎵惧埌
-     */
-    public String queryAsyncInTaskResponse(String barcode, Integer stationId) {
-        return queryAsyncInTaskResponse(barcode, stationId, null);
-    }
-
-    public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) {
-        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, barcode, stationId, taskNo);
-        Object response = redisUtil.get(responseKey);
-        if (response != null) {
-            // 鑾峰彇鍚庡垹闄わ紝閬垮厤閲嶅澶勭悊
-            redisUtil.del(responseKey);
-            return response.toString();
-        }
-        return null;
-    }
-
-    /**
-     * 妫�鏌ユ槸鍚︽湁寮傛璇锋眰姝e湪杩涜涓�
-     * 
-     * @param barcode   鎵樼洏鐮�
-     * @param stationId 绔欑偣缂栧彿
-     * @return true琛ㄧず姝e湪璇锋眰涓�
-     */
-    public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
-        return isAsyncRequestInProgress(barcode, stationId, null);
-    }
-
-    public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) {
-        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, barcode, stationId, taskNo);
-        return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
     }
 
     // 鐢宠浠诲姟閲嶆柊鍒嗛厤搴撲綅

--
Gitblit v1.9.1