From 63b01db83d9aad8a15276b4236a9a22e4aeef065 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 05 五月 2026 12:30:59 +0800
Subject: [PATCH] # Agent数据分析V3.0.1.7

---
 src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java |  180 +++++++++++++++++++++++++++++++++++++----------------------
 1 files changed, 113 insertions(+), 67 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 3e83c85..b4f259a 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -21,6 +21,8 @@
 import com.zy.core.network.ZyStationConnectDriver;
 import com.zy.core.network.entity.ZyStationStatusEntity;
 import com.zy.core.service.StationTaskLoopService;
+import com.zy.core.task.BasStationOptAsyncPublisher;
+import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
 import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner;
 import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
 import com.zy.core.thread.impl.v5.StationV5StatusReader;
@@ -34,39 +36,55 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 @Data
 @Slf4j
 public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
 
     private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
-    private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
+    private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
     private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
     private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
     private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
+    private static final long COMMAND_BUILD_WARN_MS = 500L;
+    private static final int QUEUE_DRAIN_BATCH_SIZE = 32;
+    private static final long QUEUE_IDLE_SLEEP_MS = 20L;
 
     private DeviceConfig deviceConfig;
     private RedisUtil redisUtil;
     private ZyStationConnectDriver zyStationConnectDriver;
-    private final ExecutorService executor;
+    private final ThreadPoolExecutor executor;
     private StationV5SegmentExecutor segmentExecutor;
     private final RecentStationArrivalTracker recentArrivalTracker;
     private final StationV5StatusReader statusReader;
     private final StationV5RunBlockReroutePlanner runBlockReroutePlanner;
+    private final int executorQueueCapacity;
+    private final BasStationOptAsyncPublisher basStationOptAsyncPublisher;
 
     public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
         this.redisUtil = redisUtil;
-        int poolSize = resolveSegmentExecutorPoolSize(redisUtil);
-        this.executor = Executors.newFixedThreadPool(poolSize);
+        StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
+        int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize();
+        this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity();
+        this.executor = new ThreadPoolExecutor(
+                poolSize,
+                poolSize,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<>(executorQueueCapacity)
+        );
         this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
         this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
         this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
         this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
-        log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize);
+        this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class);
+        log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}, queueCapacity={}",
+                deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity);
     }
 
     @Override
@@ -89,8 +107,10 @@
         Thread processThread = new Thread(() -> {
             while (true) {
                 try {
-                    pollAndDispatchQueuedCommand();
-                    Thread.sleep(100);
+                    int dispatchedCount = pollAndDispatchQueuedCommandBatch();
+                    if (dispatchedCount <= 0) {
+                        Thread.sleep(QUEUE_IDLE_SLEEP_MS);
+                    }
                 } catch (Exception e) {
                     log.error("StationV5Process Fail", e);
                 }
@@ -139,38 +159,71 @@
         return statusReader.getTaskNoList();
     }
 
-    private void pollAndDispatchQueuedCommand() {
-        Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
-        if (task == null || task.getStep() == null || task.getStep() != 2) {
-            return;
+    private int pollAndDispatchQueuedCommandBatch() {
+        int dispatchedCount = 0;
+        while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) {
+            if (isExecutorQueueAtWatermark()) {
+                logExecutorAbnormal("executor-watermark", null);
+                break;
+            }
+            Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo());
+            if (task == null || task.getStep() == null || task.getStep() != 2) {
+                break;
+            }
+            StationCommand command = (StationCommand) task.getData();
+            logExecutorAbnormal("queue-peek", command);
+            if (!submitSegmentCommand(command)) {
+                logExecutorAbnormal("submit-rejected", command);
+                break;
+            }
+            MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
+            dispatchedCount++;
         }
-        StationCommand command = (StationCommand) task.getData();
-        logExecutorAbnormal("queue-poll", command);
-        submitSegmentCommand(command);
+        return dispatchedCount;
     }
 
-    private void submitSegmentCommand(StationCommand command) {
+    private boolean submitSegmentCommand(StationCommand command) {
         if (command == null || executor == null || segmentExecutor == null) {
-            return;
+            return false;
         }
-        executor.submit(() -> {
-            long start = System.currentTimeMillis();
-            try {
-                segmentExecutor.execute(command);
-            } finally {
-                long costMs = System.currentTimeMillis() - start;
-                if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
-                    log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
-                            deviceConfig.getDeviceNo(),
-                            command.getTaskNo(),
-                            command.getStationId(),
-                            command.getTargetStaNo(),
-                            costMs);
-                    logExecutorAbnormal("segment-finish-slow", command);
+        try {
+            executor.execute(() -> {
+                long start = System.currentTimeMillis();
+                try {
+                    segmentExecutor.execute(command);
+                } finally {
+                    long costMs = System.currentTimeMillis() - start;
+                    if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
+                        log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
+                                deviceConfig.getDeviceNo(),
+                                command.getTaskNo(),
+                                command.getStationId(),
+                                command.getTargetStaNo(),
+                                costMs);
+                        logExecutorAbnormal("segment-finish-slow", command);
+                    }
                 }
-            }
-        });
+            });
+        } catch (RejectedExecutionException e) {
+            log.error("V5杈撻�佺嚎绋嬫睜鎷掔粷鎵ц锛屼繚鐣欒澶囬槦鍒楃Н鍘嬶紝deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}",
+                    deviceConfig.getDeviceNo(),
+                    command.getTaskNo(),
+                    command.getStationId(),
+                    command.getTargetStaNo(),
+                    executor.getActiveCount(),
+                    executor.getQueue() == null ? 0 : executor.getQueue().size(),
+                    executorQueueCapacity);
+            return false;
+        }
         logExecutorAbnormal("after-submit", command);
+        return true;
+    }
+
+    private boolean isExecutorQueueAtWatermark() {
+        if (executor == null || executor.getQueue() == null) {
+            return false;
+        }
+        return executor.getQueue().size() >= executorQueueCapacity;
     }
 
     @Override
@@ -202,8 +255,25 @@
         stationCommand.setCommandType(commandType);
 
         if (commandType == StationCommandType.MOVE && !stationId.equals(targetStationId)) {
+            long startNs = System.nanoTime();
+            long calcPathStartNs = startNs;
             List<NavigateNode> nodes = calcPathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
-            return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId);
+            long calcPathCostMs = nanosToMillis(System.nanoTime() - calcPathStartNs);
+            long fillCommandStartNs = System.nanoTime();
+            StationCommand builtCommand = fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId);
+            long fillCommandCostMs = nanosToMillis(System.nanoTime() - fillCommandStartNs);
+            long totalCostMs = nanosToMillis(System.nanoTime() - startNs);
+            if (totalCostMs >= COMMAND_BUILD_WARN_MS) {
+                log.warn("V5杈撻�佸懡浠ょ敓鎴愯�楁椂杈冮暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, calcPath={}ms, fillCommand={}ms, total={}ms",
+                        deviceConfig == null ? null : deviceConfig.getDeviceNo(),
+                        taskNo,
+                        stationId,
+                        targetStationId,
+                        calcPathCostMs,
+                        fillCommandCostMs,
+                        totalCostMs);
+            }
+            return builtCommand;
         }
         return stationCommand;
     }
@@ -408,18 +478,19 @@
                     commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0,
                     JSON.toJSONString(commandResponse)
             );
-            optService.save(basStationOpt);
+            if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) {
+                optService.save(basStationOpt);
+            }
         }
         return commandResponse;
     }
 
     private void logExecutorAbnormal(String scene, StationCommand command) {
-        if (!(executor instanceof ThreadPoolExecutor)) {
+        if (executor == null) {
             return;
         }
-        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
-        int activeCount = threadPoolExecutor.getActiveCount();
-        int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
+        int activeCount = executor.getActiveCount();
+        int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size();
         if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
             return;
         }
@@ -429,35 +500,10 @@
                 command == null ? null : command.getTaskNo(),
                 command == null ? null : command.getStationId(),
                 command == null ? null : command.getTargetStaNo(),
-                threadPoolExecutor.getPoolSize(),
+                executor.getPoolSize(),
                 activeCount,
                 queuedCount,
-                threadPoolExecutor.getCompletedTaskCount());
-    }
-
-    @SuppressWarnings("unchecked")
-    private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) {
-        if (redisUtil == null) {
-            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
-        }
-        try {
-            Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key);
-            if (!(systemConfigMapObj instanceof HashMap)) {
-                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
-            }
-            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
-            String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE);
-            if (poolSizeText == null || poolSizeText.trim().isEmpty()) {
-                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
-            }
-            int configured = Integer.parseInt(poolSizeText.trim());
-            if (configured < 16) {
-                return 16;
-            }
-            return Math.min(configured, 512);
-        } catch (Exception ignore) {
-            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
-        }
+                executor.getCompletedTaskCount());
     }
 
     @Override

--
Gitblit v1.9.1