From 5d68f36fb16c07ea5459a167c9711f681c2f71b2 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期五, 27 三月 2026 17:10:25 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java | 8 +
src/main/java/com/zy/core/thread/StationThread.java | 4
src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java | 143 ++++++++++++++++++++++++++++
src/main/java/com/zy/core/thread/impl/ZyStationThread.java | 8 +
src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java | 8 +
src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java | 8 +
src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java | 82 ++++++++++++++++
src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java | 25 +++++
src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java | 8
9 files changed, 290 insertions(+), 4 deletions(-)
diff --git a/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java b/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
index 9740c55..cf33ff0 100644
--- a/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
+++ b/src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
@@ -66,10 +66,10 @@
}
Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
- if (stationProtocol == null) {
- continue;
- }
- if (!wrkMast.getWrkNo().equals(stationProtocol.getTaskNo()) || !stationProtocol.isLoading()) {
+ boolean arrived = stationProtocol != null
+ && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
+ && stationProtocol.isLoading();
+ if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
continue;
}
boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
diff --git a/src/main/java/com/zy/core/thread/StationThread.java b/src/main/java/com/zy/core/thread/StationThread.java
index c1ca9d4..3586470 100644
--- a/src/main/java/com/zy/core/thread/StationThread.java
+++ b/src/main/java/com/zy/core/thread/StationThread.java
@@ -14,6 +14,10 @@
Map<Integer, StationProtocol> getStatusMap();
+ default boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ return false;
+ }
+
StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize);
default StationCommand getCommand(StationCommandType commandType,
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
index 249ebbc..707c0fa 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -8,6 +8,7 @@
import com.zy.asrs.utils.Utils;
import com.zy.core.network.DeviceConnectPool;
import com.zy.core.thread.StationThread;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.alibaba.fastjson.JSON;
import com.core.common.DateUtils;
import com.core.common.SpringUtils;
@@ -47,6 +48,7 @@
private ZyStationConnectDriver zyStationConnectDriver;
private int deviceLogCollectTime = 200;
private long deviceDataLogTime = System.currentTimeMillis();
+ private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
@@ -142,6 +144,7 @@
stationProtocol.setRunBlock(statusEntity.isRunBlock());
stationProtocol.setEnableIn(statusEntity.isEnableIn());
stationProtocol.setWeight(statusEntity.getWeight());
+ recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
}
if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -192,6 +195,11 @@
}
@Override
+ public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+ }
+
+ @Override
public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
StationCommand stationCommand = new StationCommand();
stationCommand.setTaskNo(taskNo);
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
index 9bf0843..3e2e1f3 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -30,6 +30,7 @@
import com.zy.core.network.DeviceConnectPool;
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -54,6 +55,7 @@
private int deviceLogCollectTime = 200;
private long deviceDataLogTime = System.currentTimeMillis();
private ExecutorService executor = Executors.newFixedThreadPool(9999);
+ private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
@@ -148,6 +150,7 @@
stationProtocol.setRunBlock(statusEntity.isRunBlock());
stationProtocol.setEnableIn(statusEntity.isEnableIn());
stationProtocol.setWeight(statusEntity.getWeight());
+ recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
}
if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -208,6 +211,11 @@
}
@Override
+ public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+ }
+
+ @Override
public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null);
}
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
index dcd7ae6..9921b26 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -31,6 +31,7 @@
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.thread.impl.v5.StationMoveSegmentExecutor;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -57,6 +58,7 @@
private boolean initStatus = false;
private long deviceDataLogTime = System.currentTimeMillis();
private ExecutorService executor = Executors.newFixedThreadPool(9999);
+ private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
@@ -157,6 +159,7 @@
stationProtocol.setWeight(statusEntity.getWeight());
stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
+ recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
}
if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -217,6 +220,11 @@
}
@Override
+ public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+ }
+
+ @Override
public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null);
}
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 6f633a9..84636e9 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -34,6 +34,7 @@
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.service.StationTaskLoopService;
import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
+import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.trace.StationTaskTraceRegistry;
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import lombok.Data;
@@ -73,6 +74,7 @@
private long deviceDataLogTime = System.currentTimeMillis();
private ExecutorService executor = Executors.newFixedThreadPool(9999);
private StationV5SegmentExecutor segmentExecutor;
+ private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
@@ -173,6 +175,7 @@
stationProtocol.setWeight(statusEntity.getWeight());
stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
+ recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
}
if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -235,6 +238,11 @@
}
@Override
+ public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
+ }
+
+ @Override
public StationCommand getCommand(StationCommandType commandType,
Integer taskNo,
Integer stationId,
diff --git a/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
new file mode 100644
index 0000000..a153465
--- /dev/null
+++ b/src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
@@ -0,0 +1,82 @@
+package com.zy.core.thread.support;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+
+public class RecentStationArrivalTracker {
+
+ static final long DEFAULT_RETAIN_MS = 10_000L;
+
+ private final long retainMs;
+ private final LongSupplier nowSupplier;
+ private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>();
+ private final AtomicLong lastCleanupAt = new AtomicLong(0L);
+
+ public RecentStationArrivalTracker() {
+ this(DEFAULT_RETAIN_MS, System::currentTimeMillis);
+ }
+
+ public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) {
+ this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
+ this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier;
+ }
+
+ public void observe(Integer stationId, Integer taskNo, boolean loading) {
+ if (!loading) {
+ return;
+ }
+ record(stationId, taskNo);
+ }
+
+ public void record(Integer stationId, Integer taskNo) {
+ Long key = buildKey(stationId, taskNo);
+ if (key == null) {
+ return;
+ }
+ long now = nowSupplier.getAsLong();
+ arrivalTimeMap.put(key, now);
+ cleanupIfNeeded(now);
+ }
+
+ public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ Long key = buildKey(stationId, taskNo);
+ if (key == null) {
+ return false;
+ }
+ long now = nowSupplier.getAsLong();
+ Long arrivalAt = arrivalTimeMap.get(key);
+ if (arrivalAt == null) {
+ cleanupIfNeeded(now);
+ return false;
+ }
+ if (now - arrivalAt > retainMs) {
+ arrivalTimeMap.remove(key, arrivalAt);
+ cleanupIfNeeded(now);
+ return false;
+ }
+ cleanupIfNeeded(now);
+ return true;
+ }
+
+ private void cleanupIfNeeded(long now) {
+ long lastCleanup = lastCleanupAt.get();
+ if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) {
+ return;
+ }
+ for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) {
+ if (entry == null || now - entry.getValue() <= retainMs) {
+ continue;
+ }
+ arrivalTimeMap.remove(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private Long buildKey(Integer stationId, Integer taskNo) {
+ if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) {
+ return null;
+ }
+ return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL);
+ }
+}
diff --git a/src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java b/src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java
new file mode 100644
index 0000000..076d862
--- /dev/null
+++ b/src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java
@@ -0,0 +1,143 @@
+package com.zy.asrs.task;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.zy.asrs.entity.BasStation;
+import com.zy.asrs.entity.WrkMast;
+import com.zy.asrs.service.BasStationService;
+import com.zy.asrs.service.WrkAnalysisService;
+import com.zy.asrs.service.WrkMastService;
+import com.zy.core.cache.SlaveConnection;
+import com.zy.core.enums.SlaveType;
+import com.zy.core.enums.StationCommandType;
+import com.zy.core.enums.WrkStsType;
+import com.zy.core.model.CommandResponse;
+import com.zy.core.model.command.StationCommand;
+import com.zy.core.model.protocol.StationProtocol;
+import com.zy.core.thread.StationThread;
+import com.zy.core.utils.StationOperateProcessUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class WrkAnalysisStationArrivalScannerTest {
+
+ @Test
+ void scanInboundStationArrival_completesTaskWhenRecentArrivalWasObserved() {
+ WrkMastService wrkMastService = mock(WrkMastService.class);
+ BasStationService basStationService = mock(BasStationService.class);
+ WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class);
+ StationOperateProcessUtils stationOperateProcessUtils = mock(StationOperateProcessUtils.class);
+
+ WrkAnalysisStationArrivalScanner scanner = new WrkAnalysisStationArrivalScanner(
+ wrkMastService,
+ basStationService,
+ wrkAnalysisService,
+ stationOperateProcessUtils
+ );
+
+ WrkMast wrkMast = new WrkMast();
+ wrkMast.setWrkNo(1001);
+ wrkMast.setIoType(1);
+ wrkMast.setWrkSts(WrkStsType.INBOUND_STATION_RUN.sts);
+ wrkMast.setStaNo(12);
+
+ BasStation basStation = new BasStation();
+ basStation.setStationId(12);
+ basStation.setDeviceNo(3);
+
+ when(wrkMastService.list(any(QueryWrapper.class))).thenReturn(List.of(wrkMast));
+ when(basStationService.getOne(any())).thenReturn(basStation);
+ when(wrkAnalysisService.completeInboundStationRun(any(WrkMast.class), any(Date.class))).thenReturn(true);
+
+ ArrivalAwareStationThread stationThread = new ArrivalAwareStationThread(true);
+ StationProtocol stationProtocol = new StationProtocol();
+ stationProtocol.setStationId(12);
+ stationProtocol.setTaskNo(0);
+ stationProtocol.setLoading(false);
+ stationThread.putStatus(stationProtocol);
+
+ SlaveConnection.put(SlaveType.Devp, 3, stationThread);
+ try {
+ scanner.scanInboundStationArrival();
+ } finally {
+ SlaveConnection.remove(SlaveType.Devp, 3);
+ }
+
+ verify(wrkAnalysisService).completeInboundStationRun(any(WrkMast.class), any(Date.class));
+ }
+
+ private static class ArrivalAwareStationThread implements StationThread {
+
+ private final boolean recentArrival;
+ private final Map<Integer, StationProtocol> statusMap = new HashMap<>();
+
+ private ArrivalAwareStationThread(boolean recentArrival) {
+ this.recentArrival = recentArrival;
+ }
+
+ private void putStatus(StationProtocol stationProtocol) {
+ statusMap.put(stationProtocol.getStationId(), stationProtocol);
+ }
+
+ @Override
+ public List<StationProtocol> getStatus() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Map<Integer, StationProtocol> getStatusMap() {
+ return statusMap;
+ }
+
+ public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
+ return recentArrival;
+ }
+
+ @Override
+ public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
+ return null;
+ }
+
+ @Override
+ public boolean clearPath(Integer taskNo) {
+ return false;
+ }
+
+ @Override
+ public CommandResponse sendCommand(StationCommand command) {
+ return null;
+ }
+
+ @Override
+ public CommandResponse sendOriginCommand(String address, short[] data) {
+ return null;
+ }
+
+ @Override
+ public byte[] readOriginCommand(String address, int length) {
+ return new byte[0];
+ }
+
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public boolean connect() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
diff --git a/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
new file mode 100644
index 0000000..5cfe204
--- /dev/null
+++ b/src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
@@ -0,0 +1,25 @@
+package com.zy.core.thread.support;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class RecentStationArrivalTrackerTest {
+
+ @Test
+ void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() {
+ AtomicLong now = new AtomicLong(1_000L);
+ RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get);
+
+ tracker.record(12, 1001);
+
+ assertTrue(tracker.hasRecentArrival(12, 1001));
+
+ now.addAndGet(1_001L);
+
+ assertFalse(tracker.hasRecentArrival(12, 1001));
+ }
+}
--
Gitblit v1.9.1