Junjie
13 小时以前 5ef79791cf93200c938b09dbd2461a7775391825
#线程日志
2个文件已修改
80 ■■■■■ 已修改文件
src/main/java/com/zy/core/network/ZyStationConnectDriver.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/network/ZyStationConnectDriver.java
@@ -29,6 +29,8 @@
    private static final ZyStationFakeSegConnect zyStationFakeSegConnect = new ZyStationFakeSegConnect();
    private static final ZyStationV4FakeSegConnect zyStationV4FakeSegConnect = new ZyStationV4FakeSegConnect();
    private static final long SEND_LOCK_WARN_MS = 3_000L;
    private static final long SEND_COST_WARN_MS = 5_000L;
    private volatile boolean connected = false;
    private volatile boolean connecting = false;
@@ -160,9 +162,12 @@
        if (!connected || connecting || connectApi == null) {
            return new CommandResponse(false, "设备未连接,命令下发失败");
        }
        long lockWaitStart = System.currentTimeMillis();
        int waitRounds = 0;
        while (true) {
            Object lock = redisUtil.get(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
            if(lock != null) {
                waitRounds++;
                try {
                    Thread.sleep(500);
                }catch (Exception e) {
@@ -173,9 +178,31 @@
                break;
            }
        }
        CommandResponse commandResponse = connectApi.sendCommand(deviceConfig.getDeviceNo(), command);
        long lockWaitCost = System.currentTimeMillis() - lockWaitStart;
        if (lockWaitCost >= SEND_LOCK_WARN_MS) {
            log.warn("输送命令等待全局发送锁超时,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}",
                    deviceConfig == null ? null : deviceConfig.getDeviceNo(),
                    command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getStationId(),
                    command == null ? null : command.getTargetStaNo(),
                    lockWaitCost,
                    waitRounds);
        }
        long sendStart = System.currentTimeMillis();
        try {
            return connectApi.sendCommand(deviceConfig.getDeviceNo(), command);
        } finally {
        redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
        return commandResponse;
            long sendCostMs = System.currentTimeMillis() - sendStart;
            if (sendCostMs >= SEND_COST_WARN_MS) {
                log.warn("输送命令底层发送耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, sendCostMs={}",
                        deviceConfig == null ? null : deviceConfig.getDeviceNo(),
                        command == null ? null : command.getTaskNo(),
                        command == null ? null : command.getStationId(),
                        command == null ? null : command.getTargetStaNo(),
                        sendCostMs);
            }
        }
    }
    public CommandResponse sendOriginCommand(String address, short[] data) {
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -36,12 +36,16 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
    private static final int SEGMENT_EXECUTOR_POOL_SIZE = 64;
    private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
    private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
    private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
    private DeviceConfig deviceConfig;
    private RedisUtil redisUtil;
@@ -131,14 +135,33 @@
        if (task == null || task.getStep() == null || task.getStep() != 2) {
            return;
        }
        submitSegmentCommand((StationCommand) task.getData());
        StationCommand command = (StationCommand) task.getData();
        logExecutorAbnormal("queue-poll", command);
        submitSegmentCommand(command);
    }
    private void submitSegmentCommand(StationCommand command) {
        if (command == null || executor == null || segmentExecutor == null) {
            return;
        }
        executor.submit(() -> segmentExecutor.execute(command));
        executor.submit(() -> {
            long start = System.currentTimeMillis();
            try {
                segmentExecutor.execute(command);
            } finally {
                long costMs = System.currentTimeMillis() - start;
                if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
                    log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
                            deviceConfig.getDeviceNo(),
                            command.getTaskNo(),
                            command.getStationId(),
                            command.getTargetStaNo(),
                            costMs);
                    logExecutorAbnormal("segment-finish-slow", command);
                }
            }
        });
        logExecutorAbnormal("after-submit", command);
    }
    @Override
@@ -381,6 +404,28 @@
        return commandResponse;
    }
    private void logExecutorAbnormal(String scene, StationCommand command) {
        if (!(executor instanceof ThreadPoolExecutor)) {
            return;
        }
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
        int activeCount = threadPoolExecutor.getActiveCount();
        int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
        if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
            return;
        }
        log.warn("V5输送线程池出现堆积,scene={}, deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, poolSize={}, activeCount={}, queuedCount={}, completedCount={}",
                scene,
                deviceConfig.getDeviceNo(),
                command == null ? null : command.getTaskNo(),
                command == null ? null : command.getStationId(),
                command == null ? null : command.getTargetStaNo(),
                threadPoolExecutor.getPoolSize(),
                activeCount,
                queuedCount,
                threadPoolExecutor.getCompletedTaskCount());
    }
    @Override
    public CommandResponse sendOriginCommand(String address, short[] data) {
        return zyStationConnectDriver.sendOriginCommand(address, data);