Junjie
2026-04-14 f7d2eda120867b3c5a2d9001d5f5c9d0396c65bd
#命令下发队列优化
3个文件已修改
59 ■■■■■ 已修改文件
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -241,6 +241,9 @@
                                         Integer currentStationId) {
        // 在下发新分段前检查路由版本是否仍然有效,避免在路由版本已更新的情况下下发旧版本命令
        if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
            // 首次校验失败可能是 Redis 写入延迟导致的,短暂等待后重试一次。
            sleepQuietly(50L);
            if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
            if (traceRegistry != null && command != null) {
                traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
                        buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
@@ -251,6 +254,7 @@
                    "route_version_replaced_before_segment_send");
            return false;
        }
        }
        
        while (true) {
            SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
@@ -351,8 +351,8 @@
            wrkMast.setModiTime(now);
            if (wrkMastService.updateById(wrkMast)) {
                wrkAnalysisService.markInboundStationStart(wrkMast, now);
                boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "stationInExecute");
                if (offered && stationMoveCoordinator != null) {
                // 先记录 session 再入队命令,避免消费线程在 session 写入 Redis 前取到命令导致路由校验失败。
                if (stationMoveCoordinator != null) {
                    stationMoveCoordinator.recordDispatch(
                            wrkMast.getWrkNo(),
                            stationProtocol.getStationId(),
@@ -361,7 +361,12 @@
                            false
                    );
                }
                boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "stationInExecute");
                if (offered) {
                News.info("输送站点入库命令下发成功,站点号={},工作号={},命令数据={}", stationId, wrkMast.getWrkNo(), JSON.toJSONString(command));
                } else {
                    News.warn("输送站点入库命令入队被拒绝(可能重复),站点号={},工作号={}", stationId, wrkMast.getWrkNo());
                }
                redisUtil.set(RedisKeyType.STATION_IN_EXECUTE_LIMIT.key + stationId, "lock", 5);
                loadGuardState.reserveLoopTask(loopHitResult.getLoopNo());
                stationDispatchLoadSupport.saveLoopLoadReserve(wrkMast.getWrkNo(), loopHitResult);
src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
@@ -511,13 +511,16 @@
        int clearedCommandCount = 0;
        boolean offered = offerDevpCommandWithDedup(context.dispatchDeviceNo(), plan.command(), plan.dispatchScene());
        if (!offered) {
            return RerouteExecutionResult.skip("dispatch-dedup");
        }
        // 先取消旧 session 并记录新 session,再入队命令,避免消费线程在 session 写入 Redis 前取到命令导致路由校验失败。
        if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
            stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending");
            stationMoveCoordinator.cancelSession(taskNo);
        }
        preRegisterDispatchSession(context, plan);
        boolean offered = offerDevpCommandWithDedup(context.dispatchDeviceNo(), plan.command(), plan.dispatchScene());
        if (!offered) {
            return RerouteExecutionResult.skip("dispatch-dedup");
        }
        applyRerouteDispatchEffects(context, plan, clearedCommandCount);
@@ -531,6 +534,24 @@
        }
        RerouteCommandPlan plan = buildRerouteCommandPlan(context, decision);
        return executeReroutePlan(context, plan);
    }
    private void preRegisterDispatchSession(RerouteContext context, RerouteCommandPlan plan) {
        if (context == null || plan == null || plan.command() == null || context.wrkMast() == null || context.stationProtocol() == null) {
            return;
        }
        if (stationMoveCoordinator == null) {
            return;
        }
        OutOrderDispatchDecision dispatchDecision =
                plan.decision() == null ? null : plan.decision().dispatchDecision();
        stationMoveCoordinator.recordDispatch(
                context.wrkMast().getWrkNo(),
                context.stationProtocol().getStationId(),
                plan.dispatchScene(),
                plan.command(),
                dispatchDecision != null && dispatchDecision.isCircle()
        );
    }
    private void applyRerouteDispatchEffects(RerouteContext context,
@@ -551,15 +572,6 @@
                dispatchDecision,
                plan.command()
        );
        if (stationMoveCoordinator != null) {
            stationMoveCoordinator.recordDispatch(
                    wrkMast.getWrkNo(),
                    stationProtocol.getStationId(),
                    plan.dispatchScene(),
                    plan.command(),
                    dispatchDecision != null && dispatchDecision.isCircle()
            );
        }
        if (context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE) {
            News.info("输送站点堵塞后重新计算路径命令下发成功,站点号={},工作号={},命令数据={}",
                    stationProtocol.getStationId(),
@@ -678,10 +690,6 @@
                stationProtocol.getStationId(),
                RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS);
        stationDispatchRuntimeStateSupport.signalSegmentReset(wrkMast.getWrkNo(), STATION_MOVE_RESET_WAIT_MS);
        boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
        if (!offered) {
            return;
        }
        if (stationMoveCoordinator != null) {
            stationMoveCoordinator.markCancelPending(wrkMast.getWrkNo(), "reroute_pending");
            stationMoveCoordinator.cancelSession(wrkMast.getWrkNo());
@@ -693,6 +701,10 @@
                    false
            );
        }
        boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
        if (!offered) {
            News.warn("输送站点堵塞直派命令入队被拒绝(可能重复),站点号={},工作号={}", stationProtocol.getStationId(), wrkMast.getWrkNo());
        }
    }
    private int countCurrentTaskBufferCommands(List<StationTaskBufferItem> taskBufferItems, Integer currentTaskNo) {