Junjie
12 小时以前 03c3ae747f82ad22c761c79e7b1c0e0031c57d41
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -36,17 +36,22 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
    private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64;
    private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
    private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
    private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
    private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
    private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
    private DeviceConfig deviceConfig;
    private RedisUtil redisUtil;
    private ZyStationConnectDriver zyStationConnectDriver;
    private final ExecutorService executor = Executors.newFixedThreadPool(SEGMENT_EXECUTOR_POOL_SIZE);
    private final ExecutorService executor;
    private StationV5SegmentExecutor segmentExecutor;
    private final RecentStationArrivalTracker recentArrivalTracker;
    private final StationV5StatusReader statusReader;
@@ -55,10 +60,13 @@
    public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        int poolSize = resolveSegmentExecutorPoolSize(redisUtil);
        this.executor = Executors.newFixedThreadPool(poolSize);
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
        this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
        this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
        this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
        log.info("初始化V5输送线程池,deviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize);
    }
    @Override
@@ -126,19 +134,43 @@
        return map;
    }
    @Override
    public List<Integer> getAllTaskNoList() {
        return statusReader.getTaskNoList();
    }
    private void pollAndDispatchQueuedCommand() {
        Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
        if (task == null || task.getStep() == null || task.getStep() != 2) {
            return;
        }
        submitSegmentCommand((StationCommand) task.getData());
        StationCommand command = (StationCommand) task.getData();
        logExecutorAbnormal("queue-poll", command);
        submitSegmentCommand(command);
    }
    private void submitSegmentCommand(StationCommand command) {
        if (command == null || executor == null || segmentExecutor == null) {
            return;
        }
        executor.submit(() -> segmentExecutor.execute(command));
        executor.submit(() -> {
            long start = System.currentTimeMillis();
            try {
                segmentExecutor.execute(command);
            } finally {
                long costMs = System.currentTimeMillis() - start;
                if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
                    log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
                            deviceConfig.getDeviceNo(),
                            command.getTaskNo(),
                            command.getStationId(),
                            command.getTargetStaNo(),
                            costMs);
                    logExecutorAbnormal("segment-finish-slow", command);
                }
            }
        });
        logExecutorAbnormal("after-submit", command);
    }
    @Override
@@ -289,21 +321,54 @@
                    continue;
                }
                found = true;
                Integer clearedTaskNo = item.getTaskNo();
                if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, item.getSlotIdx())) {
                    success = false;
                    log.warn("输送站缓存区残留路径清理失败。stationId={}, slotIdx={}, taskNo={}",
                            stationId, item.getSlotIdx(), item.getTaskNo());
                            stationId, item.getSlotIdx(), clearedTaskNo);
                    continue;
                }else {
                    item.setTaskNo(0);
                    item.setTargetStaNo(0);
                    success = true;
                    log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}",
                            stationId, item.getSlotIdx(), item.getTaskNo());
                }
                item.setTaskNo(0);
                item.setTargetStaNo(0);
                log.warn("输送站缓存区残留路径清理成功。stationId={}, slotIdx={}, taskNo={}",
                        stationId, item.getSlotIdx(), clearedTaskNo);
            }
        }
        return found && success;
    }
    @Override
    public boolean clearPathByStationSlot(Integer stationId, Integer slotIdx) {
        if (stationId == null || slotIdx == null || zyStationConnectDriver == null) {
            return false;
        }
        List<StationProtocol> status = getStatus();
        if (status == null || status.isEmpty()) {
            return false;
        }
        for (StationProtocol stationProtocol : status) {
            if (stationProtocol == null || !Objects.equals(stationId, stationProtocol.getStationId())) {
                continue;
            }
            if (!zyStationConnectDriver.clearTaskBufferSlot(stationId, slotIdx)) {
                log.warn("输送站缓存区残留路径按站点槽位清理失败。stationId={}, slotIdx={}", stationId, slotIdx);
                return false;
            }
            List<StationTaskBufferItem> taskBufferItems = stationProtocol.getTaskBufferItems();
            if (taskBufferItems != null) {
                for (StationTaskBufferItem item : taskBufferItems) {
                    if (item != null && Objects.equals(slotIdx, item.getSlotIdx())) {
                        item.setTaskNo(0);
                        item.setTargetStaNo(0);
                        break;
                    }
                }
            }
            log.warn("输送站缓存区残留路径按站点槽位清理成功。stationId={}, slotIdx={}", stationId, slotIdx);
            return true;
        }
        return false;
    }
    @Override
@@ -348,6 +413,53 @@
        return commandResponse;
    }
    private void logExecutorAbnormal(String scene, StationCommand command) {
        if (!(executor instanceof ThreadPoolExecutor)) {
            return;
        }
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
        int activeCount = threadPoolExecutor.getActiveCount();
        int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
        if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
            return;
        }
        log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}",
                scene,
                deviceConfig.getDeviceNo(),
                command == null ? null : command.getTaskNo(),
                command == null ? null : command.getStationId(),
                command == null ? null : command.getTargetStaNo(),
                threadPoolExecutor.getPoolSize(),
                activeCount,
                queuedCount,
                threadPoolExecutor.getCompletedTaskCount());
    }
    @SuppressWarnings("unchecked")
    private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) {
        if (redisUtil == null) {
            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
        }
        try {
            Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key);
            if (!(systemConfigMapObj instanceof HashMap)) {
                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
            }
            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
            String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE);
            if (poolSizeText == null || poolSizeText.trim().isEmpty()) {
                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
            }
            int configured = Integer.parseInt(poolSizeText.trim());
            if (configured < 16) {
                return 16;
            }
            return Math.min(configured, 512);
        } catch (Exception ignore) {
            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
        }
    }
    @Override
    public CommandResponse sendOriginCommand(String address, short[] data) {
        return zyStationConnectDriver.sendOriginCommand(address, data);