Junjie
2026-04-13 b8640dc78123f4be2483feed2f48b9183983f51f
#站点运行优化
3个文件已添加
9个文件已修改
880 ■■■■ 已修改文件
src/main/java/com/zy/common/utils/RedisUtil.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/ServerBootstrap.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/enums/RedisKeyType.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/network/ZyStationConnectDriver.java 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/task/BasStationOptAsyncPublisher.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java 160 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/v5/StationV5RuntimeConfigProvider.java 216 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java 115 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/support/StationTaskLocationRegistry.java 158 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/common/utils/RedisUtil.java
@@ -4,9 +4,11 @@
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -209,6 +211,44 @@
            }
            redisTemplate.execute((RedisCallback<Void>) connection -> null);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    public boolean trySetStringIfAbsent(String key, String value, long timeSeconds) {
        if (key == null || value == null) {
            return false;
        }
        try {
            Boolean result;
            if (timeSeconds > 0) {
                result = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeSeconds, TimeUnit.SECONDS);
            } else {
                result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
            }
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    public boolean compareAndDelete(String key, String expectedValue) {
        if (key == null || expectedValue == null) {
            return false;
        }
        try {
            DefaultRedisScript<Long> script = new DefaultRedisScript<>();
            script.setScriptText(
                    "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                            "return redis.call('del', KEYS[1]) " +
                            "else return 0 end"
            );
            script.setResultType(Long.class);
            Long result = stringRedisTemplate.execute(script, Collections.singletonList(key), expectedValue);
            return result != null && result > 0;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
@@ -705,4 +745,3 @@
}
src/main/java/com/zy/core/ServerBootstrap.java
@@ -47,6 +47,10 @@
    private void clearStartupRuntimeLocks() {
        redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
        java.util.Set<String> stationSendLockKeys = redisUtil.scanKeys(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key + ":", 2048);
        if (stationSendLockKeys != null && !stationSendLockKeys.isEmpty()) {
            redisUtil.del(stationSendLockKeys.toArray(new String[0]));
        }
//        News.info("系统启动时已清理输送站命令执行锁,key={}", RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
    }
src/main/java/com/zy/core/enums/RedisKeyType.java
@@ -48,6 +48,7 @@
    STATION_OUT_EXECUTE_LIMIT("station_out_execute_limit_"),
    STATION_OUT_PENDING_DISPATCH_("station_out_pending_dispatch_"),
    STATION_OUT_ORDER_DISPATCH_LIMIT_("station_out_order_dispatch_limit_"),
    STATION_RUN_BLOCK_DIRECT_REASSIGN_LIMIT_("station_run_block_direct_reassign_limit_"),
    STATION_OUT_EXECUTE_COMPLETE_LIMIT("station_out_execute_complete_limit_"),
    CHECK_STATION_RUN_BLOCK_LIMIT_("check_station_run_block_limit_"),
    STATION_RUN_BLOCK_REROUTE_STATE_("station_run_block_reroute_state_"),
src/main/java/com/zy/core/network/ZyStationConnectDriver.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.UUID;
/**
 * 输送站连接驱动
@@ -29,8 +30,10 @@
    private static final ZyStationFakeSegConnect zyStationFakeSegConnect = new ZyStationFakeSegConnect();
    private static final ZyStationV4FakeSegConnect zyStationV4FakeSegConnect = new ZyStationV4FakeSegConnect();
    private static final long SEND_LOCK_WARN_MS = 3_000L;
    private static final long SEND_LOCK_WARN_MS = 500L;
    private static final long SEND_COST_WARN_MS = 5_000L;
    private static final long SEND_LOCK_POLL_MS = 20L;
    private static final long SEND_LOCK_EXPIRE_SECONDS = 15L;
    private volatile boolean connected = false;
    private volatile boolean connecting = false;
@@ -162,37 +165,39 @@
        if (!connected || connecting || connectApi == null) {
            return new CommandResponse(false, "设备未连接,命令下发失败");
        }
        String lockKey = buildStationExecuteLockKey();
        String lockToken = UUID.randomUUID().toString();
        long lockWaitStart = System.currentTimeMillis();
        int waitRounds = 0;
        while (true) {
            Object lock = redisUtil.get(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
            if(lock != null) {
        if (redisUtil != null) {
            while (true) {
                if (redisUtil.trySetStringIfAbsent(lockKey, lockToken, SEND_LOCK_EXPIRE_SECONDS)) {
                    break;
                }
                waitRounds++;
                try {
                    Thread.sleep(500);
                }catch (Exception e) {
                    Thread.sleep(SEND_LOCK_POLL_MS);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }else {
                redisUtil.set(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key, "lock", 60 * 5);
                break;
            }
        }
        long lockWaitCost = System.currentTimeMillis() - lockWaitStart;
        if (lockWaitCost >= SEND_LOCK_WARN_MS) {
            log.warn("输送命令等待全局发送锁超时,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}",
            log.warn("输送命令等待设备发送锁超时,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}, lockKey={}",
                    deviceConfig == null ? null : deviceConfig.getDeviceNo(),
                    command == null ? null : command.getTaskNo(),
                    command == null ? null : command.getStationId(),
                    command == null ? null : command.getTargetStaNo(),
                    lockWaitCost,
                    waitRounds);
                    waitRounds,
                    lockKey);
        }
        long sendStart = System.currentTimeMillis();
        try {
            return connectApi.sendCommand(deviceConfig.getDeviceNo(), command);
        } finally {
            redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
            releaseDeviceSendLock(lockKey, lockToken);
            long sendCostMs = System.currentTimeMillis() - sendStart;
            if (sendCostMs >= SEND_COST_WARN_MS) {
                log.warn("输送命令底层发送耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, sendCostMs={}",
@@ -205,6 +210,22 @@
        }
    }
    private String buildStationExecuteLockKey() {
        Integer deviceNo = deviceConfig == null ? null : deviceConfig.getDeviceNo();
        return RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key + ":" + deviceNo;
    }
    private void releaseDeviceSendLock(String lockKey, String lockToken) {
        if (redisUtil == null || lockKey == null) {
            return;
        }
        try {
            redisUtil.compareAndDelete(lockKey, lockToken);
        } catch (Exception e) {
            log.warn("释放输送设备发送锁失败,lockKey={}", lockKey, e);
        }
    }
    public CommandResponse sendOriginCommand(String address, short[] data) {
        ZyStationConnectApi connectApi = zyStationConnectApi;
        if (!connected || connecting || connectApi == null) {
src/main/java/com/zy/core/task/BasStationOptAsyncPublisher.java
New file
@@ -0,0 +1,60 @@
package com.zy.core.task;
import com.zy.asrs.entity.BasStationOpt;
import com.zy.asrs.service.BasStationOptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
@Slf4j
@Component
public class BasStationOptAsyncPublisher {
    private static final String LANE_NAME = "bas-station-opt-save";
    private static final String TASK_NAME = "publish-bas-station-opt";
    private static final long MIN_INTERVAL_MS = 0L;
    private static final int DEFAULT_QUEUE_CAPACITY = 2048;
    private final ArrayBlockingQueue<BasStationOpt> pendingQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
    @Autowired
    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
    @Autowired
    private BasStationOptService basStationOptService;
    public boolean publish(BasStationOpt basStationOpt) {
        if (basStationOpt == null) {
            return true;
        }
        if (!pendingQueue.offer(basStationOpt)) {
            log.error("BasStationOpt async publish queue full, fallback to sync save, taskNo={}, stationId={}, targetStationId={}",
                    basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId());
            return false;
        }
        mainProcessTaskSubmitter.submitSerialTask(LANE_NAME, TASK_NAME, MIN_INTERVAL_MS, this::drain);
        return true;
    }
    private void drain() {
        while (true) {
            BasStationOpt basStationOpt = pendingQueue.poll();
            if (basStationOpt == null) {
                return;
            }
            try {
                basStationOptService.save(basStationOpt);
            } catch (Exception e) {
                log.error("BasStationOpt async publish error, fallback to sync save next time, taskNo={}, stationId={}, targetStationId={}",
                        basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), e);
                try {
                    basStationOptService.save(basStationOpt);
                } catch (Exception ex) {
                    log.error("BasStationOpt sync fallback save error, taskNo={}, stationId={}, targetStationId={}",
                            basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), ex);
                }
            }
        }
    }
}
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -21,6 +21,8 @@
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.service.StationTaskLoopService;
import com.zy.core.task.BasStationOptAsyncPublisher;
import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner;
import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
import com.zy.core.thread.impl.v5.StationV5StatusReader;
@@ -34,39 +36,54 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
    private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
    private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
    private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
    private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
    private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
    private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
    private static final int QUEUE_DRAIN_BATCH_SIZE = 32;
    private static final long QUEUE_IDLE_SLEEP_MS = 20L;
    private DeviceConfig deviceConfig;
    private RedisUtil redisUtil;
    private ZyStationConnectDriver zyStationConnectDriver;
    private final ExecutorService executor;
    private final ThreadPoolExecutor executor;
    private StationV5SegmentExecutor segmentExecutor;
    private final RecentStationArrivalTracker recentArrivalTracker;
    private final StationV5StatusReader statusReader;
    private final StationV5RunBlockReroutePlanner runBlockReroutePlanner;
    private final int executorQueueCapacity;
    private final BasStationOptAsyncPublisher basStationOptAsyncPublisher;
    public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        int poolSize = resolveSegmentExecutorPoolSize(redisUtil);
        this.executor = Executors.newFixedThreadPool(poolSize);
        StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
        int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize();
        this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity();
        this.executor = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(executorQueueCapacity)
        );
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
        this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
        this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
        this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
        log.info("初始化V5输送线程池,deviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize);
        this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class);
        log.info("初始化V5输送线程池,deviceNo={}, poolSize={}, queueCapacity={}",
                deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity);
    }
    @Override
@@ -89,8 +106,10 @@
        Thread processThread = new Thread(() -> {
            while (true) {
                try {
                    pollAndDispatchQueuedCommand();
                    Thread.sleep(100);
                    int dispatchedCount = pollAndDispatchQueuedCommandBatch();
                    if (dispatchedCount <= 0) {
                        Thread.sleep(QUEUE_IDLE_SLEEP_MS);
                    }
                } catch (Exception e) {
                    log.error("StationV5Process Fail", e);
                }
@@ -139,38 +158,71 @@
        return statusReader.getTaskNoList();
    }
    private void pollAndDispatchQueuedCommand() {
        Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
        if (task == null || task.getStep() == null || task.getStep() != 2) {
            return;
    private int pollAndDispatchQueuedCommandBatch() {
        int dispatchedCount = 0;
        while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) {
            if (isExecutorQueueAtWatermark()) {
                logExecutorAbnormal("executor-watermark", null);
                break;
            }
            Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo());
            if (task == null || task.getStep() == null || task.getStep() != 2) {
                break;
            }
            StationCommand command = (StationCommand) task.getData();
            logExecutorAbnormal("queue-peek", command);
            if (!submitSegmentCommand(command)) {
                logExecutorAbnormal("submit-rejected", command);
                break;
            }
            MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
            dispatchedCount++;
        }
        StationCommand command = (StationCommand) task.getData();
        logExecutorAbnormal("queue-poll", command);
        submitSegmentCommand(command);
        return dispatchedCount;
    }
    private void submitSegmentCommand(StationCommand command) {
    private boolean submitSegmentCommand(StationCommand command) {
        if (command == null || executor == null || segmentExecutor == null) {
            return;
            return false;
        }
        executor.submit(() -> {
            long start = System.currentTimeMillis();
            try {
                segmentExecutor.execute(command);
            } finally {
                long costMs = System.currentTimeMillis() - start;
                if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
                    log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
                            deviceConfig.getDeviceNo(),
                            command.getTaskNo(),
                            command.getStationId(),
                            command.getTargetStaNo(),
                            costMs);
                    logExecutorAbnormal("segment-finish-slow", command);
        try {
            executor.execute(() -> {
                long start = System.currentTimeMillis();
                try {
                    segmentExecutor.execute(command);
                } finally {
                    long costMs = System.currentTimeMillis() - start;
                    if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
                        log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
                                deviceConfig.getDeviceNo(),
                                command.getTaskNo(),
                                command.getStationId(),
                                command.getTargetStaNo(),
                                costMs);
                        logExecutorAbnormal("segment-finish-slow", command);
                    }
                }
            }
        });
            });
        } catch (RejectedExecutionException e) {
            log.error("V5输送线程池拒绝执行,保留设备队列积压,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}",
                    deviceConfig.getDeviceNo(),
                    command.getTaskNo(),
                    command.getStationId(),
                    command.getTargetStaNo(),
                    executor.getActiveCount(),
                    executor.getQueue() == null ? 0 : executor.getQueue().size(),
                    executorQueueCapacity);
            return false;
        }
        logExecutorAbnormal("after-submit", command);
        return true;
    }
    private boolean isExecutorQueueAtWatermark() {
        if (executor == null || executor.getQueue() == null) {
            return false;
        }
        return executor.getQueue().size() >= executorQueueCapacity;
    }
    @Override
@@ -408,18 +460,19 @@
                    commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0,
                    JSON.toJSONString(commandResponse)
            );
            optService.save(basStationOpt);
            if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) {
                optService.save(basStationOpt);
            }
        }
        return commandResponse;
    }
    private void logExecutorAbnormal(String scene, StationCommand command) {
        if (!(executor instanceof ThreadPoolExecutor)) {
        if (executor == null) {
            return;
        }
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
        int activeCount = threadPoolExecutor.getActiveCount();
        int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
        int activeCount = executor.getActiveCount();
        int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size();
        if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
            return;
        }
@@ -429,35 +482,10 @@
                command == null ? null : command.getTaskNo(),
                command == null ? null : command.getStationId(),
                command == null ? null : command.getTargetStaNo(),
                threadPoolExecutor.getPoolSize(),
                executor.getPoolSize(),
                activeCount,
                queuedCount,
                threadPoolExecutor.getCompletedTaskCount());
    }
    @SuppressWarnings("unchecked")
    private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) {
        if (redisUtil == null) {
            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
        }
        try {
            Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key);
            if (!(systemConfigMapObj instanceof HashMap)) {
                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
            }
            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
            String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE);
            if (poolSizeText == null || poolSizeText.trim().isEmpty()) {
                return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
            }
            int configured = Integer.parseInt(poolSizeText.trim());
            if (configured < 16) {
                return 16;
            }
            return Math.min(configured, 512);
        } catch (Exception ignore) {
            return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
        }
                executor.getCompletedTaskCount());
    }
    @Override
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -15,6 +15,8 @@
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.trace.StationTaskTraceRegistry;
import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
import com.zy.core.thread.support.StationTaskLocationRegistry;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -29,6 +31,7 @@
    private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
    private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d;
    private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L;
    private static final long TASK_LOCATION_STALE_MS = 2_000L;
    private final DeviceConfig deviceConfig;
    private final RedisUtil redisUtil;
@@ -310,6 +313,13 @@
    }
    private double loadSegmentAdvanceRatio() {
        if (isV5ThreadImpl()) {
            StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
            if (configProvider != null) {
                return configProvider.getSegmentAdvanceRatio();
            }
            return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
        }
        try {
            ConfigService configService = SpringUtils.getBean(ConfigService.class);
            if (configService == null) {
@@ -363,6 +373,9 @@
    }
    private StationProtocol findCurrentStationByTask(Integer taskNo) {
        if (isV5ThreadImpl()) {
            return findCurrentStationByTaskFromRegistry(taskNo);
        }
        try {
            com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class);
            if (deviceConfigService == null) {
@@ -391,6 +404,27 @@
        return null;
    }
    private StationProtocol findCurrentStationByTaskFromRegistry(Integer taskNo) {
        StationTaskLocationRegistry registry = SpringUtils.getBean(StationTaskLocationRegistry.class);
        if (registry == null) {
            return null;
        }
        StationTaskLocationRegistry.TaskLocationSnapshot snapshot = registry.findActive(taskNo, TASK_LOCATION_STALE_MS);
        if (snapshot == null || !snapshot.isLoading()) {
            return null;
        }
        StationProtocol stationProtocol = new StationProtocol();
        stationProtocol.setTaskNo(snapshot.getTaskNo());
        stationProtocol.setStationId(snapshot.getStationId());
        stationProtocol.setRunBlock(snapshot.isRunBlock());
        stationProtocol.setLoading(true);
        return stationProtocol;
    }
    private boolean isV5ThreadImpl() {
        return deviceConfig != null && "ZyStationV5Thread".equals(deviceConfig.getThreadImpl());
    }
    private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) {
        List<StationTaskTraceSegmentVo> result = new ArrayList<>();
        if (segmentCommands == null) {
src/main/java/com/zy/core/thread/impl/v5/StationV5RuntimeConfigProvider.java
New file
@@ -0,0 +1,216 @@
package com.zy.core.thread.impl.v5;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.common.Cools;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Slf4j
@Component
public class StationV5RuntimeConfigProvider {
    private static final String CFG_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
    private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
    private static final String CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY = "stationV5SegmentExecutorQueueCapacity";
    private static final String CFG_CONFIG_REFRESH_SECONDS = "stationCommandConfigRefreshSeconds";
    private static final double DEFAULT_SEGMENT_ADVANCE_RATIO = 0.3d;
    private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
    private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
    private static final int DEFAULT_CONFIG_REFRESH_SECONDS = 30;
    @Autowired(required = false)
    private RedisUtil redisUtil;
    @Autowired(required = false)
    private ConfigService configService;
    private volatile CacheSnapshot cacheSnapshot = new CacheSnapshot(
            DEFAULT_SEGMENT_ADVANCE_RATIO,
            DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE,
            DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY,
            DEFAULT_CONFIG_REFRESH_SECONDS,
            0L
    );
    public double getSegmentAdvanceRatio() {
        return loadSnapshot().segmentAdvanceRatio;
    }
    public int getSegmentExecutorPoolSize() {
        return loadSnapshot().segmentExecutorPoolSize;
    }
    public int getSegmentExecutorQueueCapacity() {
        return loadSnapshot().segmentExecutorQueueCapacity;
    }
    private CacheSnapshot loadSnapshot() {
        CacheSnapshot snapshot = cacheSnapshot;
        long now = System.currentTimeMillis();
        if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) {
            return snapshot;
        }
        synchronized (this) {
            snapshot = cacheSnapshot;
            now = System.currentTimeMillis();
            if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) {
                return snapshot;
            }
            CacheSnapshot refreshed = refreshSnapshot(now);
            cacheSnapshot = refreshed;
            return refreshed;
        }
    }
    @SuppressWarnings("unchecked")
    private CacheSnapshot refreshSnapshot(long now) {
        HashMap<String, String> systemConfigMap = null;
        try {
            Object systemConfigMapObj = redisUtil == null ? null : redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
            if (systemConfigMapObj instanceof HashMap) {
                systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
            }
        } catch (Exception e) {
            log.warn("加载 V5 输送运行时配置缓存失败,fallback to db. reason=redis-read-error", e);
        }
        int refreshSeconds = normalizeRefreshSeconds(readConfigText(systemConfigMap, CFG_CONFIG_REFRESH_SECONDS));
        double segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigText(systemConfigMap, CFG_SEGMENT_ADVANCE_RATIO));
        int poolSize = normalizePoolSize(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_POOL_SIZE));
        int queueCapacity = normalizeQueueCapacity(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY));
        if (systemConfigMap == null) {
            segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigTextFromDb(CFG_SEGMENT_ADVANCE_RATIO), segmentAdvanceRatio);
            poolSize = normalizePoolSize(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_POOL_SIZE), poolSize);
            queueCapacity = normalizeQueueCapacity(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY), queueCapacity);
            refreshSeconds = normalizeRefreshSeconds(readConfigTextFromDb(CFG_CONFIG_REFRESH_SECONDS), refreshSeconds);
        }
        return new CacheSnapshot(segmentAdvanceRatio, poolSize, queueCapacity, refreshSeconds, now);
    }
    private String readConfigText(HashMap<String, String> systemConfigMap, String code) {
        if (systemConfigMap == null || code == null) {
            return null;
        }
        return systemConfigMap.get(code);
    }
    private String readConfigTextFromDb(String code) {
        if (configService == null || code == null) {
            return null;
        }
        try {
            Config config = configService.getOne(new QueryWrapper<Config>().eq("code", code));
            return config == null ? null : config.getValue();
        } catch (Exception e) {
            log.warn("加载 V5 输送运行时配置数据库失败,code={}", code, e);
            return null;
        }
    }
    private double normalizeSegmentAdvanceRatio(String valueText) {
        return normalizeSegmentAdvanceRatio(valueText, DEFAULT_SEGMENT_ADVANCE_RATIO);
    }
    private double normalizeSegmentAdvanceRatio(String valueText, double defaultValue) {
        if (valueText == null) {
            return defaultValue;
        }
        String text = valueText.trim();
        if (text.isEmpty()) {
            return defaultValue;
        }
        if (text.endsWith("%")) {
            text = text.substring(0, text.length() - 1).trim();
        }
        try {
            double ratio = Double.parseDouble(text);
            if (ratio > 1d && ratio <= 100d) {
                ratio = ratio / 100d;
            }
            if (ratio < 0d) {
                return 0d;
            }
            if (ratio > 1d) {
                return 1d;
            }
            return ratio;
        } catch (Exception ignore) {
            return defaultValue;
        }
    }
    private int normalizePoolSize(String valueText) {
        return normalizePoolSize(valueText, DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE);
    }
    private int normalizePoolSize(String valueText, int defaultValue) {
        int configured = parsePositiveInt(valueText, defaultValue);
        if (configured < 16) {
            return 16;
        }
        return Math.min(configured, 512);
    }
    private int normalizeQueueCapacity(String valueText) {
        return normalizeQueueCapacity(valueText, DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY);
    }
    private int normalizeQueueCapacity(String valueText, int defaultValue) {
        int configured = parsePositiveInt(valueText, defaultValue);
        if (configured < 64) {
            return 64;
        }
        return Math.min(configured, 4096);
    }
    private int normalizeRefreshSeconds(String valueText) {
        return normalizeRefreshSeconds(valueText, DEFAULT_CONFIG_REFRESH_SECONDS);
    }
    private int normalizeRefreshSeconds(String valueText, int defaultValue) {
        int configured = parsePositiveInt(valueText, defaultValue);
        if (configured < 5) {
            return 5;
        }
        return Math.min(configured, 300);
    }
    private int parsePositiveInt(String valueText, int defaultValue) {
        if (Cools.isEmpty(valueText)) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(valueText.trim());
        } catch (Exception ignore) {
            return defaultValue;
        }
    }
    private static class CacheSnapshot {
        private final double segmentAdvanceRatio;
        private final int segmentExecutorPoolSize;
        private final int segmentExecutorQueueCapacity;
        private final int refreshSeconds;
        private final long loadedAtMs;
        private CacheSnapshot(double segmentAdvanceRatio,
                              int segmentExecutorPoolSize,
                              int segmentExecutorQueueCapacity,
                              int refreshSeconds,
                              long loadedAtMs) {
            this.segmentAdvanceRatio = segmentAdvanceRatio;
            this.segmentExecutorPoolSize = segmentExecutorPoolSize;
            this.segmentExecutorQueueCapacity = segmentExecutorQueueCapacity;
            this.refreshSeconds = refreshSeconds;
            this.loadedAtMs = loadedAtMs;
        }
    }
}
src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java
@@ -20,6 +20,7 @@
import com.zy.core.task.DeviceAsyncLogPublisher;
import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.thread.support.StationErrLogSupport;
import com.zy.core.thread.support.StationTaskLocationRegistry;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
public class StationV5StatusReader {
@@ -34,7 +36,9 @@
    private final RedisUtil redisUtil;
    private final RecentStationArrivalTracker recentArrivalTracker;
    private final DeviceAsyncLogPublisher devpAsyncLogPublisher;
    private final StationTaskLocationRegistry stationTaskLocationRegistry;
    private final List<StationProtocol> statusList = new ArrayList<>();
    private final Map<Integer, StationProtocol> statusMap = new HashMap<>();
    private volatile List<Integer> taskNoList = new ArrayList<>();
    private boolean initialized = false;
    private long deviceDataLogTime = System.currentTimeMillis();
@@ -46,6 +50,7 @@
        this.redisUtil = redisUtil;
        this.recentArrivalTracker = recentArrivalTracker;
        this.devpAsyncLogPublisher = SpringUtils.getBean(DeviceAsyncLogPublisher.class);
        this.stationTaskLocationRegistry = SpringUtils.getBean(StationTaskLocationRegistry.class);
    }
    public void readStatus(ZyStationConnectDriver zyStationConnectDriver) {
@@ -74,57 +79,72 @@
                StationProtocol stationProtocol = new StationProtocol();
                stationProtocol.setStationId(entity.getStationId());
                statusList.add(stationProtocol);
                statusMap.put(entity.getStationId(), stationProtocol);
            }
            initialized = true;
        }
        int deviceLogCollectTime = initialized ? Utils.getDeviceLogCollectTime() : 200;
        List<ZyStationStatusEntity> zyStationStatusEntities = zyStationConnectDriver.getStatus();
        if (zyStationStatusEntities == null || zyStationStatusEntities.isEmpty()) {
            return;
        }
        LinkedHashSet<Integer> taskNoSet = new LinkedHashSet<>();
        LinkedHashSet<Integer> loadingTaskNoSet = new LinkedHashSet<>();
        long observeAt = System.currentTimeMillis();
        for (ZyStationStatusEntity statusEntity : zyStationStatusEntities) {
            for (StationProtocol stationProtocol : statusList) {
                if (stationProtocol.getStationId().equals(statusEntity.getStationId())) {
                    stationProtocol.setTaskNo(statusEntity.getTaskNo());
                    stationProtocol.setTargetStaNo(statusEntity.getTargetStaNo());
                    stationProtocol.setAutoing(statusEntity.isAutoing());
                    stationProtocol.setLoading(statusEntity.isLoading());
                    stationProtocol.setInEnable(statusEntity.isInEnable());
                    stationProtocol.setOutEnable(statusEntity.isOutEnable());
                    stationProtocol.setEmptyMk(statusEntity.isEmptyMk());
                    stationProtocol.setFullPlt(statusEntity.isFullPlt());
                    stationProtocol.setPalletHeight(statusEntity.getPalletHeight());
                    stationProtocol.setError(statusEntity.getError());
                    stationProtocol.setErrorMsg(statusEntity.getErrorMsg());
                    stationProtocol.setBarcode(statusEntity.getBarcode());
                    stationProtocol.setRunBlock(statusEntity.isRunBlock());
                    stationProtocol.setEnableIn(statusEntity.isEnableIn());
                    stationProtocol.setWeight(statusEntity.getWeight());
                    stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
                    stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
                    stationProtocol.setIoMode(statusEntity.getIoMode());
                    stationProtocol.setInBarcodeError(statusEntity.isInBarcodeError());
                    if (statusEntity.getTaskNo() != null && statusEntity.getTaskNo() > 0) {
                        taskNoSet.add(statusEntity.getTaskNo());
                    }
                    if (statusEntity.getTaskBufferItems() != null) {
                        statusEntity.getTaskBufferItems().forEach(item -> {
                            Integer bufferTaskNo = item == null ? null : item.getTaskNo();
                            if (bufferTaskNo != null && bufferTaskNo > 0) {
                                taskNoSet.add(bufferTaskNo);
                            }
                        });
                    }
                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
            if (statusEntity == null || statusEntity.getStationId() == null) {
                continue;
            }
            StationProtocol stationProtocol = statusMap.get(statusEntity.getStationId());
            if (stationProtocol == null) {
                continue;
            }
            stationProtocol.setTaskNo(statusEntity.getTaskNo());
            stationProtocol.setTargetStaNo(statusEntity.getTargetStaNo());
            stationProtocol.setAutoing(statusEntity.isAutoing());
            stationProtocol.setLoading(statusEntity.isLoading());
            stationProtocol.setInEnable(statusEntity.isInEnable());
            stationProtocol.setOutEnable(statusEntity.isOutEnable());
            stationProtocol.setEmptyMk(statusEntity.isEmptyMk());
            stationProtocol.setFullPlt(statusEntity.isFullPlt());
            stationProtocol.setPalletHeight(statusEntity.getPalletHeight());
            stationProtocol.setError(statusEntity.getError());
            stationProtocol.setErrorMsg(statusEntity.getErrorMsg());
            stationProtocol.setBarcode(statusEntity.getBarcode());
            stationProtocol.setRunBlock(statusEntity.isRunBlock());
            stationProtocol.setEnableIn(statusEntity.isEnableIn());
            stationProtocol.setWeight(statusEntity.getWeight());
            stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
            stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
            stationProtocol.setIoMode(statusEntity.getIoMode());
            stationProtocol.setInBarcodeError(statusEntity.isInBarcodeError());
            if (statusEntity.getTaskNo() != null && statusEntity.getTaskNo() > 0) {
                taskNoSet.add(statusEntity.getTaskNo());
                if (statusEntity.isLoading()) {
                    loadingTaskNoSet.add(statusEntity.getTaskNo());
                }
                if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
                    if (stationProtocol.isAutoing() && !stationProtocol.isLoading()) {
                        stationProtocol.setSystemWarning("");
            }
            if (statusEntity.getTaskBufferItems() != null) {
                statusEntity.getTaskBufferItems().forEach(item -> {
                    Integer bufferTaskNo = item == null ? null : item.getTaskNo();
                    if (bufferTaskNo != null && bufferTaskNo > 0) {
                        taskNoSet.add(bufferTaskNo);
                    }
                }
                });
            }
            recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
            syncTaskLocation(statusEntity, observeAt);
            if (!Cools.isEmpty(stationProtocol.getSystemWarning())
                    && stationProtocol.isAutoing()
                    && !stationProtocol.isLoading()) {
                stationProtocol.setSystemWarning("");
            }
        }
        taskNoList = new ArrayList<>(taskNoSet);
        if (stationTaskLocationRegistry != null) {
            stationTaskLocationRegistry.cleanupByDevice(deviceConfig.getDeviceNo(), loadingTaskNoSet);
        }
        OutputQueue.DEVP.offer(MessageFormat.format("【{0}】[id:{1}] <<<<< 实时数据更新成功",
                DateUtils.convert(new Date()), deviceConfig.getDeviceNo()));
@@ -173,4 +193,23 @@
    public List<Integer> getTaskNoList() {
        return taskNoList;
    }
    private void syncTaskLocation(ZyStationStatusEntity statusEntity, long observeAt) {
        if (stationTaskLocationRegistry == null || statusEntity == null) {
            return;
        }
        Integer taskNo = statusEntity.getTaskNo();
        if (taskNo != null && taskNo > 0 && statusEntity.isLoading()) {
            stationTaskLocationRegistry.update(
                    taskNo,
                    deviceConfig.getDeviceNo(),
                    statusEntity.getStationId(),
                    true,
                    statusEntity.isRunBlock(),
                    observeAt
            );
            return;
        }
        stationTaskLocationRegistry.remove(taskNo, deviceConfig.getDeviceNo(), statusEntity.getStationId());
    }
}
src/main/java/com/zy/core/thread/support/StationTaskLocationRegistry.java
New file
@@ -0,0 +1,158 @@
package com.zy.core.thread.support;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class StationTaskLocationRegistry {
    private static final long DEFAULT_STALE_THRESHOLD_MS = 2_000L;
    private static final long LOOKUP_WARN_INTERVAL_MS = 5_000L;
    private final ConcurrentHashMap<Integer, TaskLocationSnapshot> loadingTaskLocationMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, Long> lookupWarnAtMap = new ConcurrentHashMap<>();
    public void update(Integer taskNo,
                       Integer deviceNo,
                       Integer stationId,
                       boolean loading,
                       boolean runBlock,
                       long updateTime) {
        if (taskNo == null || taskNo <= 0) {
            return;
        }
        if (!loading || deviceNo == null || stationId == null) {
            remove(taskNo, null, null);
            return;
        }
        loadingTaskLocationMap.put(taskNo, new TaskLocationSnapshot(taskNo, deviceNo, stationId, true, runBlock, updateTime));
    }
    public void remove(Integer taskNo, Integer deviceNo, Integer stationId) {
        if (taskNo == null || taskNo <= 0) {
            return;
        }
        loadingTaskLocationMap.computeIfPresent(taskNo, (key, snapshot) -> {
            if (snapshot == null) {
                return null;
            }
            if (deviceNo != null && !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
                return snapshot;
            }
            if (stationId != null && !Objects.equals(stationId, snapshot.getStationId())) {
                return snapshot;
            }
            return null;
        });
    }
    public TaskLocationSnapshot findActive(Integer taskNo) {
        return findActive(taskNo, DEFAULT_STALE_THRESHOLD_MS);
    }
    public TaskLocationSnapshot findActive(Integer taskNo, long staleThresholdMs) {
        if (taskNo == null || taskNo <= 0) {
            return null;
        }
        TaskLocationSnapshot snapshot = loadingTaskLocationMap.get(taskNo);
        if (snapshot == null) {
            warnLookup(taskNo, "miss", null);
            return null;
        }
        long ageMs = Math.max(0L, System.currentTimeMillis() - snapshot.getUpdateTime());
        if (ageMs > staleThresholdMs) {
            warnLookup(taskNo, "stale", ageMs);
            return null;
        }
        return snapshot;
    }
    public void cleanupByDevice(Integer deviceNo, Iterable<Integer> activeTaskNoList) {
        if (deviceNo == null) {
            return;
        }
        ConcurrentHashMap<Integer, Boolean> activeMap = new ConcurrentHashMap<>();
        if (activeTaskNoList != null) {
            for (Integer taskNo : activeTaskNoList) {
                if (taskNo != null && taskNo > 0) {
                    activeMap.put(taskNo, Boolean.TRUE);
                }
            }
        }
        for (Map.Entry<Integer, TaskLocationSnapshot> entry : loadingTaskLocationMap.entrySet()) {
            TaskLocationSnapshot snapshot = entry.getValue();
            if (snapshot == null || !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
                continue;
            }
            if (!activeMap.containsKey(entry.getKey())) {
                loadingTaskLocationMap.remove(entry.getKey(), snapshot);
            }
        }
    }
    private void warnLookup(Integer taskNo, String reason, Long ageMs) {
        long now = System.currentTimeMillis();
        Long lastWarnAt = lookupWarnAtMap.get(taskNo);
        if (lastWarnAt != null && now - lastWarnAt < LOOKUP_WARN_INTERVAL_MS) {
            return;
        }
        lookupWarnAtMap.put(taskNo, now);
        if (ageMs == null) {
            log.warn("task-location-registry miss, taskNo={}", taskNo);
            return;
        }
        log.warn("task-location-registry stale, taskNo={}, ageMs={}", taskNo, ageMs);
    }
    public static class TaskLocationSnapshot {
        private final Integer taskNo;
        private final Integer deviceNo;
        private final Integer stationId;
        private final boolean loading;
        private final boolean runBlock;
        private final long updateTime;
        public TaskLocationSnapshot(Integer taskNo,
                                    Integer deviceNo,
                                    Integer stationId,
                                    boolean loading,
                                    boolean runBlock,
                                    long updateTime) {
            this.taskNo = taskNo;
            this.deviceNo = deviceNo;
            this.stationId = stationId;
            this.loading = loading;
            this.runBlock = runBlock;
            this.updateTime = updateTime;
        }
        public Integer getTaskNo() {
            return taskNo;
        }
        public Integer getDeviceNo() {
            return deviceNo;
        }
        public Integer getStationId() {
            return stationId;
        }
        public boolean isLoading() {
            return loading;
        }
        public boolean isRunBlock() {
            return runBlock;
        }
        public long getUpdateTime() {
            return updateTime;
        }
    }
}
src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java
@@ -137,6 +137,13 @@
        return tryAcquireLock(RedisKeyType.STATION_OUT_ORDER_DISPATCH_LIMIT_.key + wrkNo + "_" + stationId, seconds);
    }
    public boolean tryAcquireRunBlockDirectReassignLock(Integer wrkNo, Integer stationId, int seconds) {
        if (wrkNo == null || wrkNo <= 0 || stationId == null) {
            return true;
        }
        return tryAcquireLock(RedisKeyType.STATION_RUN_BLOCK_DIRECT_REASSIGN_LIMIT_.key + wrkNo + "_" + stationId, seconds);
    }
    public void signalSegmentReset(Integer taskNo, long waitMs) {
        if (redisUtil == null || taskNo == null || taskNo <= 0) {
            return;
src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
@@ -51,6 +51,7 @@
public class StationRerouteProcessor {
    private static final int OUT_ORDER_DISPATCH_LIMIT_SECONDS = 2;
    private static final long STATION_MOVE_RESET_WAIT_MS = 1000L;
    private static final int RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS = 15 * 60;
    @Autowired
    private BasDevpService basDevpService;
@@ -447,15 +448,26 @@
        if (basDevp == null || stationThread == null || stationProtocol == null || wrkMast == null) {
            return;
        }
        if (isDirectReassignContextStale(stationProtocol, wrkMast)) {
            return;
        }
        int currentTaskBufferCommandCount = countCurrentTaskBufferCommands(
                stationProtocol.getTaskBufferItems(),
                stationProtocol.getTaskNo()
        );
        if (currentTaskBufferCommandCount > 0) {
            News.info("输送站点运行堵塞重分配已跳过,缓存区仍存在当前任务命令。站点号={},工作号={},当前任务命令数={}",
            News.info("输送站点运行堵塞重分配检测到旧分段命令残留,将先重置本地分段状态后继续重发。站点号={},工作号={},当前任务命令数={}",
                    stationProtocol.getStationId(),
                    stationProtocol.getTaskNo(),
                    currentTaskBufferCommandCount);
        }
        if (!stationDispatchRuntimeStateSupport.tryAcquireRunBlockDirectReassignLock(
                wrkMast.getWrkNo(),
                stationProtocol.getStationId(),
                RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS)) {
            News.info("输送站点运行堵塞重分配已跳过,15分钟内不允许重复申请。站点号={},工作号={}",
                    stationProtocol.getStationId(),
                    wrkMast.getWrkNo());
            return;
        }
        String response = wmsOperateUtils.applyReassignTaskLocNo(wrkMast.getWrkNo(), stationProtocol.getStationId());
@@ -534,6 +546,7 @@
        if (!wrkMastService.updateById(wrkMast)) {
            return;
        }
        stationDispatchRuntimeStateSupport.signalSegmentReset(wrkMast.getWrkNo(), STATION_MOVE_RESET_WAIT_MS);
        boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
        if (!offered) {
            return;
@@ -551,6 +564,30 @@
        }
    }
    private boolean isDirectReassignContextStale(StationProtocol stationProtocol, WrkMast wrkMast) {
        if (stationProtocol == null || wrkMast == null || stationMoveCoordinator == null) {
            return false;
        }
        Integer taskNo = wrkMast.getWrkNo();
        Integer triggerStationId = stationProtocol.getStationId();
        if (taskNo == null || taskNo <= 0 || triggerStationId == null) {
            return false;
        }
        StationMoveSession session = stationMoveCoordinator.loadSession(taskNo);
        if (session == null || !session.isActive()) {
            return false;
        }
        Integer currentStationId = session.getCurrentStationId();
        if (currentStationId == null || Objects.equals(currentStationId, triggerStationId)) {
            return false;
        }
        News.info("输送站点运行堵塞重分配已跳过,任务已离开触发站点。触发站点={},当前站点={},工作号={}",
                triggerStationId,
                currentStationId,
                taskNo);
        return true;
    }
    private int countCurrentTaskBufferCommands(List<StationTaskBufferItem> taskBufferItems, Integer currentTaskNo) {
        if (taskBufferItems == null || taskBufferItems.isEmpty() || currentTaskNo == null || currentTaskNo <= 0) {
            return 0;