From cbb00d4729243e4949b3c921fc2f94cb03ca8aaa Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期五, 27 三月 2026 18:47:43 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java                  |    3 
 src/main/java/com/zy/core/thread/impl/ZyStationThread.java                    |    3 
 src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java                  |    3 
 src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java                  |    3 
 src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java     |   77 ++++++++-----------------
 src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java |   39 +++++++++---
 6 files changed, 63 insertions(+), 65 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
index 707c0fa..8b42cdd 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -48,11 +48,12 @@
     private ZyStationConnectDriver zyStationConnectDriver;
     private int deviceLogCollectTime = 200;
     private long deviceDataLogTime = System.currentTimeMillis();
-    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+    private final RecentStationArrivalTracker recentArrivalTracker;
 
     public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
         this.redisUtil = redisUtil;
+        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
     }
 
     @Override
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
index 3e2e1f3..6accb28 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -55,11 +55,12 @@
     private int deviceLogCollectTime = 200;
     private long deviceDataLogTime = System.currentTimeMillis();
     private ExecutorService executor = Executors.newFixedThreadPool(9999);
-    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+    private final RecentStationArrivalTracker recentArrivalTracker;
 
     public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
         this.redisUtil = redisUtil;
+        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
     }
 
     @Override
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
index 9921b26..46cb90d 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -58,11 +58,12 @@
     private boolean initStatus = false;
     private long deviceDataLogTime = System.currentTimeMillis();
     private ExecutorService executor = Executors.newFixedThreadPool(9999);
-    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+    private final RecentStationArrivalTracker recentArrivalTracker;
 
     public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
         this.redisUtil = redisUtil;
+        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
         this.segmentExecutor = new StationMoveSegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
     }
 
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 84636e9..1e8f4c4 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -74,11 +74,12 @@
     private long deviceDataLogTime = System.currentTimeMillis();
     private ExecutorService executor = Executors.newFixedThreadPool(9999);
     private StationV5SegmentExecutor segmentExecutor;
-    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+    private final RecentStationArrivalTracker recentArrivalTracker;
 
     public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
         this.redisUtil = redisUtil;
+        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
         this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
     }
 
diff --git a/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
index a153465..193e43d 100644
--- a/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
+++ b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
@@ -1,82 +1,57 @@
 package com.zy.core.thread.support;
 
+import com.zy.common.utils.RedisUtil;
+
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.LongSupplier;
 
 public class RecentStationArrivalTracker {
 
     static final long DEFAULT_RETAIN_MS = 10_000L;
+    static final String KEY_PREFIX = "station_recent_arrival_";
 
-    private final long retainMs;
-    private final LongSupplier nowSupplier;
-    private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>();
-    private final AtomicLong lastCleanupAt = new AtomicLong(0L);
+    private final RedisUtil redisUtil;
+    private final long retainSeconds;
+    private final Map<Integer, Integer> lastLoadedTaskByStation = new ConcurrentHashMap<>();
 
-    public RecentStationArrivalTracker() {
-        this(DEFAULT_RETAIN_MS, System::currentTimeMillis);
+    public RecentStationArrivalTracker(RedisUtil redisUtil) {
+        this(redisUtil, DEFAULT_RETAIN_MS);
     }
 
-    public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) {
-        this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
-        this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier;
+    public RecentStationArrivalTracker(RedisUtil redisUtil, long retainMs) {
+        this.redisUtil = redisUtil;
+        long effectiveRetainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
+        this.retainSeconds = Math.max(1L, (long) Math.ceil(effectiveRetainMs / 1000d));
     }
 
     public void observe(Integer stationId, Integer taskNo, boolean loading) {
-        if (!loading) {
+        if (stationId == null || stationId <= 0) {
             return;
         }
-        record(stationId, taskNo);
-    }
-
-    public void record(Integer stationId, Integer taskNo) {
-        Long key = buildKey(stationId, taskNo);
-        if (key == null) {
+        if (!loading || taskNo == null || taskNo <= 0) {
+            lastLoadedTaskByStation.remove(stationId);
             return;
         }
-        long now = nowSupplier.getAsLong();
-        arrivalTimeMap.put(key, now);
-        cleanupIfNeeded(now);
+        Integer previousTaskNo = lastLoadedTaskByStation.put(stationId, taskNo);
+        if (Objects.equals(previousTaskNo, taskNo) || redisUtil == null) {
+            return;
+        }
+        redisUtil.set(buildKey(stationId, taskNo), "1", retainSeconds);
     }
 
     public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
-        Long key = buildKey(stationId, taskNo);
-        if (key == null) {
+        if (redisUtil == null) {
             return false;
         }
-        long now = nowSupplier.getAsLong();
-        Long arrivalAt = arrivalTimeMap.get(key);
-        if (arrivalAt == null) {
-            cleanupIfNeeded(now);
-            return false;
-        }
-        if (now - arrivalAt > retainMs) {
-            arrivalTimeMap.remove(key, arrivalAt);
-            cleanupIfNeeded(now);
-            return false;
-        }
-        cleanupIfNeeded(now);
-        return true;
+        String key = buildKey(stationId, taskNo);
+        return key != null && redisUtil.get(key) != null;
     }
 
-    private void cleanupIfNeeded(long now) {
-        long lastCleanup = lastCleanupAt.get();
-        if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) {
-            return;
-        }
-        for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) {
-            if (entry == null || now - entry.getValue() <= retainMs) {
-                continue;
-            }
-            arrivalTimeMap.remove(entry.getKey(), entry.getValue());
-        }
-    }
-
-    private Long buildKey(Integer stationId, Integer taskNo) {
+    private String buildKey(Integer stationId, Integer taskNo) {
         if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) {
             return null;
         }
-        return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL);
+        return KEY_PREFIX + stationId + "_" + taskNo;
     }
 }
diff --git a/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
index 5cfe204..96b85ad 100644
--- a/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
+++ b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
@@ -1,25 +1,44 @@
 package com.zy.core.thread.support;
 
+import com.zy.common.utils.RedisUtil;
 import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 class RecentStationArrivalTrackerTest {
 
     @Test
-    void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() {
-        AtomicLong now = new AtomicLong(1_000L);
-        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get);
+    void observe_writesRedisKeyOnlyOnFirstSightingOfLoadedTask() {
+        RedisUtil redisUtil = mock(RedisUtil.class);
+        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
 
-        tracker.record(12, 1001);
+        tracker.observe(12, 1001, true);
+        tracker.observe(12, 1001, true);
+
+        verify(redisUtil).set(eq("station_recent_arrival_12_1001"), eq("1"), eq(10L));
+    }
+
+    @Test
+    void hasRecentArrival_returnsFalseAfterRedisKeyExpires() {
+        RedisUtil redisUtil = mock(RedisUtil.class);
+        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
+
+        when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn(null);
+        assertFalse(tracker.hasRecentArrival(12, 1001));
+    }
+
+    @Test
+    void hasRecentArrival_returnsTrueWhenRedisKeyExists() {
+        RedisUtil redisUtil = mock(RedisUtil.class);
+        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
+
+        when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn("1");
 
         assertTrue(tracker.hasRecentArrival(12, 1001));
-
-        now.addAndGet(1_001L);
-
-        assertFalse(tracker.hasRecentArrival(12, 1001));
     }
 }

--
Gitblit v1.9.1