From f7d2eda120867b3c5a2d9001d5f5c9d0396c65bd Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 14 四月 2026 12:41:13 +0800
Subject: [PATCH] #命令下发队列优化
---
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java | 20 ++++++----
src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java | 46 ++++++++++++++--------
src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java | 11 ++++-
3 files changed, 49 insertions(+), 28 deletions(-)
diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 9647384..21496f0 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -241,15 +241,19 @@
Integer currentStationId) {
// 鍦ㄤ笅鍙戞柊鍒嗘鍓嶆鏌ヨ矾鐢辩増鏈槸鍚︿粛鐒舵湁鏁堬紝閬垮厤鍦ㄨ矾鐢辩増鏈凡鏇存柊鐨勬儏鍐典笅涓嬪彂鏃х増鏈懡浠�
if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
+ // 棣栨鏍¢獙澶辫触鍙兘鏄� Redis 鍐欏叆寤惰繜瀵艰嚧鐨勶紝鐭殏绛夊緟鍚庨噸璇曚竴娆°��
+ sleepQuietly(50L);
+ if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "route_version_replaced_before_segment_send");
+ return false;
}
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "route_version_replaced_before_segment_send");
- return false;
}
while (true) {
diff --git a/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java b/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
index 5b51be2..7ae1627 100644
--- a/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
+++ b/src/main/java/com/zy/core/utils/station/StationRegularDispatchProcessor.java
@@ -351,8 +351,8 @@
wrkMast.setModiTime(now);
if (wrkMastService.updateById(wrkMast)) {
wrkAnalysisService.markInboundStationStart(wrkMast, now);
- boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "stationInExecute");
- if (offered && stationMoveCoordinator != null) {
+ // 鍏堣褰� session 鍐嶅叆闃熷懡浠わ紝閬垮厤娑堣垂绾跨▼鍦� session 鍐欏叆 Redis 鍓嶅彇鍒板懡浠ゅ鑷磋矾鐢辨牎楠屽け璐ャ��
+ if (stationMoveCoordinator != null) {
stationMoveCoordinator.recordDispatch(
wrkMast.getWrkNo(),
stationProtocol.getStationId(),
@@ -361,7 +361,12 @@
false
);
}
- News.info("杈撻�佺珯鐐瑰叆搴撳懡浠や笅鍙戞垚鍔燂紝绔欑偣鍙�={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}", stationId, wrkMast.getWrkNo(), JSON.toJSONString(command));
+ boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "stationInExecute");
+ if (offered) {
+ News.info("杈撻�佺珯鐐瑰叆搴撳懡浠や笅鍙戞垚鍔燂紝绔欑偣鍙�={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}", stationId, wrkMast.getWrkNo(), JSON.toJSONString(command));
+ } else {
+ News.warn("杈撻�佺珯鐐瑰叆搴撳懡浠ゅ叆闃熻鎷掔粷(鍙兘閲嶅)锛岀珯鐐瑰彿={}锛屽伐浣滃彿={}", stationId, wrkMast.getWrkNo());
+ }
redisUtil.set(RedisKeyType.STATION_IN_EXECUTE_LIMIT.key + stationId, "lock", 5);
loadGuardState.reserveLoopTask(loopHitResult.getLoopNo());
stationDispatchLoadSupport.saveLoopLoadReserve(wrkMast.getWrkNo(), loopHitResult);
diff --git a/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java b/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
index dddde68..a433445 100644
--- a/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
+++ b/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
@@ -511,13 +511,16 @@
int clearedCommandCount = 0;
- boolean offered = offerDevpCommandWithDedup(context.dispatchDeviceNo(), plan.command(), plan.dispatchScene());
- if (!offered) {
- return RerouteExecutionResult.skip("dispatch-dedup");
- }
+ // 鍏堝彇娑堟棫 session 骞惰褰曟柊 session锛屽啀鍏ラ槦鍛戒护锛岄伩鍏嶆秷璐圭嚎绋嬪湪 session 鍐欏叆 Redis 鍓嶅彇鍒板懡浠ゅ鑷磋矾鐢辨牎楠屽け璐ャ��
if (context.cancelSessionBeforeDispatch() && stationMoveCoordinator != null) {
stationMoveCoordinator.markCancelPending(taskNo, "reroute_pending");
stationMoveCoordinator.cancelSession(taskNo);
+ }
+ preRegisterDispatchSession(context, plan);
+
+ boolean offered = offerDevpCommandWithDedup(context.dispatchDeviceNo(), plan.command(), plan.dispatchScene());
+ if (!offered) {
+ return RerouteExecutionResult.skip("dispatch-dedup");
}
applyRerouteDispatchEffects(context, plan, clearedCommandCount);
@@ -531,6 +534,24 @@
}
RerouteCommandPlan plan = buildRerouteCommandPlan(context, decision);
return executeReroutePlan(context, plan);
+ }
+
+ private void preRegisterDispatchSession(RerouteContext context, RerouteCommandPlan plan) {
+ if (context == null || plan == null || plan.command() == null || context.wrkMast() == null || context.stationProtocol() == null) {
+ return;
+ }
+ if (stationMoveCoordinator == null) {
+ return;
+ }
+ OutOrderDispatchDecision dispatchDecision =
+ plan.decision() == null ? null : plan.decision().dispatchDecision();
+ stationMoveCoordinator.recordDispatch(
+ context.wrkMast().getWrkNo(),
+ context.stationProtocol().getStationId(),
+ plan.dispatchScene(),
+ plan.command(),
+ dispatchDecision != null && dispatchDecision.isCircle()
+ );
}
private void applyRerouteDispatchEffects(RerouteContext context,
@@ -551,15 +572,6 @@
dispatchDecision,
plan.command()
);
- if (stationMoveCoordinator != null) {
- stationMoveCoordinator.recordDispatch(
- wrkMast.getWrkNo(),
- stationProtocol.getStationId(),
- plan.dispatchScene(),
- plan.command(),
- dispatchDecision != null && dispatchDecision.isCircle()
- );
- }
if (context.sceneType() == RerouteSceneType.RUN_BLOCK_REROUTE) {
News.info("杈撻�佺珯鐐瑰牭濉炲悗閲嶆柊璁$畻璺緞鍛戒护涓嬪彂鎴愬姛锛岀珯鐐瑰彿={}锛屽伐浣滃彿={}锛屽懡浠ゆ暟鎹�={}",
stationProtocol.getStationId(),
@@ -678,10 +690,6 @@
stationProtocol.getStationId(),
RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS);
stationDispatchRuntimeStateSupport.signalSegmentReset(wrkMast.getWrkNo(), STATION_MOVE_RESET_WAIT_MS);
- boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
- if (!offered) {
- return;
- }
if (stationMoveCoordinator != null) {
stationMoveCoordinator.markCancelPending(wrkMast.getWrkNo(), "reroute_pending");
stationMoveCoordinator.cancelSession(wrkMast.getWrkNo());
@@ -693,6 +701,10 @@
false
);
}
+ boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
+ if (!offered) {
+ News.warn("杈撻�佺珯鐐瑰牭濉炵洿娲惧懡浠ゅ叆闃熻鎷掔粷(鍙兘閲嶅)锛岀珯鐐瑰彿={}锛屽伐浣滃彿={}", stationProtocol.getStationId(), wrkMast.getWrkNo());
+ }
}
private int countCurrentTaskBufferCommands(List<StationTaskBufferItem> taskBufferItems, Integer currentTaskNo) {
--
Gitblit v1.9.1