From dc3f9cc91759823ce59486f19b138be4b296a0f1 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 28 四月 2026 09:43:28 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/move/StationMoveCoordinator.java |   55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 55 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/zy/core/move/StationMoveCoordinator.java b/src/main/java/com/zy/core/move/StationMoveCoordinator.java
index 1adc417..2f167e1 100644
--- a/src/main/java/com/zy/core/move/StationMoveCoordinator.java
+++ b/src/main/java/com/zy/core/move/StationMoveCoordinator.java
@@ -5,6 +5,7 @@
 import com.zy.common.utils.RedisUtil;
 import com.zy.core.enums.RedisKeyType;
 import com.zy.core.model.command.StationCommand;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -12,15 +13,22 @@
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
+@Slf4j
 @Component
 public class StationMoveCoordinator {
 
     private static final int SESSION_EXPIRE_SECONDS = 60 * 60 * 24;
+    private static final long TASK_DISPATCH_LOCK_SLOW_THRESHOLD_MS = 50L;
+    private static final long RECORD_DISPATCH_SLOW_THRESHOLD_MS = 50L;
+    private final Map<Integer, ReentrantLock> taskDispatchLocks = new ConcurrentHashMap<>();
 
     @Autowired
     private StationMoveSessionRegistry sessionRegistry;
@@ -36,6 +44,10 @@
 
     public boolean isActiveRoute(Integer taskNo, Integer routeVersion) {
         return sessionRegistry != null && sessionRegistry.isActiveRoute(taskNo, routeVersion);
+    }
+
+    public boolean canDispatchRoute(Integer taskNo, Integer routeVersion) {
+        return sessionRegistry != null && sessionRegistry.canDispatchRoute(taskNo, routeVersion);
     }
 
     public void markSegmentIssued(Integer taskNo, Integer routeVersion) {
@@ -71,9 +83,33 @@
         if (session == null || !session.isActive()) {
             return;
         }
+        log.info("markCancelPending, taskNo={}, routeVersion={}, cancelReason={}", taskNo, session.getRouteVersion(), cancelReason);
         session.setStatus(StationMoveSession.STATUS_CANCEL_PENDING);
         session.setCancelReason(cancelReason);
         saveSession(session);
+    }
+
+    public <T> T withTaskDispatchLock(Integer taskNo, Supplier<T> supplier) {
+        if (supplier == null) {
+            return null;
+        }
+        if (taskNo == null || taskNo <= 0) {
+            return supplier.get();
+        }
+        ReentrantLock lock = taskDispatchLocks.computeIfAbsent(taskNo, key -> new ReentrantLock());
+        long waitStartMs = System.currentTimeMillis();
+        lock.lock();
+        long lockWaitMs = System.currentTimeMillis() - waitStartMs;
+        long holdStartMs = System.currentTimeMillis();
+        try {
+            return supplier.get();
+        } finally {
+            long holdMs = System.currentTimeMillis() - holdStartMs;
+            if (lockWaitMs > TASK_DISPATCH_LOCK_SLOW_THRESHOLD_MS || holdMs > TASK_DISPATCH_LOCK_SLOW_THRESHOLD_MS) {
+                log.warn("taskDispatchLock slow, taskNo={}, lockWaitMs={}ms, lockHoldMs={}ms", taskNo, lockWaitMs, holdMs);
+            }
+            lock.unlock();
+        }
     }
 
     public boolean shouldSuppressDispatch(Integer taskNo, Integer currentStationId, StationCommand candidateCommand) {
@@ -87,14 +123,17 @@
         }
 
         String candidateSignature = buildPathSignature(candidateCommand);
+        // 鍚� task銆佸悓褰撳墠浣嶇疆銆佸悓璺緞绛惧悕鐨勫懡浠よ涓洪噸澶嶆淳鍙戯紝鐩存帴鍘嬪埗銆�
         if (!isBlank(candidateSignature) && Objects.equals(candidateSignature, session.getPathSignature())) {
             return true;
         }
 
+        // 鍒拌揪涓嬩竴鍐崇瓥绔欏悗鍏佽閲嶆柊鍐崇瓥锛屼笉缁х画鍙楁棫 session 鐨勪腑闂磋矾寰勪繚鎶ゃ��
         if (Objects.equals(currentStationId, session.getNextDecisionStationId())) {
             return false;
         }
 
+        // 杩樺鍦ㄦ棫璺嚎瑕嗙洊鑼冨洿鍐呮椂锛屽叾浠栬Е鍙戞簮涓嶅簲鍐嶆彃鍏ヤ竴鏉℃柊鍛戒护銆�
         return session.containsStation(currentStationId);
     }
 
@@ -107,6 +146,7 @@
             return null;
         }
 
+        long startMs = System.currentTimeMillis();
         StationMoveSession current = loadSession(taskNo);
         long now = System.currentTimeMillis();
         String pathSignature = buildPathSignature(command);
@@ -117,6 +157,7 @@
                 && Objects.equals(current.getNextDecisionStationId(), command.getTargetStaNo())
                 && Objects.equals(current.getPathSignature(), pathSignature);
 
+        // 鍚屼竴瑙﹀彂绔欍�佸悓涓�鐩爣銆佸悓涓�璺緞绛惧悕鏃跺鐢ㄥ綋鍓� session锛屽彧鍒锋柊涓嬪彂鏃堕棿锛屼笉鏂板紑 routeVersion銆�
         StationMoveSession session = reuseCurrent ? current : new StationMoveSession();
         if (!reuseCurrent) {
             session.setRouteVersion(current == null || current.getRouteVersion() == null
@@ -145,6 +186,16 @@
 
         command.setRouteVersion(session.getRouteVersion());
         saveSession(session);
+        long recordDispatchCostMs = System.currentTimeMillis() - startMs;
+        log.info("recordDispatch done, taskNo={}, routeVersion={}, reuse={}, prevRouteVersion={}, dispatchStationId={}, triggerName={}, recordDispatchCostMs={}ms",
+                taskNo, session.getRouteVersion(), reuseCurrent,
+                current == null ? null : current.getRouteVersion(),
+                dispatchStationId, triggerName, recordDispatchCostMs);
+        if (recordDispatchCostMs > RECORD_DISPATCH_SLOW_THRESHOLD_MS) {
+            log.warn("recordDispatch slow, taskNo={}, dispatchStationId={}, triggerName={}, recordDispatchCostMs={}ms, pathSize={}",
+                    taskNo, dispatchStationId, triggerName, recordDispatchCostMs,
+                    fullPathStationIds == null ? 0 : fullPathStationIds.size());
+        }
 
         if (circleRoute) {
             saveLegacyCircleCommand(taskNo, command);
@@ -161,6 +212,7 @@
             clearLegacyCircleCommand(taskNo);
             return null;
         }
+        log.info("cancelSession, taskNo={}, routeVersion={}, cancelReason=reroute_cancelled", taskNo, session.getRouteVersion());
         session.setStatus(StationMoveSession.STATUS_CANCELLED);
         session.setCancelReason("reroute_cancelled");
         saveSession(session);
@@ -212,8 +264,11 @@
         }
         StationMoveSession session = sessionRegistry.load(taskNo);
         if (session == null || !Objects.equals(session.getRouteVersion(), routeVersion)) {
+            log.warn("updateTerminal skipped: session mismatch, taskNo={}, cmdRouteVersion={}, sessionRouteVersion={}, targetStatus={}, cancelReason={}",
+                    taskNo, routeVersion, session == null ? null : session.getRouteVersion(), status, cancelReason);
             return;
         }
+        log.info("updateTerminal, taskNo={}, routeVersion={}, status={}, cancelReason={}", taskNo, routeVersion, status, cancelReason);
         session.setCurrentStationId(currentStationId);
         session.setStatus(status);
         session.setCancelReason(cancelReason);

--
Gitblit v1.9.1