| | |
| | | import java.util.Objects; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | |
| | | @Data |
| | | @Slf4j |
| | | public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread { |
| | | |
| | | private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64; |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128; |
| | | private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize"; |
| | | private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20; |
| | | private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48; |
| | | private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L; |
| | | |
| | | private DeviceConfig deviceConfig; |
| | | private RedisUtil redisUtil; |
| | | private ZyStationConnectDriver zyStationConnectDriver; |
| | | private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE); |
| | | private final ExecutorService executor; |
| | | private StationV5SegmentExecutor segmentExecutor; |
| | | private final RecentStationArrivalTracker recentArrivalTracker; |
| | | private final StationV5StatusReader statusReader; |
| | |
| | | public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { |
| | | this.deviceConfig = deviceConfig; |
| | | this.redisUtil = redisUtil; |
| | | int poolSize = resolveSegmentExecutorPoolSize(redisUtil); |
| | | this.executor = Executors.newFixedThreadPool(poolSize); |
| | | this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); |
| | | this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand); |
| | | this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker); |
| | | this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil); |
| | | log.info("初始化V5输送线程池,deviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize); |
| | | } |
| | | |
| | | @Override |
| | |
| | | return map; |
| | | } |
| | | |
| | | @Override |
| | | public List<Integer> getAllTaskNoList() { |
| | | return statusReader.getTaskNoList(); |
| | | } |
| | | |
| | | private void pollAndDispatchQueuedCommand() { |
| | | Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | if (task == null || task.getStep() == null || task.getStep() != 2) { |
| | | return; |
| | | } |
| | | submitSegmentCommand((StationCommand) task.getData()); |
| | | StationCommand command = (StationCommand) task.getData(); |
| | | logExecutorAbnormal("queue-poll", command); |
| | | submitSegmentCommand(command); |
| | | } |
| | | |
| | | private void submitSegmentCommand(StationCommand command) { |
| | | if (command == null || executor == null || segmentExecutor == null) { |
| | | return; |
| | | } |
| | | executor.submit(() -> segmentExecutor.execute(command)); |
| | | executor.submit(() -> { |
| | | long start = System.currentTimeMillis(); |
| | | try { |
| | | segmentExecutor.execute(command); |
| | | } finally { |
| | | long costMs = System.currentTimeMillis() - start; |
| | | if (costMs >= SEGMENT_EXECUTE_WARN_MS) { |
| | | log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}", |
| | | deviceConfig.getDeviceNo(), |
| | | command.getTaskNo(), |
| | | command.getStationId(), |
| | | command.getTargetStaNo(), |
| | | costMs); |
| | | logExecutorAbnormal("segment-finish-slow", command); |
| | | } |
| | | } |
| | | }); |
| | | logExecutorAbnormal("after-submit", command); |
| | | } |
| | | |
| | | @Override |
| | |
| | | } |
| | | |
| | | @Override |
| | | public synchronized StationCommand getRunBlockRerouteCommand(Integer taskNo, |
| | | public StationCommand getRunBlockRerouteCommand(Integer taskNo, |
| | | Integer stationId, |
| | | Integer targetStationId, |
| | | Integer palletSize) { |
| | |
| | | } |
| | | |
| | | @Override |
| | | public synchronized StationCommand getRunBlockRerouteCommand(Integer taskNo, |
| | | public StationCommand getRunBlockRerouteCommand(Integer taskNo, |
| | | Integer stationId, |
| | | Integer targetStationId, |
| | | Integer palletSize, |
| | |
| | | return getCommand(StationCommandType.MOVE, taskNo, stationId, targetStationId, palletSize, pathLenFactor); |
| | | } |
| | | |
| | | long startNs = System.nanoTime(); |
| | | StationTaskLoopService taskLoopService = loadStationTaskLoopService(); |
| | | StationTaskLoopService.LoopEvaluation loopEvaluation = taskLoopService == null |
| | | ? new StationTaskLoopService.LoopEvaluation(taskNo, stationId, StationTaskLoopService.LoopIdentitySnapshot.empty(), 0, 0, false) |
| | | : taskLoopService.evaluateLoop(taskNo, stationId, true); |
| | | long loopEvalNs = System.nanoTime(); |
| | | log.info("输送线堵塞重规划环线识别,taskNo={}, stationId={}, scopeType={}, localStationCount={}, sourceLoopStationCount={}", |
| | | taskNo, |
| | | stationId, |
| | |
| | | loopEvaluation.getLoopIdentity().getLocalStationCount(), |
| | | loopEvaluation.getLoopIdentity().getSourceLoopStationCount()); |
| | | List<List<NavigateNode>> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); |
| | | long candidatePathNs = System.nanoTime(); |
| | | List<StationCommand> candidateCommandList = new ArrayList<>(); |
| | | for (List<NavigateNode> candidatePath : candidatePathList) { |
| | | StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath); |
| | |
| | | } |
| | | candidateCommandList.add(rerouteCommand); |
| | | } |
| | | long buildCommandNs = System.nanoTime(); |
| | | |
| | | StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan( |
| | | taskNo, |
| | |
| | | loopEvaluation, |
| | | candidateCommandList |
| | | ); |
| | | long planNs = System.nanoTime(); |
| | | logRunBlockRerouteCost(taskNo, |
| | | stationId, |
| | | targetStationId, |
| | | candidatePathList == null ? 0 : candidatePathList.size(), |
| | | candidateCommandList.size(), |
| | | startNs, |
| | | loopEvalNs, |
| | | candidatePathNs, |
| | | buildCommandNs, |
| | | planNs); |
| | | if (candidateCommandList.isEmpty()) { |
| | | log.warn("输送线堵塞重规划失败,候选路径为空,taskNo={}, planCount={}, stationId={}, targetStationId={}", |
| | | taskNo, planResult.getPlanCount(), stationId, targetStationId); |
| | |
| | | } |
| | | |
| | | @Override |
| | | public synchronized boolean clearPath(Integer taskNo) { |
| | | public boolean clearPath(Integer taskNo) { |
| | | if (taskNo == null || taskNo <= 0) { |
| | | return false; |
| | | } |
| | |
| | | continue; |
| | | } |
| | | found = true; |
| | | Integer clearedTaskNo = item.getTaskNo(); |
| | | if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) { |
| | | success = false; |
| | | log.warn("输送站缓存区残留路径清理失败。stationId={}, slotIdx={}, taskNo={}", |
| | | stationId, item.getSlotIdx(), item.getTaskNo()); |
| | | stationId, item.getSlotIdx(), clearedTaskNo); |
| | | continue; |
| | | }else { |
| | | item.setTaskNo(0); |
| | | item.setTargetStaNo(0); |
| | | success = true; |
| | | log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}", |
| | | stationId, item.getSlotIdx(), item.getTaskNo()); |
| | | } |
| | | item.setTaskNo(0); |
| | | item.setTargetStaNo(0); |
| | | log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}", |
| | | stationId, item.getSlotIdx(), clearedTaskNo); |
| | | } |
| | | } |
| | | return found && success; |
| | | } |
| | | |
| | | @Override |
| | | public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) { |
| | | if (stationId == null || slotIdx == null || zyStationConnectDriver == null) { |
| | | return false; |
| | | } |
| | | List<StationProtocol> status = getStatus(); |
| | | if (status == null || status.isEmpty()) { |
| | | return false; |
| | | } |
| | | |
| | | for (StationProtocol stationProtocol : status) { |
| | | if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) { |
| | | continue; |
| | | } |
| | | if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) { |
| | | log.warn("输送站缓存区残留路径按站点槽位清理失败。stationId={}, slotIdx={}", stationId, slotIdx); |
| | | return false; |
| | | } |
| | | List<StationTaskBufferItem> taskBufferItems = stationProtocol.getTaskBufferItems(); |
| | | if (taskBufferItems != null) { |
| | | for (StationTaskBufferItem item : taskBufferItems) { |
| | | if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) { |
| | | item.setTaskNo(0); |
| | | item.setTargetStaNo(0); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | log.warn("输送站缓存区残留路径按站点槽位清理成功。stationId={}, slotIdx={}", stationId, slotIdx); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | |
| | | return commandResponse; |
| | | } |
| | | |
| | | private void logExecutorAbnormal(String scene, StationCommand command) { |
| | | if (!(executor instanceof ThreadPoolExecutor)) { |
| | | return; |
| | | } |
| | | ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; |
| | | int activeCount = threadPoolExecutor.getActiveCount(); |
| | | int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size(); |
| | | if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) { |
| | | return; |
| | | } |
| | | log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}", |
| | | scene, |
| | | deviceConfig.getDeviceNo(), |
| | | command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getStationId(), |
| | | command == null ? null : command.getTargetStaNo(), |
| | | threadPoolExecutor.getPoolSize(), |
| | | activeCount, |
| | | queuedCount, |
| | | threadPoolExecutor.getCompletedTaskCount()); |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) { |
| | | if (redisUtil == null) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | try { |
| | | Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (!(systemConfigMapObj instanceof HashMap)) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj; |
| | | String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE); |
| | | if (poolSizeText == null || poolSizeText.trim().isEmpty()) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | int configured = Integer.parseInt(poolSizeText.trim()); |
| | | if (configured < 16) { |
| | | return 16; |
| | | } |
| | | return Math.min(configured, 512); |
| | | } catch (Exception ignore) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public CommandResponse sendOriginCommand(String address, short[] data) { |
| | | return zyStationConnectDriver.sendOriginCommand(address, data); |
| | |
| | | if (navigateUtils == null) { |
| | | return new ArrayList<>(); |
| | | } |
| | | return navigateUtils.calcByStationId(startStationId, targetStationId, taskNo, pathLenFactor); |
| | | return navigateUtils.calcOptimalPathByStationId(startStationId, targetStationId, taskNo, pathLenFactor); |
| | | } |
| | | |
| | | private List<List<NavigateNode>> calcCandidatePathNavigateNodes(Integer taskNo, |
| | |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | private void logRunBlockRerouteCost(Integer taskNo, |
| | | Integer stationId, |
| | | Integer targetStationId, |
| | | int candidatePathCount, |
| | | int candidateCommandCount, |
| | | long startNs, |
| | | long loopEvalNs, |
| | | long candidatePathNs, |
| | | long buildCommandNs, |
| | | long planNs) { |
| | | long totalMs = nanosToMillis(planNs - startNs); |
| | | if (totalMs < 1000L) { |
| | | return; |
| | | } |
| | | log.warn("输送线堵塞重规划耗时较长, taskNo={}, stationId={}, targetStationId={}, total={}ms, loopEval={}ms, candidatePath={}ms, buildCommand={}ms, planner={}ms, candidatePathCount={}, candidateCommandCount={}", |
| | | taskNo, |
| | | stationId, |
| | | targetStationId, |
| | | totalMs, |
| | | nanosToMillis(loopEvalNs - startNs), |
| | | nanosToMillis(candidatePathNs - loopEvalNs), |
| | | nanosToMillis(buildCommandNs - candidatePathNs), |
| | | nanosToMillis(planNs - buildCommandNs), |
| | | candidatePathCount, |
| | | candidateCommandCount); |
| | | } |
| | | |
| | | private long nanosToMillis(long nanos) { |
| | | return nanos <= 0L ? 0L : nanos / 1_000_000L; |
| | | } |
| | | } |