From 88892fc38d50297c5ac5edf3fed629236bb9fd9d Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 31 三月 2026 23:22:31 +0800
Subject: [PATCH] #入库同步串行请求
---
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java | 192 ++++++++++++++++++++++++++++++++++--------------
src/main/java/com/zy/core/utils/WmsOperateUtils.java | 3
src/main/java/com/zy/core/plugin/GslProcess.java | 33 +++++---
3 files changed, 158 insertions(+), 70 deletions(-)
diff --git a/src/main/java/com/zy/core/plugin/GslProcess.java b/src/main/java/com/zy/core/plugin/GslProcess.java
index 23ccfdd..88b9b2f 100644
--- a/src/main/java/com/zy/core/plugin/GslProcess.java
+++ b/src/main/java/com/zy/core/plugin/GslProcess.java
@@ -38,7 +38,7 @@
public class GslProcess implements MainProcessPluginApi, StoreInTaskPolicy {
private static final String CRN_TASK_LANE = "crn";
private static final String STATION_TASK_LANE = "station";
- private static final String GENERATE_STORE_TASK_LANE = "generate-store";
+ private static final String GENERATE_STORE_TASK_LANE_PREFIX = "generate-store-";
private static final long DISPATCH_INTERVAL_MS = 200L;
private static final long MAINTENANCE_INTERVAL_MS = 500L;
private static final long TASK_SLOW_LOG_THRESHOLD_MS = 1000L;
@@ -64,8 +64,8 @@
public void run() {
//妫�娴嬪叆搴撶珯鏄惁鏈変换鍔$敓鎴愶紝骞跺惎鍔ㄥ叆搴�
checkInStationHasTask();
- //璇锋眰鐢熸垚鍏ュ簱浠诲姟
- submitGenerateStoreTask("generateStoreWrkFile", DISPATCH_INTERVAL_MS, this::generateStoreWrkFile);
+ //鎸夌珯鐐规媶鍒嗙敓鎴愬叆搴撲换鍔★紝閬垮厤鍗曚釜绔欑偣闃诲鏁磋疆鎵弿
+ submitGenerateStoreTasks();
//鍫嗗灈鏈轰笌杈撻�佺珯鐐归兘鎸夊崟涓换鍔℃彁浜ゅ埌鍚勮嚜涓茶閫氶亾锛岄�愪釜鎵ц
submitCrnTask("crnIoExecute", DISPATCH_INTERVAL_MS, crnOperateUtils::crnIoExecute);
@@ -76,14 +76,6 @@
submitStationTask("watchCircleStation", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::watchCircleStation);
submitStationTask("checkStationRunBlock", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationRunBlock);
submitStationTask("checkStationIdleRecover", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationIdleRecover);
- }
-
- /**
- * 璇锋眰鐢熸垚鍏ュ簱浠诲姟
- * 鍏ュ簱绔欙紝鏍规嵁鏉$爜鎵弿鐢熸垚鍏ュ簱宸ヤ綔妗�
- */
- public synchronized void generateStoreWrkFile() {
- storeInTaskGenerationService.generate(this);
}
@Override
@@ -177,8 +169,23 @@
submitProcessTask(STATION_TASK_LANE, taskName, minIntervalMs, task);
}
- private void submitGenerateStoreTask(String taskName, long minIntervalMs, Runnable task) {
- submitProcessTask(GENERATE_STORE_TASK_LANE, taskName, minIntervalMs, task);
+ private void submitGenerateStoreTasks() {
+ List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
+ for (BasDevp basDevp : basDevps) {
+ List<StationObjModel> barcodeStations = getBarcodeStations(basDevp);
+ for (StationObjModel stationObjModel : barcodeStations) {
+ Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
+ if (stationId == null) {
+ continue;
+ }
+ submitGenerateStoreTask(stationId, DISPATCH_INTERVAL_MS,
+ () -> storeInTaskGenerationService.generate(this, basDevp, stationObjModel));
+ }
+ }
+ }
+
+ private void submitGenerateStoreTask(Integer stationId, long minIntervalMs, Runnable task) {
+ submitProcessTask(GENERATE_STORE_TASK_LANE_PREFIX + stationId, "generateStoreWrkFile", minIntervalMs, task);
}
private void submitCrnTask(String taskName, long minIntervalMs, Runnable task) {
diff --git a/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java b/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
index 4405611..9997796 100644
--- a/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
+++ b/src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -27,9 +27,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
@Service
public class StoreInTaskGenerationService {
+ private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
@Autowired
private BasDevpService basDevpService;
@@ -44,81 +46,112 @@
@Autowired
private CommonService commonService;
+ private final AtomicInteger inFlightGenerateCount = new AtomicInteger(0);
+
public void generate(StoreInTaskPolicy policy) {
try {
if (!policy.isEnabled()) {
return;
}
- Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
- if (systemConfigMapObj == null) {
+ HashMap<String, String> systemConfigMap = getSystemConfigMap();
+ if (systemConfigMap == null) {
return;
}
- HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
-
- int conveyorStationTaskLimit = 30;
- String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
- if (conveyorStationTaskLimitStr != null) {
- conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
- }
- int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
- if (currentStationTaskCount > conveyorStationTaskLimit) {
- News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}", conveyorStationTaskLimit, currentStationTaskCount);
+ if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
return;
}
List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
for (BasDevp basDevp : basDevps) {
- StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
- if (stationThread == null) {
- continue;
- }
-
- Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp);
for (StationObjModel stationObjModel : barcodeStations) {
- Integer stationId = stationObjModel.getStationId();
- if (!stationMap.containsKey(stationId)) {
- continue;
- }
-
- StationProtocol stationProtocol = stationMap.get(stationId);
- if (stationProtocol == null) {
- continue;
- }
-
- StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
- stationProtocol);
- if (!policy.matchCandidate(context)) {
- continue;
- }
- if (!policy.beforeApply(context)) {
- continue;
- }
-
- List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
- .eq("barcode", stationProtocol.getBarcode()));
- if (!wrkMasts.isEmpty()) {
- continue;
- }
-
- String generateLockKey = policy.getGenerateLockKey(context);
- Object lock = redisUtil.get(generateLockKey);
- if (lock != null) {
- continue;
- }
-
- policy.onRequestPermitGranted(context);
-
- InTaskApplyRequest request = policy.buildApplyRequest(context);
- News.info("鍙戣捣鍚屾WMS鍏ュ簱璇锋眰锛宐arcode={}锛宻tationId={}", request.getBarcode(),
- request.getSourceStaNo());
- InTaskApplyResult result = applySyncInTask(request);
- handleApplyResult(policy, context, request, result);
+ generate(policy, basDevp, stationObjModel, systemConfigMap);
}
}
} catch (Exception e) {
News.error("鐢熸垚鍏ュ簱浠诲姟寮傚父锛宲olicy={}", policy.getPolicyName(), e);
+ }
+ }
+
+ public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) {
+ try {
+ if (!policy.isEnabled()) {
+ return;
+ }
+
+ HashMap<String, String> systemConfigMap = getSystemConfigMap();
+ if (systemConfigMap == null) {
+ return;
+ }
+ if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
+ return;
+ }
+
+ generate(policy, basDevp, stationObjModel, systemConfigMap);
+ } catch (Exception e) {
+ Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
+ News.error("鐢熸垚鍏ュ簱浠诲姟寮傚父锛宲olicy={}锛宻tationId={}", policy.getPolicyName(), stationId, e);
+ }
+ }
+
+ private void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
+ HashMap<String, String> systemConfigMap) {
+ if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
+ return;
+ }
+
+ StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
+ if (stationThread == null) {
+ return;
+ }
+
+ Integer stationId = stationObjModel.getStationId();
+ Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
+ if (!stationMap.containsKey(stationId)) {
+ return;
+ }
+
+ StationProtocol stationProtocol = stationMap.get(stationId);
+ if (stationProtocol == null) {
+ return;
+ }
+
+ StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
+ stationProtocol);
+ if (!policy.matchCandidate(context)) {
+ return;
+ }
+ if (!policy.beforeApply(context)) {
+ return;
+ }
+
+ List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
+ .eq("barcode", stationProtocol.getBarcode()));
+ if (!wrkMasts.isEmpty()) {
+ return;
+ }
+
+ String generateLockKey = policy.getGenerateLockKey(context);
+ Object lock = redisUtil.get(generateLockKey);
+ if (lock != null) {
+ return;
+ }
+
+ if (!tryReserveGenerateCapacity(systemConfigMap)) {
+ return;
+ }
+
+ try {
+ policy.onRequestPermitGranted(context);
+
+ InTaskApplyRequest request = policy.buildApplyRequest(context);
+ News.info("鍙戣捣鍚屾WMS鍏ュ簱璇锋眰锛宐arcode={}锛宻tationId={}锛宼imeout={}s",
+ request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS);
+ InTaskApplyResult result = applySyncInTask(request);
+ handleApplyResult(policy, context, request, result);
+ } finally {
+ releaseGenerateCapacity();
}
}
@@ -199,4 +232,51 @@
return result;
}
+ private HashMap<String, String> getSystemConfigMap() {
+ Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
+ if (systemConfigMapObj == null) {
+ return null;
+ }
+ return (HashMap<String, String>) systemConfigMapObj;
+ }
+
+ private boolean hasAvailableStationTaskCapacity(HashMap<String, String> systemConfigMap) {
+ int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
+ int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
+ if (currentStationTaskCount > conveyorStationTaskLimit) {
+ News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}", conveyorStationTaskLimit, currentStationTaskCount);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
+ int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
+ while (true) {
+ int reservedCount = inFlightGenerateCount.get();
+ int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
+ if (currentStationTaskCount + reservedCount >= conveyorStationTaskLimit) {
+ News.error("杈撻�佺珯鐐逛换鍔″凡杈惧埌涓婇檺锛屼笂闄愬�硷細{}锛岀珯鐐逛换鍔℃暟锛歿}锛岀敓鎴愪腑浠诲姟鏁帮細{}",
+ conveyorStationTaskLimit, currentStationTaskCount, reservedCount);
+ return false;
+ }
+ if (inFlightGenerateCount.compareAndSet(reservedCount, reservedCount + 1)) {
+ return true;
+ }
+ }
+ }
+
+ private void releaseGenerateCapacity() {
+ inFlightGenerateCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
+ }
+
+ private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) {
+ int conveyorStationTaskLimit = 30;
+ String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
+ if (conveyorStationTaskLimitStr != null) {
+ conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
+ }
+ return conveyorStationTaskLimit;
+ }
+
}
diff --git a/src/main/java/com/zy/core/utils/WmsOperateUtils.java b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
index 4b81b92..0c33c6d 100644
--- a/src/main/java/com/zy/core/utils/WmsOperateUtils.java
+++ b/src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -39,6 +39,7 @@
@Component
public class WmsOperateUtils {
+ private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
@Autowired
private ConfigService configService;
@@ -114,7 +115,7 @@
.setUri(wmsUrl)
.setPath(wmsSystemInUrl)
.setJson(JSON.toJSONString(requestParam))
- .setTimeout(30, TimeUnit.SECONDS)
+ .setTimeout(APPLY_IN_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.build()
.doPost();
if (!Cools.isEmpty(response)) {
--
Gitblit v1.9.1