package com.zy.core.thread.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.core.common.SpringUtils; import com.zy.asrs.entity.BasStationOpt; import com.zy.asrs.entity.DeviceConfig; import com.zy.asrs.service.BasStationOptService; import com.zy.common.model.NavigateNode; import com.zy.common.utils.NavigateUtils; import com.zy.common.utils.RedisUtil; import com.zy.core.cache.MessageQueue; import com.zy.core.enums.SlaveType; import com.zy.core.enums.StationCommandType; import com.zy.core.model.CommandResponse; import com.zy.core.model.Task; import com.zy.core.model.command.StationCommand; import com.zy.core.model.protocol.StationProtocol; import com.zy.core.model.protocol.StationTaskBufferItem; import com.zy.core.network.DeviceConnectPool; import com.zy.core.network.ZyStationConnectDriver; import com.zy.core.network.entity.ZyStationStatusEntity; import com.zy.core.service.StationTaskLoopService; import com.zy.core.task.BasStationOptAsyncPublisher; import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider; import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner; import com.zy.core.thread.impl.v5.StationV5SegmentExecutor; import com.zy.core.thread.impl.v5.StationV5StatusReader; import com.zy.core.thread.support.RecentStationArrivalTracker; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Data @Slf4j public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread { private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128; private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512; private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20; private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48; private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L; private static final int QUEUE_DRAIN_BATCH_SIZE = 32; private static final long QUEUE_IDLE_SLEEP_MS = 20L; private DeviceConfig deviceConfig; private RedisUtil redisUtil; private ZyStationConnectDriver zyStationConnectDriver; private final ThreadPoolExecutor executor; private StationV5SegmentExecutor segmentExecutor; private final RecentStationArrivalTracker recentArrivalTracker; private final StationV5StatusReader statusReader; private final StationV5RunBlockReroutePlanner runBlockReroutePlanner; private final int executorQueueCapacity; private final BasStationOptAsyncPublisher basStationOptAsyncPublisher; public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class); int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize(); this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity(); this.executor = new ThreadPoolExecutor( poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(executorQueueCapacity) ); this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand); this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker); this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil); this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class); log.info("初始化V5输送线程池,deviceNo={}, poolSize={}, queueCapacity={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity); } @Override @SuppressWarnings("InfiniteLoopStatement") public void run() { this.connect(); Thread readThread = new Thread(() -> { while (true) { try { statusReader.readStatus(zyStationConnectDriver); Thread.sleep(100); } catch (Exception e) { log.error("StationV5Thread Fail", e); } } }, "DevpRead-" + deviceConfig.getDeviceNo()); readThread.start(); Thread processThread = new Thread(() -> { while (true) { try { int dispatchedCount = pollAndDispatchQueuedCommandBatch(); if (dispatchedCount <= 0) { Thread.sleep(QUEUE_IDLE_SLEEP_MS); } } catch (Exception e) { log.error("StationV5Process Fail", e); } } }, "DevpProcess-" + deviceConfig.getDeviceNo()); processThread.start(); } @Override public boolean connect() { zyStationConnectDriver = new ZyStationConnectDriver(deviceConfig, redisUtil); zyStationConnectDriver.start(); DeviceConnectPool.put(SlaveType.Devp, deviceConfig.getDeviceNo(), zyStationConnectDriver); return true; } @Override public void close() { if (zyStationConnectDriver != null) { zyStationConnectDriver.close(); } if (executor != null) { try { executor.shutdownNow(); } catch (Exception ignore) { } } } @Override public List getStatus() { return statusReader.getStatusList(); } @Override public Map getStatusMap() { Map map = new HashMap<>(); for (StationProtocol stationProtocol : statusReader.getStatusList()) { map.put(stationProtocol.getStationId(), stationProtocol); } return map; } @Override public List getAllTaskNoList() { return statusReader.getTaskNoList(); } private int pollAndDispatchQueuedCommandBatch() { int dispatchedCount = 0; while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) { if (isExecutorQueueAtWatermark()) { logExecutorAbnormal("executor-watermark", null); break; } Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo()); if (task == null || task.getStep() == null || task.getStep() != 2) { break; } StationCommand command = (StationCommand) task.getData(); logExecutorAbnormal("queue-peek", command); if (!submitSegmentCommand(command)) { logExecutorAbnormal("submit-rejected", command); break; } MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); dispatchedCount++; } return dispatchedCount; } private boolean submitSegmentCommand(StationCommand command) { if (command == null || executor == null || segmentExecutor == null) { return false; } try { executor.execute(() -> { long start = System.currentTimeMillis(); try { segmentExecutor.execute(command); } finally { long costMs = System.currentTimeMillis() - start; if (costMs >= SEGMENT_EXECUTE_WARN_MS) { log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}", deviceConfig.getDeviceNo(), command.getTaskNo(), command.getStationId(), command.getTargetStaNo(), costMs); logExecutorAbnormal("segment-finish-slow", command); } } }); } catch (RejectedExecutionException e) { log.error("V5输送线程池拒绝执行,保留设备队列积压,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}", deviceConfig.getDeviceNo(), command.getTaskNo(), command.getStationId(), command.getTargetStaNo(), executor.getActiveCount(), executor.getQueue() == null ? 0 : executor.getQueue().size(), executorQueueCapacity); return false; } logExecutorAbnormal("after-submit", command); return true; } private boolean isExecutorQueueAtWatermark() { if (executor == null || executor.getQueue() == null) { return false; } return executor.getQueue().size() >= executorQueueCapacity; } @Override public boolean hasRecentArrival(Integer stationId, Integer taskNo) { return recentArrivalTracker.hasRecentArrival(stationId, taskNo); } @Override public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) { return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null); } @Override public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize, Double pathLenFactor) { StationCommand stationCommand = new StationCommand(); stationCommand.setTaskNo(taskNo); stationCommand.setStationId(stationId); stationCommand.setTargetStaNo(targetStationId); stationCommand.setPalletSize(palletSize); stationCommand.setCommandType(commandType); if (commandType == StationCommandType.MOVE && !stationId.equals(targetStationId)) { List nodes = calcPathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId); } return stationCommand; } @Override public StationCommand getRunBlockRerouteCommand(Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) { return getRunBlockRerouteCommand(taskNo, stationId, targetStationId, palletSize, null); } @Override public StationCommand getRunBlockRerouteCommand(Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize, Double pathLenFactor) { if (taskNo == null || taskNo <= 0 || stationId == null || targetStationId == null) { return null; } if (Objects.equals(stationId, targetStationId)) { return getCommand(StationCommandType.MOVE, taskNo, stationId, targetStationId, palletSize, pathLenFactor); } long startNs = System.nanoTime(); StationTaskLoopService taskLoopService = loadStationTaskLoopService(); StationTaskLoopService.LoopEvaluation loopEvaluation = taskLoopService == null ? new StationTaskLoopService.LoopEvaluation(taskNo, stationId, StationTaskLoopService.LoopIdentitySnapshot.empty(), 0, 0, false) : taskLoopService.evaluateLoop(taskNo, stationId, true); long loopEvalNs = System.nanoTime(); log.info("输送线堵塞重规划环线识别,taskNo={}, stationId={}, scopeType={}, localStationCount={}, sourceLoopStationCount={}", taskNo, stationId, loopEvaluation.getLoopIdentity().getScopeType(), loopEvaluation.getLoopIdentity().getLocalStationCount(), loopEvaluation.getLoopIdentity().getSourceLoopStationCount()); List> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); long candidatePathNs = System.nanoTime(); List candidateCommandList = new ArrayList<>(); for (List candidatePath : candidatePathList) { StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath); if (rerouteCommand == null || rerouteCommand.getNavigatePath() == null || rerouteCommand.getNavigatePath().isEmpty()) { continue; } candidateCommandList.add(rerouteCommand); } long buildCommandNs = System.nanoTime(); StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan( taskNo, stationId, loopEvaluation, candidateCommandList ); long planNs = System.nanoTime(); logRunBlockRerouteCost(taskNo, stationId, targetStationId, candidatePathList == null ? 0 : candidatePathList.size(), candidateCommandList.size(), startNs, loopEvalNs, candidatePathNs, buildCommandNs, planNs); if (candidateCommandList.isEmpty()) { log.warn("输送线堵塞重规划失败,候选路径为空,taskNo={}, planCount={}, stationId={}, targetStationId={}", taskNo, planResult.getPlanCount(), stationId, targetStationId); return null; } StationCommand rerouteCommand = planResult.getCommand(); if (rerouteCommand != null) { if (taskLoopService != null) { taskLoopService.recordLoopIssue(loopEvaluation, "RUN_BLOCK_REROUTE"); } log.info("输送线堵塞重规划选中候选路线,taskNo={}, planCount={}, stationId={}, targetStationId={}, route={}", taskNo, planResult.getPlanCount(), stationId, targetStationId, JSON.toJSONString(rerouteCommand.getNavigatePath())); return rerouteCommand; } log.warn("输送线堵塞重规划未找到可下发路线,taskNo={}, planCount={}, stationId={}, targetStationId={}, triedRoutes={}", taskNo, planResult.getPlanCount(), stationId, targetStationId, JSON.toJSONString(planResult.getIssuedRoutePathList())); return null; } @Override public boolean clearPath(Integer taskNo) { if (taskNo == null || taskNo <= 0) { return false; } if (zyStationConnectDriver == null) { return false; } List status = getStatus(); if (status == null || status.isEmpty()) { return false; } boolean found = false; boolean success = true; for (StationProtocol stationProtocol : status) { List taskBufferItems = stationProtocol == null ? null : stationProtocol.getTaskBufferItems(); if (taskBufferItems == null || taskBufferItems.isEmpty()) { continue; } Integer stationId = stationProtocol.getStationId(); for (StationTaskBufferItem item : taskBufferItems) { if (item == null || !Objects.equals(taskNo, item.getTaskNo())) { continue; } found = true; Integer clearedTaskNo = item.getTaskNo(); if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) { success = false; log.warn("输送站缓存区残留路径清理失败。stationId={}, slotIdx={}, taskNo={}", stationId, item.getSlotIdx(), clearedTaskNo); continue; } item.setTaskNo(0); item.setTargetStaNo(0); log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}", stationId, item.getSlotIdx(), clearedTaskNo); } } return found && success; } @Override public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) { if (stationId == null || slotIdx == null || zyStationConnectDriver == null) { return false; } List status = getStatus(); if (status == null || status.isEmpty()) { return false; } for (StationProtocol stationProtocol : status) { if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) { continue; } if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) { log.warn("输送站缓存区残留路径按站点槽位清理失败。stationId={}, slotIdx={}", stationId, slotIdx); return false; } List taskBufferItems = stationProtocol.getTaskBufferItems(); if (taskBufferItems != null) { for (StationTaskBufferItem item : taskBufferItems) { if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) { item.setTaskNo(0); item.setTargetStaNo(0); break; } } } log.warn("输送站缓存区残留路径按站点槽位清理成功。stationId={}, slotIdx={}", stationId, slotIdx); return true; } return false; } @Override public CommandResponse sendCommand(StationCommand command) { CommandResponse commandResponse = null; try { commandResponse = zyStationConnectDriver.sendCommand(command); } catch (Exception e) { e.printStackTrace(); } finally { BasStationOptService optService = SpringUtils.getBean(BasStationOptService.class); if (optService == null) { return commandResponse; } List statusListEntity = zyStationConnectDriver.getStatus(); ZyStationStatusEntity matched = null; if (statusListEntity != null) { for (ZyStationStatusEntity entity : statusListEntity) { if (entity.getStationId() != null && entity.getStationId().equals(command.getStationId())) { matched = entity; break; } } } BasStationOpt basStationOpt = new BasStationOpt( command.getTaskNo(), command.getStationId(), new Date(), String.valueOf(command.getCommandType()), command.getStationId(), command.getTargetStaNo(), null, null, null, JSON.toJSONString(command), JSON.toJSONString(matched), commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0, JSON.toJSONString(commandResponse) ); if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) { optService.save(basStationOpt); } } return commandResponse; } private void logExecutorAbnormal(String scene, StationCommand command) { if (executor == null) { return; } int activeCount = executor.getActiveCount(); int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size(); if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) { return; } log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}", scene, deviceConfig.getDeviceNo(), command == null ? null : command.getTaskNo(), command == null ? null : command.getStationId(), command == null ? null : command.getTargetStaNo(), executor.getPoolSize(), activeCount, queuedCount, executor.getCompletedTaskCount()); } @Override public CommandResponse sendOriginCommand(String address, short[] data) { return zyStationConnectDriver.sendOriginCommand(address, data); } @Override public byte[] readOriginCommand(String address, int length) { return zyStationConnectDriver.readOriginCommand(address, length); } private List calcPathNavigateNodes(Integer taskNo, Integer startStationId, Integer targetStationId, Double pathLenFactor) { NavigateUtils navigateUtils = SpringUtils.getBean(NavigateUtils.class); if (navigateUtils == null) { return new ArrayList<>(); } return navigateUtils.calcOptimalPathByStationId(startStationId, targetStationId, taskNo, pathLenFactor); } private List> calcCandidatePathNavigateNodes(Integer taskNo, Integer startStationId, Integer targetStationId, Double pathLenFactor) { NavigateUtils navigateUtils = SpringUtils.getBean(NavigateUtils.class); if (navigateUtils == null) { return new ArrayList<>(); } return navigateUtils.calcCandidatePathByStationId(startStationId, targetStationId, taskNo, pathLenFactor); } private StationCommand buildMoveCommand(Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize, List nodes) { StationCommand stationCommand = new StationCommand(); stationCommand.setTaskNo(taskNo); stationCommand.setStationId(stationId); stationCommand.setTargetStaNo(targetStationId); stationCommand.setPalletSize(palletSize); stationCommand.setCommandType(StationCommandType.MOVE); return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId); } private StationCommand fillMoveCommandPath(StationCommand stationCommand, List nodes, Integer taskNo, Integer stationId, Integer targetStationId) { List path = new ArrayList<>(); List liftTransferPath = new ArrayList<>(); for (NavigateNode node : nodes) { JSONObject valueObject; try { valueObject = JSONObject.parseObject(node.getNodeValue()); } catch (Exception ignore) { continue; } if (valueObject == null) { continue; } Integer stationNo = valueObject.getInteger("stationId"); if (stationNo == null) { continue; } path.add(stationNo); if (Boolean.TRUE.equals(node.getIsLiftTransferPoint())) { liftTransferPath.add(stationNo); } } if (path.isEmpty()) { log.warn("输送线命令生成失败,路径为空,taskNo={}, stationId={}, targetStationId={}", taskNo, stationId, targetStationId); return null; } stationCommand.setNavigatePath(path); stationCommand.setLiftTransferPath(liftTransferPath); stationCommand.setTargetStaNo(path.get(path.size() - 1)); return stationCommand; } private StationTaskLoopService loadStationTaskLoopService() { try { return SpringUtils.getBean(StationTaskLoopService.class); } catch (Exception ignore) { return null; } } private void logRunBlockRerouteCost(Integer taskNo, Integer stationId, Integer targetStationId, int candidatePathCount, int candidateCommandCount, long startNs, long loopEvalNs, long candidatePathNs, long buildCommandNs, long planNs) { long totalMs = nanosToMillis(planNs - startNs); if (totalMs < 1000L) { return; } log.warn("输送线堵塞重规划耗时较长, taskNo={}, stationId={}, targetStationId={}, total={}ms, loopEval={}ms, candidatePath={}ms, buildCommand={}ms, planner={}ms, candidatePathCount={}, candidateCommandCount={}", taskNo, stationId, targetStationId, totalMs, nanosToMillis(loopEvalNs - startNs), nanosToMillis(candidatePathNs - loopEvalNs), nanosToMillis(buildCommandNs - candidatePathNs), nanosToMillis(planNs - buildCommandNs), candidatePathCount, candidateCommandCount); } private long nanosToMillis(long nanos) { return nanos <= 0L ? 0L : nanos / 1_000_000L; } }