From cbb00d4729243e4949b3c921fc2f94cb03ca8aaa Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期五, 27 三月 2026 18:47:43 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java | 3
src/main/java/com/zy/core/thread/impl/ZyStationThread.java | 3
src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java | 3
src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java | 3
src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java | 77 ++++++++-----------------
src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java | 39 +++++++++---
6 files changed, 63 insertions(+), 65 deletions(-)
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
index 707c0fa..8b42cdd 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -48,11 +48,12 @@
private ZyStationConnectDriver zyStationConnectDriver;
private int deviceLogCollectTime = 200;
private long deviceDataLogTime = System.currentTimeMillis();
- private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+ private final RecentStationArrivalTracker recentArrivalTracker;
public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
+ this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
}
@Override
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
index 3e2e1f3..6accb28 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -55,11 +55,12 @@
private int deviceLogCollectTime = 200;
private long deviceDataLogTime = System.currentTimeMillis();
private ExecutorService executor = Executors.newFixedThreadPool(9999);
- private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+ private final RecentStationArrivalTracker recentArrivalTracker;
public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
+ this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
}
@Override
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
index 9921b26..46cb90d 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -58,11 +58,12 @@
private boolean initStatus = false;
private long deviceDataLogTime = System.currentTimeMillis();
private ExecutorService executor = Executors.newFixedThreadPool(9999);
- private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+ private final RecentStationArrivalTracker recentArrivalTracker;
public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
+ this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
this.segmentExecutor = new StationMoveSegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
}
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 84636e9..1e8f4c4 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -74,11 +74,12 @@
private long deviceDataLogTime = System.currentTimeMillis();
private ExecutorService executor = Executors.newFixedThreadPool(9999);
private StationV5SegmentExecutor segmentExecutor;
- private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
+ private final RecentStationArrivalTracker recentArrivalTracker;
public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
+ this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
}
diff --git a/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
index a153465..193e43d 100644
--- a/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
+++ b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
@@ -1,82 +1,57 @@
package com.zy.core.thread.support;
+import com.zy.common.utils.RedisUtil;
+
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.LongSupplier;
public class RecentStationArrivalTracker {
static final long DEFAULT_RETAIN_MS = 10_000L;
+ static final String KEY_PREFIX = "station_recent_arrival_";
- private final long retainMs;
- private final LongSupplier nowSupplier;
- private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>();
- private final AtomicLong lastCleanupAt = new AtomicLong(0L);
+ private final RedisUtil redisUtil;
+ private final long retainSeconds;
+ private final Map<Integer, Integer> lastLoadedTaskByStation = new ConcurrentHashMap<>();
- public RecentStationArrivalTracker() {
- this(DEFAULT_RETAIN_MS, System::currentTimeMillis);
+ public RecentStationArrivalTracker(RedisUtil redisUtil) {
+ this(redisUtil, DEFAULT_RETAIN_MS);
}
- public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) {
- this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
- this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier;
+ public RecentStationArrivalTracker(RedisUtil redisUtil, long retainMs) {
+ this.redisUtil = redisUtil;
+ long effectiveRetainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
+ this.retainSeconds = Math.max(1L, (long) Math.ceil(effectiveRetainMs / 1000d));
}
public void observe(Integer stationId, Integer taskNo, boolean loading) {
- if (!loading) {
+ if (stationId == null || stationId <= 0) {
return;
}
- record(stationId, taskNo);
- }
-
- public void record(Integer stationId, Integer taskNo) {
- Long key = buildKey(stationId, taskNo);
- if (key == null) {
+ if (!loading || taskNo == null || taskNo <= 0) {
+ lastLoadedTaskByStation.remove(stationId);
return;
}
- long now = nowSupplier.getAsLong();
- arrivalTimeMap.put(key, now);
- cleanupIfNeeded(now);
+ Integer previousTaskNo = lastLoadedTaskByStation.put(stationId, taskNo);
+ if (Objects.equals(previousTaskNo, taskNo) || redisUtil == null) {
+ return;
+ }
+ redisUtil.set(buildKey(stationId, taskNo), "1", retainSeconds);
}
public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
- Long key = buildKey(stationId, taskNo);
- if (key == null) {
+ if (redisUtil == null) {
return false;
}
- long now = nowSupplier.getAsLong();
- Long arrivalAt = arrivalTimeMap.get(key);
- if (arrivalAt == null) {
- cleanupIfNeeded(now);
- return false;
- }
- if (now - arrivalAt > retainMs) {
- arrivalTimeMap.remove(key, arrivalAt);
- cleanupIfNeeded(now);
- return false;
- }
- cleanupIfNeeded(now);
- return true;
+ String key = buildKey(stationId, taskNo);
+ return key != null && redisUtil.get(key) != null;
}
- private void cleanupIfNeeded(long now) {
- long lastCleanup = lastCleanupAt.get();
- if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) {
- return;
- }
- for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) {
- if (entry == null || now - entry.getValue() <= retainMs) {
- continue;
- }
- arrivalTimeMap.remove(entry.getKey(), entry.getValue());
- }
- }
-
- private Long buildKey(Integer stationId, Integer taskNo) {
+ private String buildKey(Integer stationId, Integer taskNo) {
if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) {
return null;
}
- return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL);
+ return KEY_PREFIX + stationId + "_" + taskNo;
}
}
diff --git a/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
index 5cfe204..96b85ad 100644
--- a/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
+++ b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
@@ -1,25 +1,44 @@
package com.zy.core.thread.support;
+import com.zy.common.utils.RedisUtil;
import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
class RecentStationArrivalTrackerTest {
@Test
- void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() {
- AtomicLong now = new AtomicLong(1_000L);
- RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get);
+ void observe_writesRedisKeyOnlyOnFirstSightingOfLoadedTask() {
+ RedisUtil redisUtil = mock(RedisUtil.class);
+ RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
- tracker.record(12, 1001);
+ tracker.observe(12, 1001, true);
+ tracker.observe(12, 1001, true);
+
+ verify(redisUtil).set(eq("station_recent_arrival_12_1001"), eq("1"), eq(10L));
+ }
+
+ @Test
+ void hasRecentArrival_returnsFalseAfterRedisKeyExpires() {
+ RedisUtil redisUtil = mock(RedisUtil.class);
+ RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
+
+ when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn(null);
+ assertFalse(tracker.hasRecentArrival(12, 1001));
+ }
+
+ @Test
+ void hasRecentArrival_returnsTrueWhenRedisKeyExists() {
+ RedisUtil redisUtil = mock(RedisUtil.class);
+ RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
+
+ when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn("1");
assertTrue(tracker.hasRecentArrival(12, 1001));
-
- now.addAndGet(1_001L);
-
- assertFalse(tracker.hasRecentArrival(12, 1001));
}
}
--
Gitblit v1.9.1