Junjie
3 天以前 63b01db83d9aad8a15276b4236a9a22e4aeef065
src/main/java/com/zy/core/move/StationMoveCoordinator.java
@@ -5,6 +5,7 @@
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.model.command.StationCommand;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -12,15 +13,22 @@
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
@Slf4j
@Component
public class StationMoveCoordinator {
    private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24;
    private static final long TASK_DISPATCH_LOCK_SLOW_THRESHOLD_MS = 50L;
    private static final long RECORD_DISPATCH_SLOW_THRESHOLD_MS = 50L;
    private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>();
    @Autowired
    private StationMoveSessionRegistry sessionRegistry;
@@ -36,6 +44,10 @@
    public boolean isActiveRoute(Integer taskNo, Integer routeVersion) {
        return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion);
    }
    public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
        return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion);
    }
    public void markSegmentIssued(Integer taskNo, Integer routeVersion) {
@@ -71,9 +83,33 @@
        if (session == null || !session.isActive()) {
            return;
        }
        log.info("markCancelPending, taskNo={}, routeVersion={}, cancelReason={}", taskNo, session.getRouteVersion(), cancelReason);
        session.setStatus(StationMoveSession.STATUS_CANCEL_PENDING);
        session.setCancelReason(cancelReason);
        saveSession(session);
    }
    public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) {
        if (supplier == null) {
            return null;
        }
        if (taskNo == null || taskNo <= 0) {
            return supplier.get();
        }
        ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock());
        long waitStartMs = System.currentTimeMillis();
        lock.lock();
        long lockWaitMs = System.currentTimeMillis() - waitStartMs;
        long holdStartMs = System.currentTimeMillis();
        try {
            return supplier.get();
        } finally {
            long holdMs = System.currentTimeMillis() - holdStartMs;
            if (lockWaitMs > TASK_DISPATCH_LOCK_SLOW_THRESHOLD_MS || holdMs > TASK_DISPATCH_LOCK_SLOW_THRESHOLD_MS) {
                log.warn("taskDispatchLock slow, taskNo={}, lockWaitMs={}ms, lockHoldMs={}ms", taskNo, lockWaitMs, holdMs);
            }
            lock.unlock();
        }
    }
    public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) {
@@ -110,6 +146,7 @@
            return null;
        }
        long startMs = System.currentTimeMillis();
        StationMoveSession current = loadSession(taskNo);
        long now = System.currentTimeMillis();
        String pathSignature = buildPathSignature(command);
@@ -149,6 +186,16 @@
        command.setRouteVersion(session.getRouteVersion());
        saveSession(session);
        long recordDispatchCostMs = System.currentTimeMillis() - startMs;
        log.info("recordDispatch done, taskNo={}, routeVersion={}, reuse={}, prevRouteVersion={}, dispatchStationId={}, triggerName={}, recordDispatchCostMs={}ms",
                taskNo, session.getRouteVersion(), reuseCurrent,
                current == null ? null : current.getRouteVersion(),
                dispatchStationId, triggerName, recordDispatchCostMs);
        if (recordDispatchCostMs > RECORD_DISPATCH_SLOW_THRESHOLD_MS) {
            log.warn("recordDispatch slow, taskNo={}, dispatchStationId={}, triggerName={}, recordDispatchCostMs={}ms, pathSize={}",
                    taskNo, dispatchStationId, triggerName, recordDispatchCostMs,
                    fullPathStationIds == null ? 0 : fullPathStationIds.size());
        }
        if (circleRoute) {
            saveLegacyCircleCommand(taskNo, command);
@@ -165,6 +212,7 @@
            clearLegacyCircleCommand(taskNo);
            return null;
        }
        log.info("cancelSession, taskNo={}, routeVersion={}, cancelReason=reroute_cancelled", taskNo, session.getRouteVersion());
        session.setStatus(StationMoveSession.STATUS_CANCELLED);
        session.setCancelReason("reroute_cancelled");
        saveSession(session);
@@ -216,8 +264,11 @@
        }
        StationMoveSession session = sessionRegistry.load(taskNo);
        if (session == null || !Objects.equals(session.getRouteVersion(), routeVersion)) {
            log.warn("updateTerminal skipped: session mismatch, taskNo={}, cmdRouteVersion={}, sessionRouteVersion={}, targetStatus={}, cancelReason={}",
                    taskNo, routeVersion, session == null ? null : session.getRouteVersion(), status, cancelReason);
            return;
        }
        log.info("updateTerminal, taskNo={}, routeVersion={}, status={}, cancelReason={}", taskNo, routeVersion, status, cancelReason);
        session.setCurrentStationId(currentStationId);
        session.setStatus(status);
        session.setCancelReason(cancelReason);