From 03c3ae747f82ad22c761c79e7b1c0e0031c57d41 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期一, 06 四月 2026 20:28:35 +0800
Subject: [PATCH] #出库站点命令下发
---
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java | 180 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 169 insertions(+), 11 deletions(-)
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 0f8f76c..3e83c85 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -36,17 +36,22 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
- private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64;
+ private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
+ private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
+ private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
+ private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
+ private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
private DeviceConfig deviceConfig;
private RedisUtil redisUtil;
private ZyStationConnectDriver zyStationConnectDriver;
- private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE);
+ private final ExecutorService executor;
private StationV5SegmentExecutor segmentExecutor;
private final RecentStationArrivalTracker recentArrivalTracker;
private final StationV5StatusReader statusReader;
@@ -55,10 +60,13 @@
public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
+ int poolSize = resolveSegmentExecutorPoolSize(redisUtil);
+ this.executor = Executors.newFixedThreadPool(poolSize);
this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
+ log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize);
}
@Override
@@ -126,19 +134,43 @@
return map;
}
+ @Override
+ public List<Integer> getAllTaskNoList() {
+ return statusReader.getTaskNoList();
+ }
+
private void pollAndDispatchQueuedCommand() {
Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
if (task == null || task.getStep() == null || task.getStep() != 2) {
return;
}
- submitSegmentCommand((StationCommand) task.getData());
+ StationCommand command = (StationCommand) task.getData();
+ logExecutorAbnormal("queue-poll", command);
+ submitSegmentCommand(command);
}
private void submitSegmentCommand(StationCommand command) {
if (command == null || executor == null || segmentExecutor == null) {
return;
}
- executor.submit(() -> segmentExecutor.execute(command));
+ executor.submit(() -> {
+ long start = System.currentTimeMillis();
+ try {
+ segmentExecutor.execute(command);
+ } finally {
+ long costMs = System.currentTimeMillis() - start;
+ if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
+ log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
+ deviceConfig.getDeviceNo(),
+ command.getTaskNo(),
+ command.getStationId(),
+ command.getTargetStaNo(),
+ costMs);
+ logExecutorAbnormal("segment-finish-slow", command);
+ }
+ }
+ });
+ logExecutorAbnormal("after-submit", command);
}
@Override
@@ -197,10 +229,12 @@
return getCommand(StationCommandType.MOVE, taskNo, stationId, targetStationId, palletSize, pathLenFactor);
}
+ long startNs = System.nanoTime();
StationTaskLoopService taskLoopService = loadStationTaskLoopService();
StationTaskLoopService.LoopEvaluation loopEvaluation = taskLoopService == null
? new StationTaskLoopService.LoopEvaluation(taskNo, stationId, StationTaskLoopService.LoopIdentitySnapshot.empty(), 0, 0, false)
: taskLoopService.evaluateLoop(taskNo, stationId, true);
+ long loopEvalNs = System.nanoTime();
log.info("杈撻�佺嚎鍫靛閲嶈鍒掔幆绾胯瘑鍒紝taskNo={}, stationId={}, scopeType={}, localStationCount={}, sourceLoopStationCount={}",
taskNo,
stationId,
@@ -208,6 +242,7 @@
loopEvaluation.getLoopIdentity().getLocalStationCount(),
loopEvaluation.getLoopIdentity().getSourceLoopStationCount());
List<List<NavigateNode>> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
+ long candidatePathNs = System.nanoTime();
List<StationCommand> candidateCommandList = new ArrayList<>();
for (List<NavigateNode> candidatePath : candidatePathList) {
StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath);
@@ -216,6 +251,7 @@
}
candidateCommandList.add(rerouteCommand);
}
+ long buildCommandNs = System.nanoTime();
StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan(
taskNo,
@@ -223,6 +259,17 @@
loopEvaluation,
candidateCommandList
);
+ long planNs = System.nanoTime();
+ logRunBlockRerouteCost(taskNo,
+ stationId,
+ targetStationId,
+ candidatePathList == null ? 0 : candidatePathList.size(),
+ candidateCommandList.size(),
+ startNs,
+ loopEvalNs,
+ candidatePathNs,
+ buildCommandNs,
+ planNs);
if (candidateCommandList.isEmpty()) {
log.warn("杈撻�佺嚎鍫靛閲嶈鍒掑け璐ワ紝鍊欓�夎矾寰勪负绌猴紝taskNo={}, planCount={}, stationId={}, targetStationId={}",
taskNo, planResult.getPlanCount(), stationId, targetStationId);
@@ -274,21 +321,54 @@
continue;
}
found = true;
+ Integer clearedTaskNo = item.getTaskNo();
if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) {
success = false;
log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬竻鐞嗗け璐ャ�俿tationId={}, slotIdx={}, taskNo={}",
- stationId, item.getSlotIdx(), item.getTaskNo());
+ stationId, item.getSlotIdx(), clearedTaskNo);
continue;
- }else {
- item.setTaskNo(0);
- item.setTargetStaNo(0);
- success = true;
- log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬竻鐞嗘垚鍔熴�俿tationId={}, slotIdx={}, taskNo={}",
- stationId, item.getSlotIdx(), item.getTaskNo());
}
+ item.setTaskNo(0);
+ item.setTargetStaNo(0);
+ log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬竻鐞嗘垚鍔熴�俿tationId={}, slotIdx={}, taskNo={}",
+ stationId, item.getSlotIdx(), clearedTaskNo);
}
}
return found && success;
+ }
+
+ @Override
+ public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) {
+ if (stationId == null || slotIdx == null || zyStationConnectDriver == null) {
+ return false;
+ }
+ List<StationProtocol> status = getStatus();
+ if (status == null || status.isEmpty()) {
+ return false;
+ }
+
+ for (StationProtocol stationProtocol : status) {
+ if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) {
+ continue;
+ }
+ if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) {
+ log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬寜绔欑偣妲戒綅娓呯悊澶辫触銆俿tationId={}, slotIdx={}", stationId, slotIdx);
+ return false;
+ }
+ List<StationTaskBufferItem> taskBufferItems = stationProtocol.getTaskBufferItems();
+ if (taskBufferItems != null) {
+ for (StationTaskBufferItem item : taskBufferItems) {
+ if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) {
+ item.setTaskNo(0);
+ item.setTargetStaNo(0);
+ break;
+ }
+ }
+ }
+ log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬寜绔欑偣妲戒綅娓呯悊鎴愬姛銆俿tationId={}, slotIdx={}", stationId, slotIdx);
+ return true;
+ }
+ return false;
}
@Override
@@ -331,6 +411,53 @@
optService.save(basStationOpt);
}
return commandResponse;
+ }
+
+ private void logExecutorAbnormal(String scene, StationCommand command) {
+ if (!(executor instanceof ThreadPoolExecutor)) {
+ return;
+ }
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+ int activeCount = threadPoolExecutor.getActiveCount();
+ int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
+ if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
+ return;
+ }
+ log.warn("V5杈撻�佺嚎绋嬫睜鍑虹幇鍫嗙Н锛宻cene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}",
+ scene,
+ deviceConfig.getDeviceNo(),
+ command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getStationId(),
+ command == null ? null : command.getTargetStaNo(),
+ threadPoolExecutor.getPoolSize(),
+ activeCount,
+ queuedCount,
+ threadPoolExecutor.getCompletedTaskCount());
+ }
+
+ @SuppressWarnings("unchecked")
+ private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) {
+ if (redisUtil == null) {
+ return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+ }
+ try {
+ Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key);
+ if (!(systemConfigMapObj instanceof HashMap)) {
+ return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+ }
+ HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
+ String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE);
+ if (poolSizeText == null || poolSizeText.trim().isEmpty()) {
+ return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+ }
+ int configured = Integer.parseInt(poolSizeText.trim());
+ if (configured < 16) {
+ return 16;
+ }
+ return Math.min(configured, 512);
+ } catch (Exception ignore) {
+ return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+ }
}
@Override
@@ -423,4 +550,35 @@
return null;
}
}
+
+ private void logRunBlockRerouteCost(Integer taskNo,
+ Integer stationId,
+ Integer targetStationId,
+ int candidatePathCount,
+ int candidateCommandCount,
+ long startNs,
+ long loopEvalNs,
+ long candidatePathNs,
+ long buildCommandNs,
+ long planNs) {
+ long totalMs = nanosToMillis(planNs - startNs);
+ if (totalMs < 1000L) {
+ return;
+ }
+ log.warn("杈撻�佺嚎鍫靛閲嶈鍒掕�楁椂杈冮暱, taskNo={}, stationId={}, targetStationId={}, total={}ms, loopEval={}ms, candidatePath={}ms, buildCommand={}ms, planner={}ms, candidatePathCount={}, candidateCommandCount={}",
+ taskNo,
+ stationId,
+ targetStationId,
+ totalMs,
+ nanosToMillis(loopEvalNs - startNs),
+ nanosToMillis(candidatePathNs - loopEvalNs),
+ nanosToMillis(buildCommandNs - candidatePathNs),
+ nanosToMillis(planNs - buildCommandNs),
+ candidatePathCount,
+ candidateCommandCount);
+ }
+
+ private long nanosToMillis(long nanos) {
+ return nanos <= 0L ? 0L : nanos / 1_000_000L;
+ }
}
--
Gitblit v1.9.1