#
Junjie
12 小时以前 cbb00d4729243e4949b3c921fc2f94cb03ca8aaa
#
6个文件已修改
128 ■■■■ 已修改文件
src/main/java/com/zy/core/thread/impl/ZyStationThread.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -48,11 +48,12 @@
    private ZyStationConnectDriver zyStationConnectDriver;
    private int deviceLogCollectTime = 200;
    private long deviceDataLogTime = System.currentTimeMillis();
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    private final RecentStationArrivalTracker recentArrivalTracker;
    public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
    }
    @Override
src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -55,11 +55,12 @@
    private int deviceLogCollectTime = 200;
    private long deviceDataLogTime = System.currentTimeMillis();
    private ExecutorService executor = Executors.newFixedThreadPool(9999);
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    private final RecentStationArrivalTracker recentArrivalTracker;
    public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
    }
    @Override
src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -58,11 +58,12 @@
    private boolean initStatus = false;
    private long deviceDataLogTime = System.currentTimeMillis();
    private ExecutorService executor = Executors.newFixedThreadPool(9999);
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    private final RecentStationArrivalTracker recentArrivalTracker;
    public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
        this.segmentExecutor = new StationMoveSegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
    }
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -74,11 +74,12 @@
    private long deviceDataLogTime = System.currentTimeMillis();
    private ExecutorService executor = Executors.newFixedThreadPool(9999);
    private StationV5SegmentExecutor segmentExecutor;
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    private final RecentStationArrivalTracker recentArrivalTracker;
    public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
        this.redisUtil = redisUtil;
        this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
        this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
    }
src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
@@ -1,82 +1,57 @@
package com.zy.core.thread.support;
import com.zy.common.utils.RedisUtil;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
public class RecentStationArrivalTracker {
    static final long DEFAULT_RETAIN_MS = 10_000L;
    static final String KEY_PREFIX = "station_recent_arrival_";
    private final long retainMs;
    private final LongSupplier nowSupplier;
    private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>();
    private final AtomicLong lastCleanupAt = new AtomicLong(0L);
    private final RedisUtil redisUtil;
    private final long retainSeconds;
    private final Map<Integer, Integer> lastLoadedTaskByStation = new ConcurrentHashMap<>();
    public RecentStationArrivalTracker() {
        this(DEFAULT_RETAIN_MS, System::currentTimeMillis);
    public RecentStationArrivalTracker(RedisUtil redisUtil) {
        this(redisUtil, DEFAULT_RETAIN_MS);
    }
    public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) {
        this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
        this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier;
    public RecentStationArrivalTracker(RedisUtil redisUtil, long retainMs) {
        this.redisUtil = redisUtil;
        long effectiveRetainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
        this.retainSeconds = Math.max(1L, (long) Math.ceil(effectiveRetainMs / 1000d));
    }
    public void observe(Integer stationId, Integer taskNo, boolean loading) {
        if (!loading) {
        if (stationId == null || stationId <= 0) {
            return;
        }
        record(stationId, taskNo);
    }
    public void record(Integer stationId, Integer taskNo) {
        Long key = buildKey(stationId, taskNo);
        if (key == null) {
        if (!loading || taskNo == null || taskNo <= 0) {
            lastLoadedTaskByStation.remove(stationId);
            return;
        }
        long now = nowSupplier.getAsLong();
        arrivalTimeMap.put(key, now);
        cleanupIfNeeded(now);
        Integer previousTaskNo = lastLoadedTaskByStation.put(stationId, taskNo);
        if (Objects.equals(previousTaskNo, taskNo) || redisUtil == null) {
            return;
        }
        redisUtil.set(buildKey(stationId, taskNo), "1", retainSeconds);
    }
    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        Long key = buildKey(stationId, taskNo);
        if (key == null) {
        if (redisUtil == null) {
            return false;
        }
        long now = nowSupplier.getAsLong();
        Long arrivalAt = arrivalTimeMap.get(key);
        if (arrivalAt == null) {
            cleanupIfNeeded(now);
            return false;
        }
        if (now - arrivalAt > retainMs) {
            arrivalTimeMap.remove(key, arrivalAt);
            cleanupIfNeeded(now);
            return false;
        }
        cleanupIfNeeded(now);
        return true;
        String key = buildKey(stationId, taskNo);
        return key != null && redisUtil.get(key) != null;
    }
    private void cleanupIfNeeded(long now) {
        long lastCleanup = lastCleanupAt.get();
        if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) {
            return;
        }
        for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) {
            if (entry == null || now - entry.getValue() <= retainMs) {
                continue;
            }
            arrivalTimeMap.remove(entry.getKey(), entry.getValue());
        }
    }
    private Long buildKey(Integer stationId, Integer taskNo) {
    private String buildKey(Integer stationId, Integer taskNo) {
        if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) {
            return null;
        }
        return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL);
        return KEY_PREFIX + stationId + "_" + taskNo;
    }
}
src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
@@ -1,25 +1,44 @@
package com.zy.core.thread.support;
import com.zy.common.utils.RedisUtil;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class RecentStationArrivalTrackerTest {
    @Test
    void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() {
        AtomicLong now = new AtomicLong(1_000L);
        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get);
    void observe_writesRedisKeyOnlyOnFirstSightingOfLoadedTask() {
        RedisUtil redisUtil = mock(RedisUtil.class);
        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
        tracker.record(12, 1001);
        tracker.observe(12, 1001, true);
        tracker.observe(12, 1001, true);
        verify(redisUtil).set(eq("station_recent_arrival_12_1001"), eq("1"), eq(10L));
    }
    @Test
    void hasRecentArrival_returnsFalseAfterRedisKeyExpires() {
        RedisUtil redisUtil = mock(RedisUtil.class);
        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
        when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn(null);
        assertFalse(tracker.hasRecentArrival(12, 1001));
    }
    @Test
    void hasRecentArrival_returnsTrueWhenRedisKeyExists() {
        RedisUtil redisUtil = mock(RedisUtil.class);
        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(redisUtil, 10_000L);
        when(redisUtil.get("station_recent_arrival_12_1001")).thenReturn("1");
        assertTrue(tracker.hasRecentArrival(12, 1001));
        now.addAndGet(1_001L);
        assertFalse(tracker.hasRecentArrival(12, 1001));
    }
}