From 5a56b56dc646c89669bfbc373853689d2888f103 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期一, 16 三月 2026 13:41:15 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/utils/WmsOperateUtils.java | 113 +++++++++++++++++++++++++++++++++++++-------------------
1 files changed, 75 insertions(+), 38 deletions(-)
diff --git a/src/main/java/com/zy/core/utils/WmsOperateUtils.java b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
index 4ef1979..7bbcb33 100644
--- a/src/main/java/com/zy/core/utils/WmsOperateUtils.java
+++ b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -2,7 +2,7 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.mapper.EntityWrapper;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.common.Cools;
import com.core.exception.CoolException;
import com.zy.asrs.entity.BasCrnp;
@@ -32,10 +32,36 @@
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
@Component
public class WmsOperateUtils {
+
+ private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
+ Math.min(4, Runtime.getRuntime().availableProcessors()));
+ private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
+ private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
+ private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
+ private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
+ private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
+ APPLY_IN_TASK_ASYNC_THREADS,
+ APPLY_IN_TASK_ASYNC_THREADS,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
+ thread.setDaemon(true);
+ return thread;
+ }
+ );
@Autowired
private ConfigService configService;
@@ -54,8 +80,10 @@
@Autowired
private RedisUtil redisUtil;
+ private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
+
// 鐢宠鍏ュ簱浠诲姟
- public synchronized String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
+ public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
if (systemConfigMapObj == null) {
News.error("绯荤粺Config缂撳瓨澶辨晥");
@@ -80,7 +108,7 @@
int result = 0;
try {
BasStation basStation = basStationService
- .selectOne(new EntityWrapper<BasStation>().eq("station_id", sourceStaNo));
+ .getOne(new QueryWrapper<BasStation>().eq("station_id", sourceStaNo));
if (basStation == null) {
News.error("绔欑偣{}涓嶅瓨鍦�", sourceStaNo);
return null;
@@ -103,7 +131,7 @@
.setTimeout(30, TimeUnit.SECONDS)
.build()
.doPost();
- if (response != null) {
+ if (!Cools.isEmpty(response)) {
JSONObject jsonObject = JSON.parseObject(response);
if (jsonObject.getInteger("code") == 200) {
result = 1;
@@ -127,7 +155,7 @@
httpRequestLog.setResponse(response);
httpRequestLog.setCreateTime(new Date());
httpRequestLog.setResult(result);
- httpRequestLogService.insert(httpRequestLog);
+ httpRequestLogService.save(httpRequestLog);
}
return response;
}
@@ -144,36 +172,45 @@
String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + sourceStaNo;
String responseKey = RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key + barcode + "_" + sourceStaNo;
+ if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
+ return;
+ }
+
// 妫�鏌ユ槸鍚﹀凡鏈夎姹傚湪杩涜涓�
Object existingRequest = redisUtil.get(requestKey);
if (existingRequest != null) {
+ asyncInTaskInflight.remove(requestKey);
return; // 宸叉湁璇锋眰鍦ㄨ繘琛屼腑锛岃烦杩�
}
- // 鏍囪璇锋眰杩涜涓紝璁剧疆60绉掕秴鏃�
- redisUtil.set(requestKey, "processing", 60);
+ // 鏍囪璇锋眰杩涜涓紝閬垮厤璇锋眰鍦ㄧ嚎绋嬫睜鎺掗槦鏃惰閲嶅鎻愪氦
+ redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
- // 鎻愪氦寮傛浠诲姟
- new Thread(() -> {
- try {
- String response = applyInTask(barcode, sourceStaNo, locType1);
- if (response != null) {
- // 瀛樺偍鍝嶅簲缁撴灉锛岃缃�60绉掕秴鏃�
- redisUtil.set(responseKey, response, 60);
- News.info("寮傛WMS鍏ュ簱璇锋眰瀹屾垚锛宐arcode={}锛宻tationId={}锛宺esponse={}", barcode, sourceStaNo, response);
- } else {
- // 璇锋眰澶辫触锛屽瓨鍌ㄥけ璐ユ爣璁�
- redisUtil.set(responseKey, "FAILED", 10);
- News.error("寮傛WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}", barcode, sourceStaNo);
+ try {
+ APPLY_IN_TASK_EXECUTOR.execute(() -> {
+ try {
+ String response = applyInTask(barcode, sourceStaNo, locType1);
+ if (response != null) {
+ redisUtil.set(responseKey, response, APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
+ News.info("寮傛WMS鍏ュ簱璇锋眰瀹屾垚锛宐arcode={}锛宻tationId={}锛宺esponse={}", barcode, sourceStaNo, response);
+ } else {
+ redisUtil.set(responseKey, "FAILED", 10);
+ News.error("寮傛WMS鍏ュ簱璇锋眰澶辫触锛宐arcode={}锛宻tationId={}", barcode, sourceStaNo);
+ }
+ } catch (Exception e) {
+ News.error("寮傛WMS鍏ュ簱璇锋眰寮傚父锛宐arcode={}锛宻tationId={}锛宔rror={}", barcode, sourceStaNo, e.getMessage());
+ redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
+ } finally {
+ asyncInTaskInflight.remove(requestKey);
+ redisUtil.del(requestKey);
}
- } catch (Exception e) {
- News.error("寮傛WMS鍏ュ簱璇锋眰寮傚父锛宐arcode={}锛宻tationId={}锛宔rror={}", barcode, sourceStaNo, e.getMessage());
- redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
- } finally {
- // 娓呴櫎璇锋眰杩涜涓爣璁�
- redisUtil.del(requestKey);
- }
- }).start();
+ });
+ } catch (RejectedExecutionException e) {
+ asyncInTaskInflight.remove(requestKey);
+ redisUtil.del(requestKey);
+ redisUtil.set(responseKey, "ERROR:ASYNC_QUEUE_FULL", 10);
+ News.error("寮傛WMS鍏ュ簱璇锋眰琚嫆缁濓紝绾跨▼姹犲凡婊★紝barcode={}锛宻tationId={}", barcode, sourceStaNo);
+ }
}
/**
@@ -203,13 +240,13 @@
*/
public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + stationId;
- return redisUtil.get(requestKey) != null;
+ return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
}
// 鐢宠浠诲姟閲嶆柊鍒嗛厤搴撲綅
public synchronized String applyReassignTaskLocNo(Integer taskNo, Integer stationId) {
String wmsUrl = null;
- Config wmsSystemUriConfig = configService.selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemUri"));
+ Config wmsSystemUriConfig = configService.getOne(new QueryWrapper<Config>().eq("code", "wmsSystemUri"));
if (wmsSystemUriConfig != null) {
wmsUrl = wmsSystemUriConfig.getValue();
}
@@ -221,7 +258,7 @@
String wmsSystemReassignInTaskUrl = null;
Config wmsSystemReassignInTaskUrlConfig = configService
- .selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemReassignInTaskUrl"));
+ .getOne(new QueryWrapper<Config>().eq("code", "wmsSystemReassignInTaskUrl"));
if (wmsSystemReassignInTaskUrlConfig != null) {
wmsSystemReassignInTaskUrl = wmsSystemReassignInTaskUrlConfig.getValue();
}
@@ -260,7 +297,7 @@
.setTimeout(30, TimeUnit.SECONDS)
.build()
.doPost();
- if (response != null) {
+ if (!Cools.isEmpty(response)) {
JSONObject jsonObject = JSON.parseObject(response);
if (jsonObject.getInteger("code") == 200) {
result = 1;
@@ -284,7 +321,7 @@
httpRequestLog.setResponse(response);
httpRequestLog.setCreateTime(new Date());
httpRequestLog.setResult(result);
- httpRequestLogService.insert(httpRequestLog);
+ httpRequestLogService.save(httpRequestLog);
}
return response;
}
@@ -292,7 +329,7 @@
// 鐢宠鍦ㄥ簱搴撲綅鏇存崲搴撲綅
public synchronized String applyChangeLocNo(String locNo) {
String wmsUrl = null;
- Config wmsSystemUriConfig = configService.selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemUri"));
+ Config wmsSystemUriConfig = configService.getOne(new QueryWrapper<Config>().eq("code", "wmsSystemUri"));
if (wmsSystemUriConfig != null) {
wmsUrl = wmsSystemUriConfig.getValue();
}
@@ -304,7 +341,7 @@
String wmsSystemChangeLocNoUrl = null;
Config wmsSystemChangeLocNoUrlConfig = configService
- .selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemChangeLocNoUrl"));
+ .getOne(new QueryWrapper<Config>().eq("code", "wmsSystemChangeLocNoUrl"));
if (wmsSystemChangeLocNoUrlConfig != null) {
wmsSystemChangeLocNoUrl = wmsSystemChangeLocNoUrlConfig.getValue();
}
@@ -322,7 +359,7 @@
List<Integer> crnRows = new ArrayList<>();
if (findCrnNoResult.getCrnType().equals(SlaveType.Crn)) {
- BasCrnp basCrnp = basCrnpService.selectOne(new EntityWrapper<BasCrnp>().eq("crn_no", crnNo));
+ BasCrnp basCrnp = basCrnpService.getOne(new QueryWrapper<BasCrnp>().eq("crn_no", crnNo));
if (basCrnp == null) {
return null;
}
@@ -332,7 +369,7 @@
}
} else if (findCrnNoResult.getCrnType().equals(SlaveType.DualCrn)) {
BasDualCrnp basDualCrnp = basDualCrnpService
- .selectOne(new EntityWrapper<BasDualCrnp>().eq("crn_no", crnNo));
+ .getOne(new QueryWrapper<BasDualCrnp>().eq("crn_no", crnNo));
if (basDualCrnp == null) {
return null;
}
@@ -359,7 +396,7 @@
.build()
.doPost();
- if (response != null) {
+ if (!Cools.isEmpty(response)) {
JSONObject jsonObject = JSON.parseObject(response);
if (jsonObject.getInteger("code") == 200) {
result = 1;
@@ -383,7 +420,7 @@
httpRequestLog.setResponse(response);
httpRequestLog.setCreateTime(new Date());
httpRequestLog.setResult(result);
- httpRequestLogService.insert(httpRequestLog);
+ httpRequestLogService.save(httpRequestLog);
}
return response;
}
--
Gitblit v1.9.1