From 03c3ae747f82ad22c761c79e7b1c0e0031c57d41 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期一, 06 四月 2026 20:28:35 +0800
Subject: [PATCH] #出库站点命令下发

---
 src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java |  180 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 169 insertions(+), 11 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 0f8f76c..3e83c85 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -36,17 +36,22 @@
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 @Data
 @Slf4j
 public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
 
-    private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64;
+    private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
+    private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
+    private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
+    private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
+    private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
 
     private DeviceConfig deviceConfig;
     private RedisUtil redisUtil;
     private ZyStationConnectDriver zyStationConnectDriver;
-    private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE);
+    private final ExecutorService executor;
     private StationV5SegmentExecutor segmentExecutor;
     private final RecentStationArrivalTracker recentArrivalTracker;
     private final StationV5StatusReader statusReader;
@@ -55,10 +60,13 @@
     public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
         this.redisUtil = redisUtil;
+        int poolSize = resolveSegmentExecutorPoolSize(redisUtil);
+        this.executor = Executors.newFixedThreadPool(poolSize);
         this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
         this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
         this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
         this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
+        log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize);
     }
 
     @Override
@@ -126,19 +134,43 @@
         return map;
     }
 
+    @Override
+    public List<Integer> getAllTaskNoList() {
+        return statusReader.getTaskNoList();
+    }
+
     private void pollAndDispatchQueuedCommand() {
         Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
         if (task == null || task.getStep() == null || task.getStep() != 2) {
             return;
         }
-        submitSegmentCommand((StationCommand) task.getData());
+        StationCommand command = (StationCommand) task.getData();
+        logExecutorAbnormal("queue-poll", command);
+        submitSegmentCommand(command);
     }
 
     private void submitSegmentCommand(StationCommand command) {
         if (command == null || executor == null || segmentExecutor == null) {
             return;
         }
-        executor.submit(() -> segmentExecutor.execute(command));
+        executor.submit(() -> {
+            long start = System.currentTimeMillis();
+            try {
+                segmentExecutor.execute(command);
+            } finally {
+                long costMs = System.currentTimeMillis() - start;
+                if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
+                    log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
+                            deviceConfig.getDeviceNo(),
+                            command.getTaskNo(),
+                            command.getStationId(),
+                            command.getTargetStaNo(),
+                            costMs);
+                    logExecutorAbnormal("segment-finish-slow", command);
+                }
+            }
+        });
+        logExecutorAbnormal("after-submit", command);
     }
 
     @Override
@@ -197,10 +229,12 @@
             return getCommand(StationCommandType.MOVE, taskNo, stationId, targetStationId, palletSize, pathLenFactor);
         }
 
+        long startNs = System.nanoTime();
         StationTaskLoopService taskLoopService = loadStationTaskLoopService();
         StationTaskLoopService.LoopEvaluation loopEvaluation = taskLoopService == null
                 ? new StationTaskLoopService.LoopEvaluation(taskNo, stationId, StationTaskLoopService.LoopIdentitySnapshot.empty(), 0, 0, false)
                 : taskLoopService.evaluateLoop(taskNo, stationId, true);
+        long loopEvalNs = System.nanoTime();
         log.info("杈撻�佺嚎鍫靛閲嶈鍒掔幆绾胯瘑鍒紝taskNo={}, stationId={}, scopeType={}, localStationCount={}, sourceLoopStationCount={}",
                 taskNo,
                 stationId,
@@ -208,6 +242,7 @@
                 loopEvaluation.getLoopIdentity().getLocalStationCount(),
                 loopEvaluation.getLoopIdentity().getSourceLoopStationCount());
         List<List<NavigateNode>> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
+        long candidatePathNs = System.nanoTime();
         List<StationCommand> candidateCommandList = new ArrayList<>();
         for (List<NavigateNode> candidatePath : candidatePathList) {
             StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath);
@@ -216,6 +251,7 @@
             }
             candidateCommandList.add(rerouteCommand);
         }
+        long buildCommandNs = System.nanoTime();
 
         StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan(
                 taskNo,
@@ -223,6 +259,17 @@
                 loopEvaluation,
                 candidateCommandList
         );
+        long planNs = System.nanoTime();
+        logRunBlockRerouteCost(taskNo,
+                stationId,
+                targetStationId,
+                candidatePathList == null ? 0 : candidatePathList.size(),
+                candidateCommandList.size(),
+                startNs,
+                loopEvalNs,
+                candidatePathNs,
+                buildCommandNs,
+                planNs);
         if (candidateCommandList.isEmpty()) {
             log.warn("杈撻�佺嚎鍫靛閲嶈鍒掑け璐ワ紝鍊欓�夎矾寰勪负绌猴紝taskNo={}, planCount={}, stationId={}, targetStationId={}",
                     taskNo, planResult.getPlanCount(), stationId, targetStationId);
@@ -274,21 +321,54 @@
                     continue;
                 }
                 found = true;
+                Integer clearedTaskNo = item.getTaskNo();
                 if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) {
                     success = false;
                     log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬竻鐞嗗け璐ャ�俿tationId={}, slotIdx={}, taskNo={}",
-                            stationId, item.getSlotIdx(), item.getTaskNo());
+                            stationId, item.getSlotIdx(), clearedTaskNo);
                     continue;
-                }else {
-                    item.setTaskNo(0);
-                    item.setTargetStaNo(0);
-                    success = true;
-                    log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬竻鐞嗘垚鍔熴�俿tationId={}, slotIdx={}, taskNo={}",
-                            stationId, item.getSlotIdx(), item.getTaskNo());
                 }
+                item.setTaskNo(0);
+                item.setTargetStaNo(0);
+                log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬竻鐞嗘垚鍔熴�俿tationId={}, slotIdx={}, taskNo={}",
+                        stationId, item.getSlotIdx(), clearedTaskNo);
             }
         }
         return found && success;
+    }
+
+    @Override
+    public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) {
+        if (stationId == null || slotIdx == null || zyStationConnectDriver == null) {
+            return false;
+        }
+        List<StationProtocol> status = getStatus();
+        if (status == null || status.isEmpty()) {
+            return false;
+        }
+
+        for (StationProtocol stationProtocol : status) {
+            if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) {
+                continue;
+            }
+            if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) {
+                log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬寜绔欑偣妲戒綅娓呯悊澶辫触銆俿tationId={}, slotIdx={}", stationId, slotIdx);
+                return false;
+            }
+            List<StationTaskBufferItem> taskBufferItems = stationProtocol.getTaskBufferItems();
+            if (taskBufferItems != null) {
+                for (StationTaskBufferItem item : taskBufferItems) {
+                    if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) {
+                        item.setTaskNo(0);
+                        item.setTargetStaNo(0);
+                        break;
+                    }
+                }
+            }
+            log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬寜绔欑偣妲戒綅娓呯悊鎴愬姛銆俿tationId={}, slotIdx={}", stationId, slotIdx);
+            return true;
+        }
+        return false;
     }
 
     @Override
@@ -331,6 +411,53 @@
             optService.save(basStationOpt);
         }
         return commandResponse;
+    }
+
+    private void logExecutorAbnormal(String scene, StationCommand command) {
+        if (!(executor instanceof ThreadPoolExecutor)) {
+            return;
+        }
+        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+        int activeCount = threadPoolExecutor.getActiveCount();
+        int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
+        if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
+            return;
+        }
+        log.warn("V5杈撻�佺嚎绋嬫睜鍑虹幇鍫嗙Н锛宻cene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}",
+                scene,
+                deviceConfig.getDeviceNo(),
+                command == null ? null : command.getTaskNo(),
+                command == null ? null : command.getStationId(),
+                command == null ? null : command.getTargetStaNo(),
+                threadPoolExecutor.getPoolSize(),
+                activeCount,
+                queuedCount,
+                threadPoolExecutor.getCompletedTaskCount());
+    }
+
+    @SuppressWarnings("unchecked")
+    private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) {
+        if (redisUtil == null) {
+            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+        }
+        try {
+            Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key);
+            if (!(systemConfigMapObj instanceof HashMap)) {
+                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+            }
+            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
+            String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE);
+            if (poolSizeText == null || poolSizeText.trim().isEmpty()) {
+                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+            }
+            int configured = Integer.parseInt(poolSizeText.trim());
+            if (configured < 16) {
+                return 16;
+            }
+            return Math.min(configured, 512);
+        } catch (Exception ignore) {
+            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
+        }
     }
 
     @Override
@@ -423,4 +550,35 @@
             return null;
         }
     }
+
+    private void logRunBlockRerouteCost(Integer taskNo,
+                                        Integer stationId,
+                                        Integer targetStationId,
+                                        int candidatePathCount,
+                                        int candidateCommandCount,
+                                        long startNs,
+                                        long loopEvalNs,
+                                        long candidatePathNs,
+                                        long buildCommandNs,
+                                        long planNs) {
+        long totalMs = nanosToMillis(planNs - startNs);
+        if (totalMs < 1000L) {
+            return;
+        }
+        log.warn("杈撻�佺嚎鍫靛閲嶈鍒掕�楁椂杈冮暱, taskNo={}, stationId={}, targetStationId={}, total={}ms, loopEval={}ms, candidatePath={}ms, buildCommand={}ms, planner={}ms, candidatePathCount={}, candidateCommandCount={}",
+                taskNo,
+                stationId,
+                targetStationId,
+                totalMs,
+                nanosToMillis(loopEvalNs - startNs),
+                nanosToMillis(candidatePathNs - loopEvalNs),
+                nanosToMillis(buildCommandNs - candidatePathNs),
+                nanosToMillis(planNs - buildCommandNs),
+                candidatePathCount,
+                candidateCommandCount);
+    }
+
+    private long nanosToMillis(long nanos) {
+        return nanos <= 0L ? 0L : nanos / 1_000_000L;
+    }
 }

--
Gitblit v1.9.1