Junjie
2 天以前 63b01db83d9aad8a15276b4236a9a22e4aeef065
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -21,6 +21,8 @@
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.service.StationTaskLoopService;
import com.zy.core.task.BasStationOptAsyncPublisher;
import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner;
import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
import com.zy.core.thread.impl.v5.StationV5StatusReader;
@@ -34,31 +36,55 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
    private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64;
    private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
    private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
    private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
    private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
    private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
    private static final long COMMAND_BUILD_WARN_MS = 500L;
    private static final int QUEUE_DRAIN_BATCH_SIZE = 32;
    private static final long QUEUE_IDLE_SLEEP_MS = 20L;
    private DeviceConfig deviceConfig;
    private RedisUtil redisUtil;
    private ZyStationConnectDriver zyStationConnectDriver;
    private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE);
    private final ThreadPoolExecutor executor;
    private StationV5SegmentExecutor segmentExecutor;
    private final RecentStationArrivalTracker recentArrivalTracker;
    private final StationV5StatusReader statusReader;
    private final StationV5RunBlockReroutePlanner runBlockReroutePlanner;
    private final int executorQueueCapacity;
    private final BasStationOptAsyncPublisher basStationOptAsyncPublisher;
    public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
        int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize();
        this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity();
        this.executor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(executorQueueCapacity)
        );
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
        this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
        this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
        this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
        this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class);
        log.info("初始化V5输送线程池,deviceNo={}, poolSize={}, queueCapacity={}",
                deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity);
    }
    @Override
@@ -81,8 +107,10 @@
        Thread processThread = new Thread(() -> {
            while (true) {
                try {
                    pollAndDispatchQueuedCommand();
                    Thread.sleep(100);
                    int dispatchedCount = pollAndDispatchQueuedCommandBatch();
                    if (dispatchedCount <= 0) {
                        Thread.sleep(QUEUE_IDLE_SLEEP_MS);
                    }
                } catch (Exception e) {
                    log.error("StationV5Process Fail", e);
                }
@@ -126,19 +154,76 @@
        return map;
    }
    private void pollAndDispatchQueuedCommand() {
        Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
        if (task == null || task.getStep() == null || task.getStep() != 2) {
            return;
        }
        submitSegmentCommand((StationCommand) task.getData());
    @Override
    public List<Integer> getAllTaskNoList() {
        return statusReader.getTaskNoList();
    }
    private void submitSegmentCommand(StationCommand command) {
        if (command == null || executor == null || segmentExecutor == null) {
            return;
    private int pollAndDispatchQueuedCommandBatch() {
        int dispatchedCount = 0;
        while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) {
            if (isExecutorQueueAtWatermark()) {
                logExecutorAbnormal("executor-watermark", null);
                break;
            }
            Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo());
            if (task == null || task.getStep() == null || task.getStep() != 2) {
                break;
            }
            StationCommand command = (StationCommand) task.getData();
            logExecutorAbnormal("queue-peek", command);
            if (!submitSegmentCommand(command)) {
                logExecutorAbnormal("submit-rejected", command);
                break;
            }
            MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
            dispatchedCount++;
        }
        executor.submit(() -> segmentExecutor.execute(command));
        return dispatchedCount;
    }
    private boolean submitSegmentCommand(StationCommand command) {
        if (command == null || executor == null || segmentExecutor == null) {
            return false;
        }
        try {
            executor.execute(() -> {
                long start = System.currentTimeMillis();
                try {
                    segmentExecutor.execute(command);
                } finally {
                    long costMs = System.currentTimeMillis() - start;
                    if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
                        log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
                                deviceConfig.getDeviceNo(),
                                command.getTaskNo(),
                                command.getStationId(),
                                command.getTargetStaNo(),
                                costMs);
                        logExecutorAbnormal("segment-finish-slow", command);
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            log.error("V5输送线程池拒绝执行,保留设备队列积压,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}",
                    deviceConfig.getDeviceNo(),
                    command.getTaskNo(),
                    command.getStationId(),
                    command.getTargetStaNo(),
                    executor.getActiveCount(),
                    executor.getQueue() == null ? 0 : executor.getQueue().size(),
                    executorQueueCapacity);
            return false;
        }
        logExecutorAbnormal("after-submit", command);
        return true;
    }
    private boolean isExecutorQueueAtWatermark() {
        if (executor == null || executor.getQueue() == null) {
            return false;
        }
        return executor.getQueue().size() >= executorQueueCapacity;
    }
    @Override
@@ -170,8 +255,25 @@
        stationCommand.setCommandType(commandType);
        if (commandType == StationCommandType.MOVE && !stationId.equals(targetStationId)) {
            long startNs = System.nanoTime();
            long calcPathStartNs = startNs;
            List<NavigateNode> nodes = calcPathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
            return fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId);
            long calcPathCostMs = nanosToMillis(System.nanoTime() - calcPathStartNs);
            long fillCommandStartNs = System.nanoTime();
            StationCommand builtCommand = fillMoveCommandPath(stationCommand, nodes, taskNo, stationId, targetStationId);
            long fillCommandCostMs = nanosToMillis(System.nanoTime() - fillCommandStartNs);
            long totalCostMs = nanosToMillis(System.nanoTime() - startNs);
            if (totalCostMs >= COMMAND_BUILD_WARN_MS) {
                log.warn("V5输送命令生成耗时较长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, calcPath={}ms, fillCommand={}ms, total={}ms",
                        deviceConfig == null ? null : deviceConfig.getDeviceNo(),
                        taskNo,
                        stationId,
                        targetStationId,
                        calcPathCostMs,
                        fillCommandCostMs,
                        totalCostMs);
            }
            return builtCommand;
        }
        return stationCommand;
    }
@@ -197,10 +299,12 @@
            return getCommand(StationCommandType.MOVE, taskNo, stationId, targetStationId, palletSize, pathLenFactor);
        }
        long startNs = System.nanoTime();
        StationTaskLoopService taskLoopService = loadStationTaskLoopService();
        StationTaskLoopService.LoopEvaluation loopEvaluation = taskLoopService == null
                ? new StationTaskLoopService.LoopEvaluation(taskNo, stationId, StationTaskLoopService.LoopIdentitySnapshot.empty(), 0, 0, false)
                : taskLoopService.evaluateLoop(taskNo, stationId, true);
        long loopEvalNs = System.nanoTime();
        log.info("输送线堵塞重规划环线识别,taskNo={}, stationId={}, scopeType={}, localStationCount={}, sourceLoopStationCount={}",
                taskNo,
                stationId,
@@ -208,6 +312,7 @@
                loopEvaluation.getLoopIdentity().getLocalStationCount(),
                loopEvaluation.getLoopIdentity().getSourceLoopStationCount());
        List<List<NavigateNode>> candidatePathList = calcCandidatePathNavigateNodes(taskNo, stationId, targetStationId, pathLenFactor);
        long candidatePathNs = System.nanoTime();
        List<StationCommand> candidateCommandList = new ArrayList<>();
        for (List<NavigateNode> candidatePath : candidatePathList) {
            StationCommand rerouteCommand = buildMoveCommand(taskNo, stationId, targetStationId, palletSize, candidatePath);
@@ -216,6 +321,7 @@
            }
            candidateCommandList.add(rerouteCommand);
        }
        long buildCommandNs = System.nanoTime();
        StationV5RunBlockReroutePlanner.PlanResult planResult = runBlockReroutePlanner.plan(
                taskNo,
@@ -223,6 +329,17 @@
                loopEvaluation,
                candidateCommandList
        );
        long planNs = System.nanoTime();
        logRunBlockRerouteCost(taskNo,
                stationId,
                targetStationId,
                candidatePathList == null ? 0 : candidatePathList.size(),
                candidateCommandList.size(),
                startNs,
                loopEvalNs,
                candidatePathNs,
                buildCommandNs,
                planNs);
        if (candidateCommandList.isEmpty()) {
            log.warn("输送线堵塞重规划失败,候选路径为空,taskNo={}, planCount={}, stationId={}, targetStationId={}",
                    taskNo, planResult.getPlanCount(), stationId, targetStationId);
@@ -274,21 +391,54 @@
                    continue;
                }
                found = true;
                Integer clearedTaskNo = item.getTaskNo();
                if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) {
                    success = false;
                    log.warn("输送站缓存区残留路径清理失败。stationId={}, slotIdx={}, taskNo={}",
                            stationId, item.getSlotIdx(), item.getTaskNo());
                            stationId, item.getSlotIdx(), clearedTaskNo);
                    continue;
                }else {
                    item.setTaskNo(0);
                    item.setTargetStaNo(0);
                    success = true;
                    log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}",
                            stationId, item.getSlotIdx(), item.getTaskNo());
                }
                item.setTaskNo(0);
                item.setTargetStaNo(0);
                log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}",
                        stationId, item.getSlotIdx(), clearedTaskNo);
            }
        }
        return found && success;
    }
    @Override
    public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) {
        if (stationId == null || slotIdx == null || zyStationConnectDriver == null) {
            return false;
        }
        List<StationProtocol> status = getStatus();
        if (status == null || status.isEmpty()) {
            return false;
        }
        for (StationProtocol stationProtocol : status) {
            if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) {
                continue;
            }
            if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) {
                log.warn("输送站缓存区残留路径按站点槽位清理失败。stationId={}, slotIdx={}", stationId, slotIdx);
                return false;
            }
            List<StationTaskBufferItem> taskBufferItems = stationProtocol.getTaskBufferItems();
            if (taskBufferItems != null) {
                for (StationTaskBufferItem item : taskBufferItems) {
                    if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) {
                        item.setTaskNo(0);
                        item.setTargetStaNo(0);
                        break;
                    }
                }
            }
            log.warn("输送站缓存区残留路径按站点槽位清理成功。stationId={}, slotIdx={}", stationId, slotIdx);
            return true;
        }
        return false;
    }
    @Override
@@ -328,9 +478,32 @@
                    commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0,
                    JSON.toJSONString(commandResponse)
            );
            optService.save(basStationOpt);
            if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) {
                optService.save(basStationOpt);
            }
        }
        return commandResponse;
    }
    private void logExecutorAbnormal(String scene, StationCommand command) {
        if (executor == null) {
            return;
        }
        int activeCount = executor.getActiveCount();
        int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size();
        if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
            return;
        }
        log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}",
                scene,
                deviceConfig.getDeviceNo(),
                command == null ? null : command.getTaskNo(),
                command == null ? null : command.getStationId(),
                command == null ? null : command.getTargetStaNo(),
                executor.getPoolSize(),
                activeCount,
                queuedCount,
                executor.getCompletedTaskCount());
    }
    @Override
@@ -423,4 +596,35 @@
            return null;
        }
    }
    private void logRunBlockRerouteCost(Integer taskNo,
                                        Integer stationId,
                                        Integer targetStationId,
                                        int candidatePathCount,
                                        int candidateCommandCount,
                                        long startNs,
                                        long loopEvalNs,
                                        long candidatePathNs,
                                        long buildCommandNs,
                                        long planNs) {
        long totalMs = nanosToMillis(planNs - startNs);
        if (totalMs < 1000L) {
            return;
        }
        log.warn("输送线堵塞重规划耗时较长, taskNo={}, stationId={}, targetStationId={}, total={}ms, loopEval={}ms, candidatePath={}ms, buildCommand={}ms, planner={}ms, candidatePathCount={}, candidateCommandCount={}",
                taskNo,
                stationId,
                targetStationId,
                totalMs,
                nanosToMillis(loopEvalNs - startNs),
                nanosToMillis(candidatePathNs - loopEvalNs),
                nanosToMillis(buildCommandNs - candidatePathNs),
                nanosToMillis(planNs - buildCommandNs),
                candidatePathCount,
                candidateCommandCount);
    }
    private long nanosToMillis(long nanos) {
        return nanos <= 0L ? 0L : nanos / 1_000_000L;
    }
}