#
Junjie
8 小时以前 644c041d75574ad0ce318c81dc15cb74849f816a
src/main/java/com/zy/core/move/StationMoveCoordinator.java
@@ -12,15 +12,19 @@
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
@Component
public class StationMoveCoordinator {
    private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24;
    private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>();
    @Autowired
    private StationMoveSessionRegistry sessionRegistry;
@@ -36,6 +40,10 @@
    public boolean isActiveRoute(Integer taskNo, Integer routeVersion) {
        return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion);
    }
    public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
        return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion);
    }
    public void markSegmentIssued(Integer taskNo, Integer routeVersion) {
@@ -76,6 +84,23 @@
        saveSession(session);
    }
    public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) {
        if (supplier == null) {
            return null;
        }
        if (taskNo == null || taskNo <= 0) {
            return supplier.get();
        }
        // 同一任务的切路和分段发送必须共享一把锁,避免旧 routeVersion 在线程晚到时继续把上一条段命令写出去。
        ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock());
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }
    public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) {
        if (taskNo == null || taskNo <= 0 || currentStationId == null || candidateCommand == null) {
            return false;
@@ -87,14 +112,17 @@
        }
        String candidateSignature = buildPathSignature(candidateCommand);
        // 同 task、同当前位置、同路径签名的命令视为重复派发,直接压制。
        if (!isBlank(candidateSignature) && Objects.equals(candidateSignature, session.getPathSignature())) {
            return true;
        }
        // 到达下一决策站后允许重新决策,不继续受旧 session 的中间路径保护。
        if (Objects.equals(currentStationId, session.getNextDecisionStationId())) {
            return false;
        }
        // 还处在旧路线覆盖范围内时,其他触发源不应再插入一条新命令。
        return session.containsStation(currentStationId);
    }
@@ -117,6 +145,7 @@
                && Objects.equals(current.getNextDecisionStationId(), command.getTargetStaNo())
                && Objects.equals(current.getPathSignature(), pathSignature);
        // 同一触发站、同一目标、同一路径签名时复用当前 session,只刷新下发时间,不新开 routeVersion。
        StationMoveSession session = reuseCurrent ? current : new StationMoveSession();
        if (!reuseCurrent) {
            session.setRouteVersion(current == null || current.getRouteVersion() == null