src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -48,11 +48,12 @@ private ZyStationConnectDriver zyStationConnectDriver; private int deviceLogCollectTime = 200; private long deviceDataLogTime = System.currentTimeMillis(); private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker(); private final RecentStationArrivalTracker recentArrivalTracker; public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); } @Override src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -55,11 +55,12 @@ private int deviceLogCollectTime = 200; private long deviceDataLogTime = System.currentTimeMillis(); private ExecutorService executor = Executors.newFixedThreadPool(9999); private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker(); private final RecentStationArrivalTracker recentArrivalTracker; public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); } @Override src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -58,11 +58,12 @@ private boolean initStatus = false; private long deviceDataLogTime = System.currentTimeMillis(); private ExecutorService executor = Executors.newFixedThreadPool(9999); private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker(); private final RecentStationArrivalTracker recentArrivalTracker; public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); this.segmentExecutor = new StationMoveSegmentExecutor(deviceConfig, redisUtil, this::sendCommand); } src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -74,11 +74,12 @@ private long deviceDataLogTime = System.currentTimeMillis(); private ExecutorService executor = Executors.newFixedThreadPool(9999); private StationV5SegmentExecutor segmentExecutor; private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker(); private final RecentStationArrivalTracker recentArrivalTracker; public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand); } src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
@@ -1,82 +1,57 @@ package com.zy.core.thread.support; import com.zy.common.utils.RedisUtil; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; public class RecentStationArrivalTracker { static final long DEFAULT_RETAIN_MS = 10_000L; static final String KEY_PREFIX = "station_recent_arrival_"; private final long retainMs; private final LongSupplier nowSupplier; private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>(); private final AtomicLong lastCleanupAt = new AtomicLong(0L); private final RedisUtil redisUtil; private final long retainSeconds; private final Map<Integer, Integer> lastLoadedTaskByStation = new ConcurrentHashMap<>(); public RecentStationArrivalTracker() { this(DEFAULT_RETAIN_MS, System::currentTimeMillis); public RecentStationArrivalTracker(RedisUtil redisUtil) { this(redisUtil, DEFAULT_RETAIN_MS); } public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) { this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs; this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier; public RecentStationArrivalTracker(RedisUtil redisUtil, long retainMs) { this.redisUtil = redisUtil; long effectiveRetainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs; this.retainSeconds = Math.max(1L, (long) Math.ceil(effectiveRetainMs / 1000d)); } public void observe(Integer stationId, Integer taskNo, boolean loading) { if (!loading) { if (stationId == null || stationId <= 0) { return; } record(stationId, taskNo); } public void record(Integer stationId, Integer taskNo) { Long key = buildKey(stationId, taskNo); if (key == null) { if (!loading || taskNo == null || taskNo <= 0) { lastLoadedTaskByStation.remove(stationId); return; } long now = nowSupplier.getAsLong(); arrivalTimeMap.put(key, now); cleanupIfNeeded(now); Integer previousTaskNo = lastLoadedTaskByStation.put(stationId, taskNo); if (Objects.equals(previousTaskNo, taskNo) || redisUtil == null) { return; } redisUtil.set(buildKey(stationId, taskNo), "1", retainSeconds); } public boolean hasRecentArrival(Integer stationId, Integer taskNo) { Long key = buildKey(stationId, taskNo); if (key == null) { if (redisUtil == null) { return false; } long now = nowSupplier.getAsLong(); Long arrivalAt = arrivalTimeMap.get(key); if (arrivalAt == null) { cleanupIfNeeded(now); return false; } if (now - arrivalAt > retainMs) { arrivalTimeMap.remove(key, arrivalAt); cleanupIfNeeded(now); return false; } cleanupIfNeeded(now); return true; String key = buildKey(stationId, taskNo); return key != null && redisUtil.get(key) != null; } private void cleanupIfNeeded(long now) { long lastCleanup = lastCleanupAt.get(); if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) { return; } for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) { if (entry == null || now - entry.getValue() <= retainMs) { continue; } arrivalTimeMap.remove(entry.getKey(), entry.getValue()); } } private Long buildKey(Integer stationId, Integer taskNo) { private String buildKey(Integer stationId, Integer taskNo) { if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) { return null; } return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL); return KEY_PREFIX + stationId + "_" + taskNo; } } src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
@@ -1,25 +1,44 @@ package com.zy.core.thread.support; import com.zy.common.utils.RedisUtil; import org.junit.jupiter.api.Test; import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class RecentStationArrivalTrackerTest { @Test void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() { AtomicLong now = new AtomicLong(1_000L); RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get); void observe_writesRedisKeyOnlyOnFirstSightingOfLoadedTask() { RedisUtil redisUtil = mock(RedisUtil.class); RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L); tracker.record(12, 1001); tracker.observe(12, 1001, true); tracker.observe(12, 1001, true); verify(redisUtil).set(eq("station_recent_arrival_12_1001"), eq("1"), eq(10L)); } @Test void hasRecentArrival_returnsFalseAfterRedisKeyExpires() { RedisUtil redisUtil = mock(RedisUtil.class); RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L); when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn(null); assertFalse(tracker.hasRecentArrival(12, 1001)); } @Test void hasRecentArrival_returnsTrueWhenRedisKeyExists() { RedisUtil redisUtil = mock(RedisUtil.class); RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L); when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn("1"); assertTrue(tracker.hasRecentArrival(12, 1001)); now.addAndGet(1_001L); assertFalse(tracker.hasRecentArrival(12, 1001)); } }