package com.zy.core.thread.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.core.common.SpringUtils; import com.zy.asrs.entity.BasStationOpt; import com.zy.asrs.entity.DeviceConfig; import com.zy.asrs.service.BasStationOptService; import com.zy.common.model.NavigateNode; import com.zy.common.utils.NavigateUtils; import com.zy.common.utils.RedisUtil; import com.zy.core.cache.MessageQueue; import com.zy.core.enums.SlaveType; import com.zy.core.enums.StationCommandType; import com.zy.core.model.CommandResponse; import com.zy.core.model.Task; import com.zy.core.model.command.StationCommand; import com.zy.core.model.protocol.StationProtocol; import com.zy.core.model.protocol.StationTaskBufferItem; import com.zy.core.network.DeviceConnectPool; import com.zy.core.network.ZyStationConnectDriver; import com.zy.core.network.entity.ZyStationStatusEntity; import com.zy.core.service.StationTaskLoopService; import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner; import com.zy.core.thread.impl.v5.StationV5SegmentExecutor; import com.zy.core.thread.impl.v5.StationV5StatusReader; import com.zy.core.thread.support.RecentStationArrivalTracker; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @Data @Slf4j public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread { private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128; private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize"; private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20; private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48; private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L; private DeviceConfig deviceConfig; private RedisUtil redisUtil; private ZyStationConnectDriver zyStationConnectDriver; private final ExecutorService executor; private StationV5SegmentExecutor segmentExecutor; private final RecentStationArrivalTracker recentArrivalTracker; private final StationV5StatusReader statusReader; private final StationV5RunBlockReroutePlanner runBlockReroutePlanner; public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; int poolSize = resolveSegmentExecutorPoolSize(redisUtil); this.executor = Executors.newFixedThreadPool(poolSize); this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand); this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker); this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil); log.info("初始化V5输送线程池,deviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize); } @Override @SuppressWarnings("InfiniteLoopStatement") public void run() { this.connect(); Thread readThread = new Thread(() -> { while (true) { try { statusReader.readStatus(zyStationConnectDriver); Thread.sleep(100); } catch (Exception e) { log.error("StationV5Thread Fail", e); } } }, "DevpRead-" + deviceConfig.getDeviceNo()); readThread.start(); Thread processThread = new Thread(() -> { while (true) { try { pollAndDispatchQueuedCommand(); Thread.sleep(100); } catch (Exception e) { log.error("StationV5Process Fail", e); } } }, "DevpProcess-" + deviceConfig.getDeviceNo()); processThread.start(); } @Override public boolean connect() { zyStationConnectDriver = new ZyStationConnectDriver(deviceConfig, redisUtil); zyStationConnectDriver.start(); DeviceConnectPool.put(SlaveType.Devp, deviceConfig.getDeviceNo(), zyStationConnectDriver); return true; } @Override public void close() { if (zyStationConnectDriver != null) { zyStationConnectDriver.close(); } if (executor != null) { try { executor.shutdownNow(); } catch (Exception ignore) { } } } @Override public List getStatus() { return statusReader.getStatusList(); } @Override public Map getStatusMap() { Map map = new HashMap<>(); for (StationProtocol stationProtocol : statusReader.getStatusList()) { map.put(stationProtocol.getStationId(), stationProtocol); } return map; } @Override public List getAllTaskNoList() { return statusReader.getTaskNoList(); } private void pollAndDispatchQueuedCommand() { Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); if (task == null || task.getStep() == null || task.getStep() != 2) { return; } StationCommand command = (StationCommand) task.getData(); logExecutorAbnormal("queue-poll", command); submitSegmentCommand(command); } private void submitSegmentCommand(StationCommand command) { if (command == null || executor == null || segmentExecutor == null) { return; } executor.submit(() -> { long start = System.currentTimeMillis(); try { segmentExecutor.execute(command); } finally { long costMs = System.currentTimeMillis() - start; if (costMs >= SEGMENT_EXECUTE_WARN_MS) { log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}", deviceConfig.getDeviceNo(), command.getTaskNo(), command.getStationId(), command.getTargetStaNo(), costMs); logExecutorAbnormal("segment-finish-slow", command); } } }); logExecutorAbnormal("after-submit", command); } @Override public boolean hasRecentArrival(Integer stationId, Integer taskNo) { return recentArrivalTracker.hasRecentArrival(stationId, taskNo); } @Override public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) { return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null); } @Override public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize, Double pathLenFactor) { StationCommand stationCommand = new StationCommand(); stationCommand.setTaskNo(taskNo); stationCommand.setStationId(stationId); stationCommand.setTargetStaNo(targetStationId); stationCommand.setPalletSize(palletSize); stationCommand.setCommandType(commandType); if (commandType == StationCommandType.MOVE && !stationId.equals(targetStationId)) { List nodes = calcPathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId); } return stationCommand; } @Override public StationCommand getRunBlockRerouteCommand(Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) { return getRunBlockRerouteCommand(taskNo, stationId, targetStationId, palletSize, null); } @Override public StationCommand getRunBlockRerouteCommand(Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize, Double pathLenFactor) { if (taskNo == null || taskNo <= 0 || stationId == null || targetStationId == null) { return null; } if (Objects.equals(stationId, targetStationId)) { return getCommand(StationCommandType.MOVE, taskNo, stationId, targetStationId, palletSize, pathLenFactor); } long startNs = System.nanoTime(); StationTaskLoopService taskLoopService = loadStationTaskLoopService(); StationTaskLoopService.LoopEvaluation loopEvaluation = taskLoopService == null ? new StationTaskLoopService.LoopEvaluation(taskNo, stationId, StationTaskLoopService.LoopIdentitySnapshot.empty(), 0, 0, false) : taskLoopService.evaluateLoop(taskNo, stationId, true); long loopEvalNs = System.nanoTime(); log.info("输送线堵塞重规划环线识别,taskNo={}, stationId={}, scopeType={}, localStationCount={}, sourceLoopStationCount={}", taskNo, stationId, loopEvaluation.getLoopIdentity().getScopeType(), loopEvaluation.getLoopIdentity().getLocalStationCount(), loopEvaluation.getLoopIdentity().getSourceLoopStationCount()); List> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); long candidatePathNs = System.nanoTime(); List candidateCommandList = new ArrayList<>(); for (List candidatePath : candidatePathList) { StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath); if (rerouteCommand == null || rerouteCommand.getNavigatePath() == null || rerouteCommand.getNavigatePath().isEmpty()) { continue; } candidateCommandList.add(rerouteCommand); } long buildCommandNs = System.nanoTime(); StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan( taskNo, stationId, loopEvaluation, candidateCommandList ); long planNs = System.nanoTime(); logRunBlockRerouteCost(taskNo, stationId, targetStationId, candidatePathList == null ? 0 : candidatePathList.size(), candidateCommandList.size(), startNs, loopEvalNs, candidatePathNs, buildCommandNs, planNs); if (candidateCommandList.isEmpty()) { log.warn("输送线堵塞重规划失败,候选路径为空,taskNo={}, planCount={}, stationId={}, targetStationId={}", taskNo, planResult.getPlanCount(), stationId, targetStationId); return null; } StationCommand rerouteCommand = planResult.getCommand(); if (rerouteCommand != null) { if (taskLoopService != null) { taskLoopService.recordLoopIssue(loopEvaluation, "RUN_BLOCK_REROUTE"); } log.info("输送线堵塞重规划选中候选路线,taskNo={}, planCount={}, stationId={}, targetStationId={}, route={}", taskNo, planResult.getPlanCount(), stationId, targetStationId, JSON.toJSONString(rerouteCommand.getNavigatePath())); return rerouteCommand; } log.warn("输送线堵塞重规划未找到可下发路线,taskNo={}, planCount={}, stationId={}, targetStationId={}, triedRoutes={}", taskNo, planResult.getPlanCount(), stationId, targetStationId, JSON.toJSONString(planResult.getIssuedRoutePathList())); return null; } @Override public boolean clearPath(Integer taskNo) { if (taskNo == null || taskNo <= 0) { return false; } if (zyStationConnectDriver == null) { return false; } List status = getStatus(); if (status == null || status.isEmpty()) { return false; } boolean found = false; boolean success = true; for (StationProtocol stationProtocol : status) { List taskBufferItems = stationProtocol == null ? null : stationProtocol.getTaskBufferItems(); if (taskBufferItems == null || taskBufferItems.isEmpty()) { continue; } Integer stationId = stationProtocol.getStationId(); for (StationTaskBufferItem item : taskBufferItems) { if (item == null || !Objects.equals(taskNo, item.getTaskNo())) { continue; } found = true; Integer clearedTaskNo = item.getTaskNo(); if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) { success = false; log.warn("输送站缓存区残留路径清理失败。stationId={}, slotIdx={}, taskNo={}", stationId, item.getSlotIdx(), clearedTaskNo); continue; } item.setTaskNo(0); item.setTargetStaNo(0); log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}", stationId, item.getSlotIdx(), clearedTaskNo); } } return found && success; } @Override public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) { if (stationId == null || slotIdx == null || zyStationConnectDriver == null) { return false; } List status = getStatus(); if (status == null || status.isEmpty()) { return false; } for (StationProtocol stationProtocol : status) { if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) { continue; } if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) { log.warn("输送站缓存区残留路径按站点槽位清理失败。stationId={}, slotIdx={}", stationId, slotIdx); return false; } List taskBufferItems = stationProtocol.getTaskBufferItems(); if (taskBufferItems != null) { for (StationTaskBufferItem item : taskBufferItems) { if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) { item.setTaskNo(0); item.setTargetStaNo(0); break; } } } log.warn("输送站缓存区残留路径按站点槽位清理成功。stationId={}, slotIdx={}", stationId, slotIdx); return true; } return false; } @Override public CommandResponse sendCommand(StationCommand command) { CommandResponse commandResponse = null; try { commandResponse = zyStationConnectDriver.sendCommand(command); } catch (Exception e) { e.printStackTrace(); } finally { BasStationOptService optService = SpringUtils.getBean(BasStationOptService.class); if (optService == null) { return commandResponse; } List statusListEntity = zyStationConnectDriver.getStatus(); ZyStationStatusEntity matched = null; if (statusListEntity != null) { for (ZyStationStatusEntity entity : statusListEntity) { if (entity.getStationId() != null && entity.getStationId().equals(command.getStationId())) { matched = entity; break; } } } BasStationOpt basStationOpt = new BasStationOpt( command.getTaskNo(), command.getStationId(), new Date(), String.valueOf(command.getCommandType()), command.getStationId(), command.getTargetStaNo(), null, null, null, JSON.toJSONString(command), JSON.toJSONString(matched), commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0, JSON.toJSONString(commandResponse) ); optService.save(basStationOpt); } return commandResponse; } private void logExecutorAbnormal(String scene, StationCommand command) { if (!(executor instanceof ThreadPoolExecutor)) { return; } ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; int activeCount = threadPoolExecutor.getActiveCount(); int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size(); if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) { return; } log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}", scene, deviceConfig.getDeviceNo(), command == null ? null : command.getTaskNo(), command == null ? null : command.getStationId(), command == null ? null : command.getTargetStaNo(), threadPoolExecutor.getPoolSize(), activeCount, queuedCount, threadPoolExecutor.getCompletedTaskCount()); } @SuppressWarnings("unchecked") private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) { if (redisUtil == null) { return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; } try { Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key); if (!(systemConfigMapObj instanceof HashMap)) { return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; } HashMap systemConfigMap = (HashMap) systemConfigMapObj; String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE); if (poolSizeText == null || poolSizeText.trim().isEmpty()) { return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; } int configured = Integer.parseInt(poolSizeText.trim()); if (configured < 16) { return 16; } return Math.min(configured, 512); } catch (Exception ignore) { return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; } } @Override public CommandResponse sendOriginCommand(String address, short[] data) { return zyStationConnectDriver.sendOriginCommand(address, data); } @Override public byte[] readOriginCommand(String address, int length) { return zyStationConnectDriver.readOriginCommand(address, length); } private List calcPathNavigateNodes(Integer taskNo, Integer startStationId, Integer targetStationId, Double pathLenFactor) { NavigateUtils navigateUtils = SpringUtils.getBean(NavigateUtils.class); if (navigateUtils == null) { return new ArrayList<>(); } return navigateUtils.calcOptimalPathByStationId(startStationId, targetStationId, taskNo, pathLenFactor); } private List> calcCandidatePathNavigateNodes(Integer taskNo, Integer startStationId, Integer targetStationId, Double pathLenFactor) { NavigateUtils navigateUtils = SpringUtils.getBean(NavigateUtils.class); if (navigateUtils == null) { return new ArrayList<>(); } return navigateUtils.calcCandidatePathByStationId(startStationId, targetStationId, taskNo, pathLenFactor); } private StationCommand buildMoveCommand(Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize, List nodes) { StationCommand stationCommand = new StationCommand(); stationCommand.setTaskNo(taskNo); stationCommand.setStationId(stationId); stationCommand.setTargetStaNo(targetStationId); stationCommand.setPalletSize(palletSize); stationCommand.setCommandType(StationCommandType.MOVE); return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId); } private StationCommand fillMoveCommandPath(StationCommand stationCommand, List nodes, Integer taskNo, Integer stationId, Integer targetStationId) { List path = new ArrayList<>(); List liftTransferPath = new ArrayList<>(); for (NavigateNode node : nodes) { JSONObject valueObject; try { valueObject = JSONObject.parseObject(node.getNodeValue()); } catch (Exception ignore) { continue; } if (valueObject == null) { continue; } Integer stationNo = valueObject.getInteger("stationId"); if (stationNo == null) { continue; } path.add(stationNo); if (Boolean.TRUE.equals(node.getIsLiftTransferPoint())) { liftTransferPath.add(stationNo); } } if (path.isEmpty()) { log.warn("输送线命令生成失败,路径为空,taskNo={}, stationId={}, targetStationId={}", taskNo, stationId, targetStationId); return null; } stationCommand.setNavigatePath(path); stationCommand.setLiftTransferPath(liftTransferPath); stationCommand.setTargetStaNo(path.get(path.size() - 1)); return stationCommand; } private StationTaskLoopService loadStationTaskLoopService() { try { return SpringUtils.getBean(StationTaskLoopService.class); } catch (Exception ignore) { return null; } } private void logRunBlockRerouteCost(Integer taskNo, Integer stationId, Integer targetStationId, int candidatePathCount, int candidateCommandCount, long startNs, long loopEvalNs, long candidatePathNs, long buildCommandNs, long planNs) { long totalMs = nanosToMillis(planNs - startNs); if (totalMs < 1000L) { return; } log.warn("输送线堵塞重规划耗时较长, taskNo={}, stationId={}, targetStationId={}, total={}ms, loopEval={}ms, candidatePath={}ms, buildCommand={}ms, planner={}ms, candidatePathCount={}, candidateCommandCount={}", taskNo, stationId, targetStationId, totalMs, nanosToMillis(loopEvalNs - startNs), nanosToMillis(candidatePathNs - loopEvalNs), nanosToMillis(buildCommandNs - candidatePathNs), nanosToMillis(planNs - buildCommandNs), candidatePathCount, candidateCommandCount); } private long nanosToMillis(long nanos) { return nanos <= 0L ? 0L : nanos / 1_000_000L; } }