From f3b64d003bc3458af3dd434e6187d3aba23a64aa Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 26 三月 2026 14:35:44 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java | 98 +++++++++++++++++++++++++++++++++---------------
1 files changed, 67 insertions(+), 31 deletions(-)
diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 3da25e3..50ca80f 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -35,6 +35,12 @@
private final Function<StationCommand, CommandResponse> commandSender;
private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
+ private enum SegmentSendResult {
+ DISPATCHED,
+ CANCELLED,
+ RETRY
+ }
+
public StationSegmentExecutor(DeviceConfig deviceConfig,
RedisUtil redisUtil,
Function<StationCommand, CommandResponse> commandSender) {
@@ -86,7 +92,7 @@
boolean firstRun = true;
while (true) {
try {
- if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
+ if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
if (traceRegistry != null) {
traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -191,39 +197,65 @@
Integer traceVersion,
Integer currentStationId) {
while (true) {
- if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
- }
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "route_version_replaced");
+ SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
+ if (sendResult == SegmentSendResult.CANCELLED) {
return false;
}
- if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
- }
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "redis_cancel_signal");
- return false;
- }
- CommandResponse commandResponse = commandSender.apply(command);
- if (commandResponse == null) {
+ if (sendResult == SegmentSendResult.RETRY) {
sleepQuietly(200L);
continue;
}
- if (commandResponse.getResult()) {
- markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
- return true;
- }
- sleepQuietly(200L);
+ return true;
}
+ }
+
+ private SegmentSendResult executeLockedSegmentSend(StationCommand command,
+ StationTaskTraceRegistry traceRegistry,
+ Integer traceVersion,
+ Integer currentStationId) {
+ Integer taskNo = command == null ? null : command.getTaskNo();
+ StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+ if (moveCoordinator != null) {
+ // 鍒嗘鍙戦�佺殑鏈�缁堟鏌ュ拰瀹為檯涓嬪彂闇�瑕佷笌 reroute 鍏辩敤浠诲姟閿併��
+ // 杩欐牱鍒囪矾绾跨▼涓�鏃﹁繘鍏� CANCEL_PENDING/RESET锛屾棫璺嚎灏变笉鑳藉啀绌胯繃鏈�鍚庤繖涓�姝ュ彂鍒拌澶囦晶銆�
+ return moveCoordinator.withTaskDispatchLock(taskNo,
+ () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
+ }
+ return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
+ }
+
+ private SegmentSendResult doSendSegment(StationCommand command,
+ StationTaskTraceRegistry traceRegistry,
+ Integer traceVersion,
+ Integer currentStationId) {
+ if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "route_version_replaced");
+ return SegmentSendResult.CANCELLED;
+ }
+ if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "redis_cancel_signal");
+ return SegmentSendResult.CANCELLED;
+ }
+ CommandResponse commandResponse = commandSender.apply(command);
+ if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
+ return SegmentSendResult.RETRY;
+ }
+ markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
+ return SegmentSendResult.DISPATCHED;
}
private double loadSegmentAdvanceRatio() {
@@ -401,15 +433,19 @@
}
}
- private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
+ private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
// Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
// do not register a move session and therefore have no routeVersion.
// They should keep the historical behavior and execute normally.
if (taskNo == null || routeVersion == null) {
return true;
}
- StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
- return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
+ StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+ return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion);
+ }
+
+ private StationMoveCoordinator loadMoveCoordinator() {
+ return SpringUtils.getBean(StationMoveCoordinator.class);
}
private void markSegmentIssued(Integer taskNo, Integer routeVersion) {
--
Gitblit v1.9.1