From 5d68f36fb16c07ea5459a167c9711f681c2f71b2 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期五, 27 三月 2026 17:10:25 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java                  |    8 +
 src/main/java/com/zy/core/thread/StationThread.java                           |    4 
 src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java      |  143 ++++++++++++++++++++++++++++
 src/main/java/com/zy/core/thread/impl/ZyStationThread.java                    |    8 +
 src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java                  |    8 +
 src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java                  |    8 +
 src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java     |   82 ++++++++++++++++
 src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java |   25 +++++
 src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java          |    8 
 9 files changed, 290 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java b/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
index 9740c55..cf33ff0 100644
--- a/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
+++ b/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
@@ -66,10 +66,10 @@
             }
             Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
             StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
-            if (stationProtocol == null) {
-                continue;
-            }
-            if (!wrkMast.getWrkNo().equals(stationProtocol.getTaskNo()) || !stationProtocol.isLoading()) {
+            boolean arrived = stationProtocol != null
+                    && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
+                    && stationProtocol.isLoading();
+            if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
                 continue;
             }
             boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
diff --git a/src/main/java/com/zy/core/thread/StationThread.java b/src/main/java/com/zy/core/thread/StationThread.java
index c1ca9d4..3586470 100644
--- a/src/main/java/com/zy/core/thread/StationThread.java
+++ b/src/main/java/com/zy/core/thread/StationThread.java
@@ -14,6 +14,10 @@
 
     Map<Integer, StationProtocol> getStatusMap();
 
+    default boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+        return false;
+    }
+
     StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize);
 
     default StationCommand getCommand(StationCommandType commandType,
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
index 249ebbc..707c0fa 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -8,6 +8,7 @@
 import com.zy.asrs.utils.Utils;
 import com.zy.core.network.DeviceConnectPool;
 import com.zy.core.thread.StationThread;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
 import com.alibaba.fastjson.JSON;
 import com.core.common.DateUtils;
 import com.core.common.SpringUtils;
@@ -47,6 +48,7 @@
     private ZyStationConnectDriver zyStationConnectDriver;
     private int deviceLogCollectTime = 200;
     private long deviceDataLogTime = System.currentTimeMillis();
+    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
 
     public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
@@ -142,6 +144,7 @@
                     stationProtocol.setRunBlock(statusEntity.isRunBlock());
                     stationProtocol.setEnableIn(statusEntity.isEnableIn());
                     stationProtocol.setWeight(statusEntity.getWeight());
+                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                 }
 
                 if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -192,6 +195,11 @@
     }
 
     @Override
+    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+    }
+
+    @Override
     public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
         StationCommand stationCommand = new StationCommand();
         stationCommand.setTaskNo(taskNo);
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
index 9bf0843..3e2e1f3 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -30,6 +30,7 @@
 import com.zy.core.network.DeviceConnectPool;
 import com.zy.core.network.ZyStationConnectDriver;
 import com.zy.core.network.entity.ZyStationStatusEntity;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
 import com.zy.core.utils.DeviceLogRedisKeyBuilder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -54,6 +55,7 @@
     private int deviceLogCollectTime = 200;
     private long deviceDataLogTime = System.currentTimeMillis();
     private ExecutorService executor = Executors.newFixedThreadPool(9999);
+    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
 
     public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
@@ -148,6 +150,7 @@
                     stationProtocol.setRunBlock(statusEntity.isRunBlock());
                     stationProtocol.setEnableIn(statusEntity.isEnableIn());
                     stationProtocol.setWeight(statusEntity.getWeight());
+                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                 }
 
                 if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -208,6 +211,11 @@
     }
 
     @Override
+    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+    }
+
+    @Override
     public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
         return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null);
     }
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
index dcd7ae6..9921b26 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -31,6 +31,7 @@
 import com.zy.core.network.ZyStationConnectDriver;
 import com.zy.core.network.entity.ZyStationStatusEntity;
 import com.zy.core.thread.impl.v5.StationMoveSegmentExecutor;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
 import com.zy.core.utils.DeviceLogRedisKeyBuilder;
 import com.zy.system.entity.Config;
 import com.zy.system.service.ConfigService;
@@ -57,6 +58,7 @@
     private boolean initStatus = false;
     private long deviceDataLogTime = System.currentTimeMillis();
     private ExecutorService executor = Executors.newFixedThreadPool(9999);
+    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
 
     public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
@@ -157,6 +159,7 @@
                     stationProtocol.setWeight(statusEntity.getWeight());
                     stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
                     stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
+                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                 }
 
                 if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -217,6 +220,11 @@
     }
 
     @Override
+    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+    }
+
+    @Override
     public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
         return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null);
     }
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 6f633a9..84636e9 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -34,6 +34,7 @@
 import com.zy.core.network.entity.ZyStationStatusEntity;
 import com.zy.core.service.StationTaskLoopService;
 import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
 import com.zy.core.trace.StationTaskTraceRegistry;
 import com.zy.core.utils.DeviceLogRedisKeyBuilder;
 import lombok.Data;
@@ -73,6 +74,7 @@
     private long deviceDataLogTime = System.currentTimeMillis();
     private ExecutorService executor = Executors.newFixedThreadPool(9999);
     private StationV5SegmentExecutor segmentExecutor;
+    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
 
     public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
         this.deviceConfig = deviceConfig;
@@ -173,6 +175,7 @@
                     stationProtocol.setWeight(statusEntity.getWeight());
                     stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
                     stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
+                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                 }
 
                 if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -235,6 +238,11 @@
     }
 
     @Override
+    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+    }
+
+    @Override
     public StationCommand getCommand(StationCommandType commandType,
                                      Integer taskNo,
                                      Integer stationId,
diff --git a/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
new file mode 100644
index 0000000..a153465
--- /dev/null
+++ b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
@@ -0,0 +1,82 @@
+package com.zy.core.thread.support;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+
+public class RecentStationArrivalTracker {
+
+    static final long DEFAULT_RETAIN_MS = 10_000L;
+
+    private final long retainMs;
+    private final LongSupplier nowSupplier;
+    private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>();
+    private final AtomicLong lastCleanupAt = new AtomicLong(0L);
+
+    public RecentStationArrivalTracker() {
+        this(DEFAULT_RETAIN_MS, System::currentTimeMillis);
+    }
+
+    public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) {
+        this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
+        this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier;
+    }
+
+    public void observe(Integer stationId, Integer taskNo, boolean loading) {
+        if (!loading) {
+            return;
+        }
+        record(stationId, taskNo);
+    }
+
+    public void record(Integer stationId, Integer taskNo) {
+        Long key = buildKey(stationId, taskNo);
+        if (key == null) {
+            return;
+        }
+        long now = nowSupplier.getAsLong();
+        arrivalTimeMap.put(key, now);
+        cleanupIfNeeded(now);
+    }
+
+    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+        Long key = buildKey(stationId, taskNo);
+        if (key == null) {
+            return false;
+        }
+        long now = nowSupplier.getAsLong();
+        Long arrivalAt = arrivalTimeMap.get(key);
+        if (arrivalAt == null) {
+            cleanupIfNeeded(now);
+            return false;
+        }
+        if (now - arrivalAt > retainMs) {
+            arrivalTimeMap.remove(key, arrivalAt);
+            cleanupIfNeeded(now);
+            return false;
+        }
+        cleanupIfNeeded(now);
+        return true;
+    }
+
+    private void cleanupIfNeeded(long now) {
+        long lastCleanup = lastCleanupAt.get();
+        if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) {
+            return;
+        }
+        for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) {
+            if (entry == null || now - entry.getValue() <= retainMs) {
+                continue;
+            }
+            arrivalTimeMap.remove(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private Long buildKey(Integer stationId, Integer taskNo) {
+        if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) {
+            return null;
+        }
+        return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL);
+    }
+}
diff --git a/src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java b/src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java
new file mode 100644
index 0000000..076d862
--- /dev/null
+++ b/src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java
@@ -0,0 +1,143 @@
+package com.zy.asrs.task;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.zy.asrs.entity.BasStation;
+import com.zy.asrs.entity.WrkMast;
+import com.zy.asrs.service.BasStationService;
+import com.zy.asrs.service.WrkAnalysisService;
+import com.zy.asrs.service.WrkMastService;
+import com.zy.core.cache.SlaveConnection;
+import com.zy.core.enums.SlaveType;
+import com.zy.core.enums.StationCommandType;
+import com.zy.core.enums.WrkStsType;
+import com.zy.core.model.CommandResponse;
+import com.zy.core.model.command.StationCommand;
+import com.zy.core.model.protocol.StationProtocol;
+import com.zy.core.thread.StationThread;
+import com.zy.core.utils.StationOperateProcessUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class WrkAnalysisStationArrivalScannerTest {
+
+    @Test
+    void scanInboundStationArrival_completesTaskWhenRecentArrivalWasObserved() {
+        WrkMastService wrkMastService = mock(WrkMastService.class);
+        BasStationService basStationService = mock(BasStationService.class);
+        WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class);
+        StationOperateProcessUtils stationOperateProcessUtils = mock(StationOperateProcessUtils.class);
+
+        WrkAnalysisStationArrivalScanner scanner = new WrkAnalysisStationArrivalScanner(
+                wrkMastService,
+                basStationService,
+                wrkAnalysisService,
+                stationOperateProcessUtils
+        );
+
+        WrkMast wrkMast = new WrkMast();
+        wrkMast.setWrkNo(1001);
+        wrkMast.setIoType(1);
+        wrkMast.setWrkSts(WrkStsType.INBOUND_STATION_RUN.sts);
+        wrkMast.setStaNo(12);
+
+        BasStation basStation = new BasStation();
+        basStation.setStationId(12);
+        basStation.setDeviceNo(3);
+
+        when(wrkMastService.list(any(QueryWrapper.class))).thenReturn(List.of(wrkMast));
+        when(basStationService.getOne(any())).thenReturn(basStation);
+        when(wrkAnalysisService.completeInboundStationRun(any(WrkMast.class), any(Date.class))).thenReturn(true);
+
+        ArrivalAwareStationThread stationThread = new ArrivalAwareStationThread(true);
+        StationProtocol stationProtocol = new StationProtocol();
+        stationProtocol.setStationId(12);
+        stationProtocol.setTaskNo(0);
+        stationProtocol.setLoading(false);
+        stationThread.putStatus(stationProtocol);
+
+        SlaveConnection.put(SlaveType.Devp, 3, stationThread);
+        try {
+            scanner.scanInboundStationArrival();
+        } finally {
+            SlaveConnection.remove(SlaveType.Devp, 3);
+        }
+
+        verify(wrkAnalysisService).completeInboundStationRun(any(WrkMast.class), any(Date.class));
+    }
+
+    private static class ArrivalAwareStationThread implements StationThread {
+
+        private final boolean recentArrival;
+        private final Map<Integer, StationProtocol> statusMap = new HashMap<>();
+
+        private ArrivalAwareStationThread(boolean recentArrival) {
+            this.recentArrival = recentArrival;
+        }
+
+        private void putStatus(StationProtocol stationProtocol) {
+            statusMap.put(stationProtocol.getStationId(), stationProtocol);
+        }
+
+        @Override
+        public List<StationProtocol> getStatus() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Map<Integer, StationProtocol> getStatusMap() {
+            return statusMap;
+        }
+
+        public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+            return recentArrival;
+        }
+
+        @Override
+        public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
+            return null;
+        }
+
+        @Override
+        public boolean clearPath(Integer taskNo) {
+            return false;
+        }
+
+        @Override
+        public CommandResponse sendCommand(StationCommand command) {
+            return null;
+        }
+
+        @Override
+        public CommandResponse sendOriginCommand(String address, short[] data) {
+            return null;
+        }
+
+        @Override
+        public byte[] readOriginCommand(String address, int length) {
+            return new byte[0];
+        }
+
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public boolean connect() {
+            return true;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git a/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
new file mode 100644
index 0000000..5cfe204
--- /dev/null
+++ b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
@@ -0,0 +1,25 @@
+package com.zy.core.thread.support;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class RecentStationArrivalTrackerTest {
+
+    @Test
+    void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() {
+        AtomicLong now = new AtomicLong(1_000L);
+        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get);
+
+        tracker.record(12, 1001);
+
+        assertTrue(tracker.hasRecentArrival(12, 1001));
+
+        now.addAndGet(1_001L);
+
+        assertFalse(tracker.hasRecentArrival(12, 1001));
+    }
+}

--
Gitblit v1.9.1