From 641bf75f1b6684ee5b6d13497ad1106b82c59043 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 31 三月 2026 23:13:59 +0800
Subject: [PATCH] #入库同步串行请求
---
src/main/java/com/zy/core/utils/WmsOperateUtils.java | 259 ---------------------------------------------------
1 files changed, 0 insertions(+), 259 deletions(-)
diff --git a/src/main/java/com/zy/core/utils/WmsOperateUtils.java b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
index e11ffec..4b81b92 100644
--- a/src/main/java/com/zy/core/utils/WmsOperateUtils.java
+++ b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -23,8 +23,6 @@
import com.zy.core.News;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
-import com.zy.core.plugin.store.AsyncInTaskResult;
-import com.zy.core.plugin.store.AsyncInTaskStatus;
import com.zy.core.plugin.store.InTaskApplyRequest;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -37,36 +35,10 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
@Component
public class WmsOperateUtils {
-
- private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
- Math.min(4, Runtime.getRuntime().availableProcessors()));
- private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
- private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
- private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
- private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
- private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
- APPLY_IN_TASK_ASYNC_THREADS,
- APPLY_IN_TASK_ASYNC_THREADS,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
- runnable -> {
- Thread thread = new Thread(runnable);
- thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
- thread.setDaemon(true);
- return thread;
- }
- );
@Autowired
private ConfigService configService;
@@ -84,36 +56,6 @@
private BasStationService basStationService;
@Autowired
private RedisUtil redisUtil;
-
- private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
-
- private String buildAsyncInTaskBizKey(InTaskApplyRequest request) {
- if (!Cools.isEmpty(request.getBizKey())) {
- return request.getBizKey();
- }
- StringBuilder keyBuilder = new StringBuilder(request.getBarcode())
- .append("_")
- .append(request.getSourceStaNo());
- if (request.getTaskNo() != null && request.getTaskNo() > 0) {
- keyBuilder.append("_").append(request.getTaskNo());
- }
- return keyBuilder.toString();
- }
-
- private String buildAsyncInTaskKey(String prefix, InTaskApplyRequest request) {
- return prefix + buildAsyncInTaskBizKey(request);
- }
-
- private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) {
- StringBuilder keyBuilder = new StringBuilder(prefix)
- .append(barcode)
- .append("_")
- .append(stationId);
- if (taskNo != null && taskNo > 0) {
- keyBuilder.append("_").append(taskNo);
- }
- return keyBuilder.toString();
- }
// 鐢宠鍏ュ簱浠诲姟
public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
@@ -202,207 +144,6 @@
httpRequestLogService.save(httpRequestLog);
}
return response;
- }
-
- /**
- * 寮傛鐢宠鍏ュ簱浠诲姟 - 闈為樆濉炵増鏈�
- * 灏嗚姹傛彁浜ゅ埌绾跨▼姹犲紓姝ユ墽琛岋紝缁撴灉瀛樺偍鍒癛edis涓�
- *
- * @param barcode 鎵樼洏鐮�
- * @param sourceStaNo 绔欑偣缂栧彿
- * @param locType1 鎵樼洏楂樺害
- */
- public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) {
- applyInTaskAsync(barcode, sourceStaNo, null, locType1);
- }
-
- public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) {
- InTaskApplyRequest request = new InTaskApplyRequest();
- request.setBarcode(barcode);
- request.setSourceStaNo(sourceStaNo);
- request.setTaskNo(taskNo);
- request.setLocType1(locType1);
- applyInTaskAsync(request);
- }
-
- public void applyInTaskAsync(InTaskApplyRequest request) {
- String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request);
- String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
-
- if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
- return;
- }
-
- // 妫�鏌ユ槸鍚﹀凡鏈夎姹傚湪杩涜涓�
- Object existingRequest = redisUtil.get(requestKey);
- if (existingRequest != null) {
- asyncInTaskInflight.remove(requestKey);
- return; // 宸叉湁璇锋眰鍦ㄨ繘琛屼腑锛岃烦杩�
- }
-
- // 鏍囪璇锋眰杩涜涓紝閬垮厤璇锋眰鍦ㄧ嚎绋嬫睜鎺掗槦鏃惰閲嶅鎻愪氦
- redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
-
- try {
- APPLY_IN_TASK_EXECUTOR.execute(() -> {
- try {
- String response = applyInTask(request);
- AsyncInTaskResult result = buildAsyncInTaskResult(request, response, null);
- redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
- News.info("寮傛WMS鍏ュ簱璇锋眰瀹屾垚锛宐arcode={}锛宻tationId={}锛宼askNo={}锛宻tatus={}锛宺esponse={}",
- request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(),
- result.getStatus(), response);
- } catch (Exception e) {
- News.error("寮傛WMS鍏ュ簱璇锋眰寮傚父锛宐arcode={}锛宻tationId={}锛宼askNo={}锛宔rror={}",
- request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), e.getMessage());
- AsyncInTaskResult result = buildAsyncInTaskResult(request, null, e);
- redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
- } finally {
- asyncInTaskInflight.remove(requestKey);
- redisUtil.del(requestKey);
- }
- });
- } catch (RejectedExecutionException e) {
- asyncInTaskInflight.remove(requestKey);
- redisUtil.del(requestKey);
- AsyncInTaskResult result = new AsyncInTaskResult();
- result.setBizKey(buildAsyncInTaskBizKey(request));
- result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
- result.setMessage("ERROR:ASYNC_QUEUE_FULL");
- redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
- News.error("寮傛WMS鍏ュ簱璇锋眰琚嫆缁濓紝绾跨▼姹犲凡婊★紝barcode={}锛宻tationId={}锛宼askNo={}",
- request.getBarcode(), request.getSourceStaNo(), request.getTaskNo());
- }
- }
-
- /**
- * 鏌ヨ寮傛鍏ュ簱浠诲姟璇锋眰缁撴灉
- *
- * @param barcode 鎵樼洏鐮�
- * @param stationId 绔欑偣缂栧彿
- * @return 鍝嶅簲缁撴灉锛宯ull琛ㄧず杩樻湭瀹屾垚鎴栨湭鎵惧埌
- */
- public String queryAsyncInTaskResponse(String barcode, Integer stationId) {
- return queryAsyncInTaskResponse(barcode, stationId, null);
- }
-
- public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) {
- InTaskApplyRequest request = new InTaskApplyRequest();
- request.setBarcode(barcode);
- request.setSourceStaNo(stationId);
- request.setTaskNo(taskNo);
- AsyncInTaskResult result = queryAsyncInTaskResponse(request);
- if (result == null) {
- return null;
- }
- clearAsyncInTaskResponse(request);
- if (result.isSuccess()) {
- return result.getResponse();
- }
- if (!Cools.isEmpty(result.getMessage())) {
- return result.getMessage();
- }
- return result.getResponse();
- }
-
- public AsyncInTaskResult queryAsyncInTaskResponse(InTaskApplyRequest request) {
- String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
- Object response = redisUtil.get(responseKey);
- if (response == null) {
- return null;
- }
- return parseAsyncInTaskResult(buildAsyncInTaskBizKey(request), response.toString());
- }
-
- public void clearAsyncInTaskResponse(InTaskApplyRequest request) {
- String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
- redisUtil.del(responseKey);
- }
-
- private AsyncInTaskResult parseAsyncInTaskResult(String bizKey, String responseValue) {
- AsyncInTaskResult result = null;
- try {
- JSONObject jsonObject = JSON.parseObject(responseValue);
- if (jsonObject != null && jsonObject.containsKey("status")) {
- result = jsonObject.toJavaObject(AsyncInTaskResult.class);
- } else if (jsonObject != null && jsonObject.containsKey("code")) {
- result = new AsyncInTaskResult();
- result.setBizKey(bizKey);
- result.setResponse(responseValue);
- if (Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
- result.setStatus(AsyncInTaskStatus.SUCCESS);
- } else {
- result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
- result.setMessage(responseValue);
- }
- }
- } catch (Exception ignored) {
- }
-
- if (result != null) {
- return result;
- }
-
- result = new AsyncInTaskResult();
- result.setBizKey(bizKey);
- result.setResponse(responseValue);
- result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
- result.setMessage(responseValue);
- return result;
- }
-
- /**
- * 妫�鏌ユ槸鍚︽湁寮傛璇锋眰姝e湪杩涜涓�
- *
- * @param barcode 鎵樼洏鐮�
- * @param stationId 绔欑偣缂栧彿
- * @return true琛ㄧず姝e湪璇锋眰涓�
- */
- public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
- return isAsyncRequestInProgress(barcode, stationId, null);
- }
-
- public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) {
- InTaskApplyRequest request = new InTaskApplyRequest();
- request.setBarcode(barcode);
- request.setSourceStaNo(stationId);
- request.setTaskNo(taskNo);
- return isAsyncRequestInProgress(request);
- }
-
- public boolean isAsyncRequestInProgress(InTaskApplyRequest request) {
- String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request);
- return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
- }
-
- private AsyncInTaskResult buildAsyncInTaskResult(InTaskApplyRequest request, String response, Exception exception) {
- AsyncInTaskResult result = new AsyncInTaskResult();
- result.setBizKey(buildAsyncInTaskBizKey(request));
- result.setResponse(response);
- if (exception != null) {
- result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
- result.setMessage("ERROR:" + exception.getMessage());
- return result;
- }
-
- if (response == null) {
- result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
- result.setMessage("FAILED");
- return result;
- }
-
- try {
- JSONObject jsonObject = JSON.parseObject(response);
- if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
- result.setStatus(AsyncInTaskStatus.SUCCESS);
- return result;
- }
- } catch (Exception ignored) {
- }
-
- result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
- result.setMessage(response);
- return result;
}
// 鐢宠浠诲姟閲嶆柊鍒嗛厤搴撲綅
--
Gitblit v1.9.1