From 852664df1caf38831793b341edcada9dd7b6c22a Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期三, 06 五月 2026 19:28:33 +0800
Subject: [PATCH] #dfs
---
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java | 199 +++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 167 insertions(+), 32 deletions(-)
diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 3da25e3..21496f0 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -6,6 +6,7 @@
import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo;
import com.zy.asrs.entity.DeviceConfig;
import com.zy.common.utils.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
@@ -15,6 +16,8 @@
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.trace.StationTaskTraceRegistry;
+import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
+import com.zy.core.thread.support.StationTaskLocationRegistry;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -24,16 +27,24 @@
import java.util.Map;
import java.util.function.Function;
+@Slf4j
public class StationSegmentExecutor {
private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d;
private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L;
+ private static final long TASK_LOCATION_STALE_MS = 2_000L;
private final DeviceConfig deviceConfig;
private final RedisUtil redisUtil;
private final Function<StationCommand, CommandResponse> commandSender;
private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
+
+ private enum SegmentSendResult {
+ DISPATCHED,
+ CANCELLED,
+ RETRY
+ }
public StationSegmentExecutor(DeviceConfig deviceConfig,
RedisUtil redisUtil,
@@ -83,10 +94,11 @@
long lastSeenAt = System.currentTimeMillis();
int segCursor = 0;
Integer lastCurrentStationId = null;
+ int lastMatchedPathIndex = -1;
boolean firstRun = true;
while (true) {
try {
- if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
+ if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) {
if (traceRegistry != null) {
traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
@@ -138,12 +150,17 @@
break;
}
- int currentIndex = effectiveFullPath.indexOf(currentStation.getStationId());
+ int currentIndex = resolveCurrentPathIndex(
+ effectiveFullPath,
+ currentStation.getStationId(),
+ lastMatchedPathIndex
+ );
if (currentIndex < 0) {
Thread.sleep(500L);
firstRun = false;
continue;
}
+ lastMatchedPathIndex = currentIndex;
int remaining = effectiveFullPath.size() - currentIndex - 1;
if (remaining <= 0) {
@@ -186,47 +203,129 @@
}
}
+ private int resolveCurrentPathIndex(List<Integer> fullPathStationIds,
+ Integer currentStationId,
+ int lastMatchedPathIndex) {
+ if (fullPathStationIds == null || fullPathStationIds.isEmpty() || currentStationId == null) {
+ return -1;
+ }
+ if (lastMatchedPathIndex >= 0
+ && lastMatchedPathIndex < fullPathStationIds.size()
+ && equalsInteger(currentStationId, fullPathStationIds.get(lastMatchedPathIndex))) {
+ return lastMatchedPathIndex;
+ }
+
+ int nextIndex = findNextStationIndex(fullPathStationIds, currentStationId, Math.max(lastMatchedPathIndex + 1, 0));
+ if (nextIndex >= 0) {
+ return nextIndex;
+ }
+ return findNextStationIndex(fullPathStationIds, currentStationId, 0);
+ }
+
+ private int findNextStationIndex(List<Integer> path, Integer stationId, int fromIndex) {
+ if (path == null || path.isEmpty() || stationId == null) {
+ return -1;
+ }
+ int startIdx = Math.max(fromIndex, 0);
+ for (int i = startIdx; i < path.size(); i++) {
+ if (equalsInteger(stationId, path.get(i))) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
private boolean sendSegmentWithRetry(StationCommand command,
StationTaskTraceRegistry traceRegistry,
Integer traceVersion,
Integer currentStationId) {
+ // 鍦ㄤ笅鍙戞柊鍒嗘鍓嶆鏌ヨ矾鐢辩増鏈槸鍚︿粛鐒舵湁鏁堬紝閬垮厤鍦ㄨ矾鐢辩増鏈凡鏇存柊鐨勬儏鍐典笅涓嬪彂鏃х増鏈懡浠�
+ if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+ // 棣栨鏍¢獙澶辫触鍙兘鏄� Redis 鍐欏叆寤惰繜瀵艰嚧鐨勶紝鐭殏绛夊緟鍚庨噸璇曚竴娆°��
+ sleepQuietly(50L);
+ if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "route_version_replaced_before_segment_send", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "route_version_replaced_before_segment_send");
+ return false;
+ }
+ }
+
while (true) {
- if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
- }
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "route_version_replaced");
+ SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId);
+ if (sendResult == SegmentSendResult.CANCELLED) {
return false;
}
- if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
- if (traceRegistry != null && command != null) {
- traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
- buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
- }
- markCancelled(command == null ? null : command.getTaskNo(),
- command == null ? null : command.getRouteVersion(),
- currentStationId,
- "redis_cancel_signal");
- return false;
- }
- CommandResponse commandResponse = commandSender.apply(command);
- if (commandResponse == null) {
+ if (sendResult == SegmentSendResult.RETRY) {
sleepQuietly(200L);
continue;
}
- if (commandResponse.getResult()) {
- markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
- return true;
- }
- sleepQuietly(200L);
+ return true;
}
}
+ private SegmentSendResult executeLockedSegmentSend(StationCommand command,
+ StationTaskTraceRegistry traceRegistry,
+ Integer traceVersion,
+ Integer currentStationId) {
+ Integer taskNo = command == null ? null : command.getTaskNo();
+ StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+ if (moveCoordinator != null) {
+ // 鍒嗘鍙戦�佺殑鏈�缁堟鏌ュ拰瀹為檯涓嬪彂闇�瑕佷笌 reroute 鍏辩敤浠诲姟閿併��
+ // 杩欐牱鍒囪矾绾跨▼涓�鏃﹁繘鍏� CANCEL_PENDING/RESET锛屾棫璺嚎灏变笉鑳藉啀绌胯繃鏈�鍚庤繖涓�姝ュ彂鍒拌澶囦晶銆�
+ return moveCoordinator.withTaskDispatchLock(taskNo,
+ () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId));
+ }
+ return doSendSegment(command, traceRegistry, traceVersion, currentStationId);
+ }
+
+ private SegmentSendResult doSendSegment(StationCommand command,
+ StationTaskTraceRegistry traceRegistry,
+ Integer traceVersion,
+ Integer currentStationId) {
+ if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "route_version_replaced");
+ return SegmentSendResult.CANCELLED;
+ }
+ if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
+ if (traceRegistry != null && command != null) {
+ traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
+ buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
+ }
+ markCancelled(command == null ? null : command.getTaskNo(),
+ command == null ? null : command.getRouteVersion(),
+ currentStationId,
+ "redis_cancel_signal");
+ return SegmentSendResult.CANCELLED;
+ }
+ CommandResponse commandResponse = commandSender.apply(command);
+ if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) {
+ return SegmentSendResult.RETRY;
+ }
+ markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
+ return SegmentSendResult.DISPATCHED;
+ }
+
private double loadSegmentAdvanceRatio() {
+ if (isV5ThreadImpl()) {
+ StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
+ if (configProvider != null) {
+ return configProvider.getSegmentAdvanceRatio();
+ }
+ return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
+ }
try {
ConfigService configService = SpringUtils.getBean(ConfigService.class);
if (configService == null) {
@@ -280,6 +379,9 @@
}
private StationProtocol findCurrentStationByTask(Integer taskNo) {
+ if (isV5ThreadImpl()) {
+ return findCurrentStationByTaskFromRegistry(taskNo);
+ }
try {
com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class);
if (deviceConfigService == null) {
@@ -306,6 +408,27 @@
return null;
}
return null;
+ }
+
+ private StationProtocol findCurrentStationByTaskFromRegistry(Integer taskNo) {
+ StationTaskLocationRegistry registry = SpringUtils.getBean(StationTaskLocationRegistry.class);
+ if (registry == null) {
+ return null;
+ }
+ StationTaskLocationRegistry.TaskLocationSnapshot snapshot = registry.findActive(taskNo, TASK_LOCATION_STALE_MS);
+ if (snapshot == null || !snapshot.isLoading()) {
+ return null;
+ }
+ StationProtocol stationProtocol = new StationProtocol();
+ stationProtocol.setTaskNo(snapshot.getTaskNo());
+ stationProtocol.setStationId(snapshot.getStationId());
+ stationProtocol.setRunBlock(snapshot.isRunBlock());
+ stationProtocol.setLoading(true);
+ return stationProtocol;
+ }
+
+ private boolean isV5ThreadImpl() {
+ return deviceConfig != null && "ZyStationV5Thread".equals(deviceConfig.getThreadImpl());
}
private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) {
@@ -401,15 +524,27 @@
}
}
- private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
+ private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) {
// Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
// do not register a move session and therefore have no routeVersion.
// They should keep the historical behavior and execute normally.
if (taskNo == null || routeVersion == null) {
return true;
}
- StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
- return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
+ StationMoveCoordinator moveCoordinator = loadMoveCoordinator();
+ if (moveCoordinator == null) {
+ return true;
+ }
+ boolean dispatchable = moveCoordinator.canDispatchRoute(taskNo, routeVersion);
+ if (!dispatchable) {
+ log.warn("isRouteDispatchable rejected, taskNo={}, routeVersion={}, threadImpl={}",
+ taskNo, routeVersion, deviceConfig == null ? null : deviceConfig.getThreadImpl());
+ }
+ return dispatchable;
+ }
+
+ private StationMoveCoordinator loadMoveCoordinator() {
+ return SpringUtils.getBean(StationMoveCoordinator.class);
}
private void markSegmentIssued(Integer taskNo, Integer routeVersion) {
--
Gitblit v1.9.1