From 852664df1caf38831793b341edcada9dd7b6c22a Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期三, 06 五月 2026 19:28:33 +0800
Subject: [PATCH] #dfs
---
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 217 insertions(+), 19 deletions(-)
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 23dadc4..e0209e4 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -21,6 +21,8 @@
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.service.StationTaskLoopService;
+import com.zy.core.task.BasStationOptAsyncPublisher;
+import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner;
import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
import com.zy.core.thread.impl.v5.StationV5StatusReader;
@@ -34,31 +36,55 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
- private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64;
+ private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
+ private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
+ private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
+ private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
+ private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
+ private static final long COMMAND_BUILD_WARN_MS = 500L;
+ private static final int QUEUE_DRAIN_BATCH_SIZE = 32;
+ private static final long QUEUE_IDLE_SLEEP_MS = 20L;
private DeviceConfig deviceConfig;
private RedisUtil redisUtil;
private ZyStationConnectDriver zyStationConnectDriver;
- private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE);
+ private final ThreadPoolExecutor executor;
private StationV5SegmentExecutor segmentExecutor;
private final RecentStationArrivalTracker recentArrivalTracker;
private final StationV5StatusReader statusReader;
private final StationV5RunBlockReroutePlanner runBlockReroutePlanner;
+ private final int executorQueueCapacity;
+ private final BasStationOptAsyncPublisher basStationOptAsyncPublisher;
public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
+ StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
+ int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize();
+ this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity();
+ this.executor = new ThreadPoolExecutor(
+ poolSize,
+ poolSize,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(executorQueueCapacity)
+ );
this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
+ this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class);
+ log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}, queueCapacity={}",
+ deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity);
}
@Override
@@ -81,8 +107,10 @@
Thread processThread = new Thread(() -> {
while (true) {
try {
- pollAndDispatchQueuedCommand();
- Thread.sleep(100);
+ int dispatchedCount = pollAndDispatchQueuedCommandBatch();
+ if (dispatchedCount <= 0) {
+ Thread.sleep(QUEUE_IDLE_SLEEP_MS);
+ }
} catch (Exception e) {
log.error("StationV5Process Fail", e);
}
@@ -126,19 +154,76 @@
return map;
}
- private void pollAndDispatchQueuedCommand() {
- Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
- if (task == null || task.getStep() == null || task.getStep() != 2) {
- return;
- }
- submitSegmentCommand((StationCommand) task.getData());
+ @Override
+ public List<Integer> getAllTaskNoList() {
+ return statusReader.getTaskNoList();
}
- private void submitSegmentCommand(StationCommand command) {
- if (command == null || executor == null || segmentExecutor == null) {
- return;
+ private int pollAndDispatchQueuedCommandBatch() {
+ int dispatchedCount = 0;
+ while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) {
+ if (isExecutorQueueAtWatermark()) {
+ logExecutorAbnormal("executor-watermark", null);
+ break;
+ }
+ Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo());
+ if (task == null || task.getStep() == null || task.getStep() != 2) {
+ break;
+ }
+ StationCommand command = (StationCommand) task.getData();
+ logExecutorAbnormal("queue-peek", command);
+ if (!submitSegmentCommand(command)) {
+ logExecutorAbnormal("submit-rejected", command);
+ break;
+ }
+ MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
+ dispatchedCount++;
}
- executor.submit(() -> segmentExecutor.execute(command));
+ return dispatchedCount;
+ }
+
+ private boolean submitSegmentCommand(StationCommand command) {
+ if (command == null || executor == null || segmentExecutor == null) {
+ return false;
+ }
+ try {
+ executor.execute(() -> {
+ long start = System.currentTimeMillis();
+ try {
+ segmentExecutor.execute(command);
+ } finally {
+ long costMs = System.currentTimeMillis() - start;
+ if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
+ log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
+ deviceConfig.getDeviceNo(),
+ command.getTaskNo(),
+ command.getStationId(),
+ command.getTargetStaNo(),
+ costMs);
+ logExecutorAbnormal("segment-finish-slow", command);
+ }
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ log.error("V5杈撻�佺嚎绋嬫睜鎷掔粷鎵ц锛屼繚鐣欒澶囬槦鍒楃Н鍘嬶紝deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}",
+ deviceConfig.getDeviceNo(),
+ command.getTaskNo(),
+ command.getStationId(),
+ command.getTargetStaNo(),
+ executor.getActiveCount(),
+ executor.getQueue() == null ? 0 : executor.getQueue().size(),
+ executorQueueCapacity);
+ return false;
+ }
+ logExecutorAbnormal("after-submit", command);
+ return true;
+ }
+
+ private boolean isExecutorQueueAtWatermark() {
+ if (executor == null || executor.getQueue() == null) {
+ return false;
+ }
+ return executor.getQueue().size() >= executorQueueCapacity;
}
@Override
@@ -170,8 +255,25 @@
stationCommand.setCommandType(commandType);
if (commandType == StationCommandType.MOVE && !stationId.equals(targetStationId)) {
+ long startNs = System.nanoTime();
+ long calcPathStartNs = startNs;
List<NavigateNode> nodes = calcPathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
- return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId);
+ long calcPathCostMs = nanosToMillis(System.nanoTime() - calcPathStartNs);
+ long fillCommandStartNs = System.nanoTime();
+ StationCommand builtCommand = fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId);
+ long fillCommandCostMs = nanosToMillis(System.nanoTime() - fillCommandStartNs);
+ long totalCostMs = nanosToMillis(System.nanoTime() - startNs);
+ if (totalCostMs >= COMMAND_BUILD_WARN_MS) {
+ log.warn("V5杈撻�佸懡浠ょ敓鎴愯�楁椂杈冮暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, calcPath={}ms, fillCommand={}ms, total={}ms",
+ deviceConfig == null ? null : deviceConfig.getDeviceNo(),
+ taskNo,
+ stationId,
+ targetStationId,
+ calcPathCostMs,
+ fillCommandCostMs,
+ totalCostMs);
+ }
+ return builtCommand;
}
return stationCommand;
}
@@ -209,8 +311,16 @@
loopEvaluation.getLoopIdentity().getScopeType(),
loopEvaluation.getLoopIdentity().getLocalStationCount(),
loopEvaluation.getLoopIdentity().getSourceLoopStationCount());
+ log.info("杈撻�佺嚎鍫靛閲嶈鍒掑�欓�夎矾寰勮绠楀紑濮嬶紝taskNo={}, stationId={}, targetStationId={}, pathLenFactor={}",
+ taskNo, stationId, targetStationId, pathLenFactor);
List<List<NavigateNode>> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
long candidatePathNs = System.nanoTime();
+ log.info("杈撻�佺嚎鍫靛閲嶈鍒掑�欓�夎矾寰勮绠楀畬鎴愶紝taskNo={}, stationId={}, targetStationId={}, candidatePathCount={}, costMs={}",
+ taskNo,
+ stationId,
+ targetStationId,
+ candidatePathList == null ? null : candidatePathList.size(),
+ nanosToMillis(candidatePathNs - loopEvalNs));
List<StationCommand> candidateCommandList = new ArrayList<>();
for (List<NavigateNode> candidatePath : candidatePathList) {
StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath);
@@ -220,6 +330,14 @@
candidateCommandList.add(rerouteCommand);
}
long buildCommandNs = System.nanoTime();
+ log.info("杈撻�佺嚎鍫靛閲嶈鍒掑�欓�夊懡浠ゆ瀯寤哄畬鎴愶紝taskNo={}, stationId={}, targetStationId={}, candidatePathCount={}, candidateCommandCount={}, costMs={}, firstCommandPath={}",
+ taskNo,
+ stationId,
+ targetStationId,
+ candidatePathList == null ? null : candidatePathList.size(),
+ candidateCommandList.size(),
+ nanosToMillis(buildCommandNs - candidatePathNs),
+ JSON.toJSONString(firstCommandPath(candidateCommandList)));
StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan(
taskNo,
@@ -228,6 +346,14 @@
candidateCommandList
);
long planNs = System.nanoTime();
+ log.info("杈撻�佺嚎鍫靛閲嶈鍒抪lanner瀹屾垚锛宼askNo={}, stationId={}, targetStationId={}, planCount={}, selected={}, issuedRouteCount={}, costMs={}",
+ taskNo,
+ stationId,
+ targetStationId,
+ planResult == null ? null : planResult.getPlanCount(),
+ planResult != null && planResult.getCommand() != null,
+ planResult == null || planResult.getIssuedRoutePathList() == null ? null : planResult.getIssuedRoutePathList().size(),
+ nanosToMillis(planNs - buildCommandNs));
logRunBlockRerouteCost(taskNo,
stationId,
targetStationId,
@@ -247,7 +373,11 @@
StationCommand rerouteCommand = planResult.getCommand();
if (rerouteCommand != null) {
if (taskLoopService != null) {
+ log.info("杈撻�佺嚎鍫靛閲嶈鍒掕褰曠幆绾垮紑濮嬶紝taskNo={}, stationId={}, targetStationId={}",
+ taskNo, stationId, targetStationId);
taskLoopService.recordLoopIssue(loopEvaluation, "RUN_BLOCK_REROUTE");
+ log.info("杈撻�佺嚎鍫靛閲嶈鍒掕褰曠幆绾垮畬鎴愶紝taskNo={}, stationId={}, targetStationId={}",
+ taskNo, stationId, targetStationId);
}
log.info("杈撻�佺嚎鍫靛閲嶈鍒掗�変腑鍊欓�夎矾绾匡紝taskNo={}, planCount={}, stationId={}, targetStationId={}, route={}",
taskNo, planResult.getPlanCount(), stationId, targetStationId, JSON.toJSONString(rerouteCommand.getNavigatePath()));
@@ -263,9 +393,20 @@
return null;
}
+ private List<Integer> firstCommandPath(List<StationCommand> candidateCommandList) {
+ if (candidateCommandList == null || candidateCommandList.isEmpty()) {
+ return new ArrayList<>();
+ }
+ StationCommand firstCommand = candidateCommandList.get(0);
+ if (firstCommand == null || firstCommand.getNavigatePath() == null) {
+ return new ArrayList<>();
+ }
+ return firstCommand.getNavigatePath();
+ }
+
@Override
public boolean clearPath(Integer taskNo) {
- if (taskNo == null) {
+ if (taskNo == null || taskNo <= 0) {
return false;
}
if (zyStationConnectDriver == null) {
@@ -306,6 +447,40 @@
}
@Override
+ public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) {
+ if (stationId == null || slotIdx == null || zyStationConnectDriver == null) {
+ return false;
+ }
+ List<StationProtocol> status = getStatus();
+ if (status == null || status.isEmpty()) {
+ return false;
+ }
+
+ for (StationProtocol stationProtocol : status) {
+ if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) {
+ continue;
+ }
+ if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) {
+ log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬寜绔欑偣妲戒綅娓呯悊澶辫触銆俿tationId={}, slotIdx={}", stationId, slotIdx);
+ return false;
+ }
+ List<StationTaskBufferItem> taskBufferItems = stationProtocol.getTaskBufferItems();
+ if (taskBufferItems != null) {
+ for (StationTaskBufferItem item : taskBufferItems) {
+ if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) {
+ item.setTaskNo(0);
+ item.setTargetStaNo(0);
+ break;
+ }
+ }
+ }
+ log.warn("杈撻�佺珯缂撳瓨鍖烘畫鐣欒矾寰勬寜绔欑偣妲戒綅娓呯悊鎴愬姛銆俿tationId={}, slotIdx={}", stationId, slotIdx);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
public CommandResponse sendCommand(StationCommand command) {
CommandResponse commandResponse = null;
try {
@@ -342,11 +517,34 @@
commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0,
JSON.toJSONString(commandResponse)
);
- optService.save(basStationOpt);
+ if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) {
+ optService.save(basStationOpt);
+ }
}
return commandResponse;
}
+ private void logExecutorAbnormal(String scene, StationCommand command) {
+ if (executor == null) {
+ return;
+ }
+ int activeCount = executor.getActiveCount();
+ int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size();
+ if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
+ return;
+ }
+ log.warn("V5杈撻�佺嚎绋嬫睜鍑虹幇鍫嗙Н锛宻cene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}",
+ scene,
+ deviceConfig.getDeviceNo(),
+ command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getStationId(),
+ command == null ? null : command.getTargetStaNo(),
+ executor.getPoolSize(),
+ activeCount,
+ queuedCount,
+ executor.getCompletedTaskCount());
+ }
+
@Override
public CommandResponse sendOriginCommand(String address, short[] data) {
return zyStationConnectDriver.sendOriginCommand(address, data);
--
Gitblit v1.9.1