| | |
| | | import com.zy.core.network.ZyStationConnectDriver; |
| | | import com.zy.core.network.entity.ZyStationStatusEntity; |
| | | import com.zy.core.service.StationTaskLoopService; |
| | | import com.zy.core.task.BasStationOptAsyncPublisher; |
| | | import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider; |
| | | import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner; |
| | | import com.zy.core.thread.impl.v5.StationV5SegmentExecutor; |
| | | import com.zy.core.thread.impl.v5.StationV5StatusReader; |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Objects; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ArrayBlockingQueue; |
| | | import java.util.concurrent.RejectedExecutionException; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | @Data |
| | | @Slf4j |
| | | public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread { |
| | | |
| | | private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64; |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128; |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512; |
| | | private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20; |
| | | private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48; |
| | | private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L; |
| | | private static final long COMMAND_BUILD_WARN_MS = 500L; |
| | | private static final int QUEUE_DRAIN_BATCH_SIZE = 32; |
| | | private static final long QUEUE_IDLE_SLEEP_MS = 20L; |
| | | |
| | | private DeviceConfig deviceConfig; |
| | | private RedisUtil redisUtil; |
| | | private ZyStationConnectDriver zyStationConnectDriver; |
| | | private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE); |
| | | private final ThreadPoolExecutor executor; |
| | | private StationV5SegmentExecutor segmentExecutor; |
| | | private final RecentStationArrivalTracker recentArrivalTracker; |
| | | private final StationV5StatusReader statusReader; |
| | | private final StationV5RunBlockReroutePlanner runBlockReroutePlanner; |
| | | private final int executorQueueCapacity; |
| | | private final BasStationOptAsyncPublisher basStationOptAsyncPublisher; |
| | | |
| | | public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { |
| | | this.deviceConfig = deviceConfig; |
| | | this.redisUtil = redisUtil; |
| | | StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class); |
| | | int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize(); |
| | | this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity(); |
| | | this.executor = new ThreadPoolExecutor( |
| | | poolSize, |
| | | poolSize, |
| | | 0L, |
| | | TimeUnit.MILLISECONDS, |
| | | new ArrayBlockingQueue<>(executorQueueCapacity) |
| | | ); |
| | | this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); |
| | | this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand); |
| | | this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker); |
| | | this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil); |
| | | this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class); |
| | | log.info("初始化V5输送线程池,deviceNo={}, poolSize={}, queueCapacity={}", |
| | | deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity); |
| | | } |
| | | |
| | | @Override |
| | |
| | | Thread processThread = new Thread(() -> { |
| | | while (true) { |
| | | try { |
| | | pollAndDispatchQueuedCommand(); |
| | | Thread.sleep(100); |
| | | int dispatchedCount = pollAndDispatchQueuedCommandBatch(); |
| | | if (dispatchedCount <= 0) { |
| | | Thread.sleep(QUEUE_IDLE_SLEEP_MS); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("StationV5Process Fail", e); |
| | | } |
| | |
| | | return map; |
| | | } |
| | | |
| | | private void pollAndDispatchQueuedCommand() { |
| | | Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | if (task == null || task.getStep() == null || task.getStep() != 2) { |
| | | return; |
| | | } |
| | | submitSegmentCommand((StationCommand) task.getData()); |
| | | @Override |
| | | public List<Integer> getAllTaskNoList() { |
| | | return statusReader.getTaskNoList(); |
| | | } |
| | | |
| | | private void submitSegmentCommand(StationCommand command) { |
| | | if (command == null || executor == null || segmentExecutor == null) { |
| | | return; |
| | | private int pollAndDispatchQueuedCommandBatch() { |
| | | int dispatchedCount = 0; |
| | | while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) { |
| | | if (isExecutorQueueAtWatermark()) { |
| | | logExecutorAbnormal("executor-watermark", null); |
| | | break; |
| | | } |
| | | Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | if (task == null || task.getStep() == null || task.getStep() != 2) { |
| | | break; |
| | | } |
| | | StationCommand command = (StationCommand) task.getData(); |
| | | logExecutorAbnormal("queue-peek", command); |
| | | if (!submitSegmentCommand(command)) { |
| | | logExecutorAbnormal("submit-rejected", command); |
| | | break; |
| | | } |
| | | MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | dispatchedCount++; |
| | | } |
| | | executor.submit(() -> segmentExecutor.execute(command)); |
| | | return dispatchedCount; |
| | | } |
| | | |
| | | private boolean submitSegmentCommand(StationCommand command) { |
| | | if (command == null || executor == null || segmentExecutor == null) { |
| | | return false; |
| | | } |
| | | try { |
| | | executor.execute(() -> { |
| | | long start = System.currentTimeMillis(); |
| | | try { |
| | | segmentExecutor.execute(command); |
| | | } finally { |
| | | long costMs = System.currentTimeMillis() - start; |
| | | if (costMs >= SEGMENT_EXECUTE_WARN_MS) { |
| | | log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}", |
| | | deviceConfig.getDeviceNo(), |
| | | command.getTaskNo(), |
| | | command.getStationId(), |
| | | command.getTargetStaNo(), |
| | | costMs); |
| | | logExecutorAbnormal("segment-finish-slow", command); |
| | | } |
| | | } |
| | | }); |
| | | } catch (RejectedExecutionException e) { |
| | | log.error("V5输送线程池拒绝执行,保留设备队列积压,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}", |
| | | deviceConfig.getDeviceNo(), |
| | | command.getTaskNo(), |
| | | command.getStationId(), |
| | | command.getTargetStaNo(), |
| | | executor.getActiveCount(), |
| | | executor.getQueue() == null ? 0 : executor.getQueue().size(), |
| | | executorQueueCapacity); |
| | | return false; |
| | | } |
| | | logExecutorAbnormal("after-submit", command); |
| | | return true; |
| | | } |
| | | |
| | | private boolean isExecutorQueueAtWatermark() { |
| | | if (executor == null || executor.getQueue() == null) { |
| | | return false; |
| | | } |
| | | return executor.getQueue().size() >= executorQueueCapacity; |
| | | } |
| | | |
| | | @Override |
| | |
| | | stationCommand.setCommandType(commandType); |
| | | |
| | | if (commandType == StationCommandType.MOVE && !stationId.equals(targetStationId)) { |
| | | long startNs = System.nanoTime(); |
| | | long calcPathStartNs = startNs; |
| | | List<NavigateNode> nodes = calcPathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); |
| | | return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId); |
| | | long calcPathCostMs = nanosToMillis(System.nanoTime() - calcPathStartNs); |
| | | long fillCommandStartNs = System.nanoTime(); |
| | | StationCommand builtCommand = fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId); |
| | | long fillCommandCostMs = nanosToMillis(System.nanoTime() - fillCommandStartNs); |
| | | long totalCostMs = nanosToMillis(System.nanoTime() - startNs); |
| | | if (totalCostMs >= COMMAND_BUILD_WARN_MS) { |
| | | log.warn("V5输送命令生成耗时较长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, calcPath={}ms, fillCommand={}ms, total={}ms", |
| | | deviceConfig == null ? null : deviceConfig.getDeviceNo(), |
| | | taskNo, |
| | | stationId, |
| | | targetStationId, |
| | | calcPathCostMs, |
| | | fillCommandCostMs, |
| | | totalCostMs); |
| | | } |
| | | return builtCommand; |
| | | } |
| | | return stationCommand; |
| | | } |
| | |
| | | loopEvaluation.getLoopIdentity().getScopeType(), |
| | | loopEvaluation.getLoopIdentity().getLocalStationCount(), |
| | | loopEvaluation.getLoopIdentity().getSourceLoopStationCount()); |
| | | log.info("输送线堵塞重规划候选路径计算开始,taskNo={}, stationId={}, targetStationId={}, pathLenFactor={}", |
| | | taskNo, stationId, targetStationId, pathLenFactor); |
| | | List<List<NavigateNode>> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor); |
| | | long candidatePathNs = System.nanoTime(); |
| | | log.info("输送线堵塞重规划候选路径计算完成,taskNo={}, stationId={}, targetStationId={}, candidatePathCount={}, costMs={}", |
| | | taskNo, |
| | | stationId, |
| | | targetStationId, |
| | | candidatePathList == null ? null : candidatePathList.size(), |
| | | nanosToMillis(candidatePathNs - loopEvalNs)); |
| | | List<StationCommand> candidateCommandList = new ArrayList<>(); |
| | | for (List<NavigateNode> candidatePath : candidatePathList) { |
| | | StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath); |
| | |
| | | candidateCommandList.add(rerouteCommand); |
| | | } |
| | | long buildCommandNs = System.nanoTime(); |
| | | log.info("输送线堵塞重规划候选命令构建完成,taskNo={}, stationId={}, targetStationId={}, candidatePathCount={}, candidateCommandCount={}, costMs={}, firstCommandPath={}", |
| | | taskNo, |
| | | stationId, |
| | | targetStationId, |
| | | candidatePathList == null ? null : candidatePathList.size(), |
| | | candidateCommandList.size(), |
| | | nanosToMillis(buildCommandNs - candidatePathNs), |
| | | JSON.toJSONString(firstCommandPath(candidateCommandList))); |
| | | |
| | | StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan( |
| | | taskNo, |
| | |
| | | candidateCommandList |
| | | ); |
| | | long planNs = System.nanoTime(); |
| | | log.info("输送线堵塞重规划planner完成,taskNo={}, stationId={}, targetStationId={}, planCount={}, selected={}, issuedRouteCount={}, costMs={}", |
| | | taskNo, |
| | | stationId, |
| | | targetStationId, |
| | | planResult == null ? null : planResult.getPlanCount(), |
| | | planResult != null && planResult.getCommand() != null, |
| | | planResult == null || planResult.getIssuedRoutePathList() == null ? null : planResult.getIssuedRoutePathList().size(), |
| | | nanosToMillis(planNs - buildCommandNs)); |
| | | logRunBlockRerouteCost(taskNo, |
| | | stationId, |
| | | targetStationId, |
| | |
| | | StationCommand rerouteCommand = planResult.getCommand(); |
| | | if (rerouteCommand != null) { |
| | | if (taskLoopService != null) { |
| | | log.info("输送线堵塞重规划记录环线开始,taskNo={}, stationId={}, targetStationId={}", |
| | | taskNo, stationId, targetStationId); |
| | | taskLoopService.recordLoopIssue(loopEvaluation, "RUN_BLOCK_REROUTE"); |
| | | log.info("输送线堵塞重规划记录环线完成,taskNo={}, stationId={}, targetStationId={}", |
| | | taskNo, stationId, targetStationId); |
| | | } |
| | | log.info("输送线堵塞重规划选中候选路线,taskNo={}, planCount={}, stationId={}, targetStationId={}, route={}", |
| | | taskNo, planResult.getPlanCount(), stationId, targetStationId, JSON.toJSONString(rerouteCommand.getNavigatePath())); |
| | |
| | | return null; |
| | | } |
| | | |
| | | private List<Integer> firstCommandPath(List<StationCommand> candidateCommandList) { |
| | | if (candidateCommandList == null || candidateCommandList.isEmpty()) { |
| | | return new ArrayList<>(); |
| | | } |
| | | StationCommand firstCommand = candidateCommandList.get(0); |
| | | if (firstCommand == null || firstCommand.getNavigatePath() == null) { |
| | | return new ArrayList<>(); |
| | | } |
| | | return firstCommand.getNavigatePath(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean clearPath(Integer taskNo) { |
| | | if (taskNo == null) { |
| | | if (taskNo == null || taskNo <= 0) { |
| | | return false; |
| | | } |
| | | if (zyStationConnectDriver == null) { |
| | |
| | | commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0, |
| | | JSON.toJSONString(commandResponse) |
| | | ); |
| | | optService.save(basStationOpt); |
| | | if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) { |
| | | optService.save(basStationOpt); |
| | | } |
| | | } |
| | | return commandResponse; |
| | | } |
| | | |
| | | private void logExecutorAbnormal(String scene, StationCommand command) { |
| | | if (executor == null) { |
| | | return; |
| | | } |
| | | int activeCount = executor.getActiveCount(); |
| | | int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size(); |
| | | if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) { |
| | | return; |
| | | } |
| | | log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}", |
| | | scene, |
| | | deviceConfig.getDeviceNo(), |
| | | command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getStationId(), |
| | | command == null ? null : command.getTargetStaNo(), |
| | | executor.getPoolSize(), |
| | | activeCount, |
| | | queuedCount, |
| | | executor.getCompletedTaskCount()); |
| | | } |
| | | |
| | | @Override |
| | | public CommandResponse sendOriginCommand(String address, short[] data) { |
| | | return zyStationConnectDriver.sendOriginCommand(address, data); |