#
Junjie
11 小时以前 5d68f36fb16c07ea5459a167c9711f681c2f71b2
#
3个文件已添加
6个文件已修改
294 ■■■■■ 已修改文件
src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/StationThread.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationThread.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java 143 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/asrs/task/WrkAnalysisStationArrivalScanner.java
@@ -66,10 +66,10 @@
            }
            Map<Integer, StationProtocol> statusMap = stationThread.getStatusMap();
            StationProtocol stationProtocol = statusMap == null ? null : statusMap.get(basStation.getStationId());
            if (stationProtocol == null) {
                continue;
            }
            if (!wrkMast.getWrkNo().equals(stationProtocol.getTaskNo()) || !stationProtocol.isLoading()) {
            boolean arrived = stationProtocol != null
                    && wrkMast.getWrkNo().equals(stationProtocol.getTaskNo())
                    && stationProtocol.isLoading();
            if (!arrived && !stationThread.hasRecentArrival(basStation.getStationId(), wrkMast.getWrkNo())) {
                continue;
            }
            boolean updated = wrkAnalysisService.completeInboundStationRun(wrkMast, new Date());
src/main/java/com/zy/core/thread/StationThread.java
@@ -14,6 +14,10 @@
    Map<Integer, StationProtocol> getStatusMap();
    default boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        return false;
    }
    StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize);
    default StationCommand getCommand(StationCommandType commandType,
src/main/java/com/zy/core/thread/impl/ZyStationThread.java
@@ -8,6 +8,7 @@
import com.zy.asrs.utils.Utils;
import com.zy.core.network.DeviceConnectPool;
import com.zy.core.thread.StationThread;
import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.alibaba.fastjson.JSON;
import com.core.common.DateUtils;
import com.core.common.SpringUtils;
@@ -47,6 +48,7 @@
    private ZyStationConnectDriver zyStationConnectDriver;
    private int deviceLogCollectTime = 200;
    private long deviceDataLogTime = System.currentTimeMillis();
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    public ZyStationThread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
@@ -142,6 +144,7 @@
                    stationProtocol.setRunBlock(statusEntity.isRunBlock());
                    stationProtocol.setEnableIn(statusEntity.isEnableIn());
                    stationProtocol.setWeight(statusEntity.getWeight());
                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                }
                if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -192,6 +195,11 @@
    }
    @Override
    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
    }
    @Override
    public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
        StationCommand stationCommand = new StationCommand();
        stationCommand.setTaskNo(taskNo);
src/main/java/com/zy/core/thread/impl/ZyStationV3Thread.java
@@ -30,6 +30,7 @@
import com.zy.core.network.DeviceConnectPool;
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -54,6 +55,7 @@
    private int deviceLogCollectTime = 200;
    private long deviceDataLogTime = System.currentTimeMillis();
    private ExecutorService executor = Executors.newFixedThreadPool(9999);
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    public ZyStationV3Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
@@ -148,6 +150,7 @@
                    stationProtocol.setRunBlock(statusEntity.isRunBlock());
                    stationProtocol.setEnableIn(statusEntity.isEnableIn());
                    stationProtocol.setWeight(statusEntity.getWeight());
                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                }
                if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -208,6 +211,11 @@
    }
    @Override
    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
    }
    @Override
    public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
        return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null);
    }
src/main/java/com/zy/core/thread/impl/ZyStationV4Thread.java
@@ -31,6 +31,7 @@
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.thread.impl.v5.StationMoveSegmentExecutor;
import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -57,6 +58,7 @@
    private boolean initStatus = false;
    private long deviceDataLogTime = System.currentTimeMillis();
    private ExecutorService executor = Executors.newFixedThreadPool(9999);
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    public ZyStationV4Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
@@ -157,6 +159,7 @@
                    stationProtocol.setWeight(statusEntity.getWeight());
                    stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
                    stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                }
                if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -217,6 +220,11 @@
    }
    @Override
    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
    }
    @Override
    public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
        return getCommand(commandType, taskNo, stationId, targetStationId, palletSize, null);
    }
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -34,6 +34,7 @@
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.service.StationTaskLoopService;
import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.trace.StationTaskTraceRegistry;
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import lombok.Data;
@@ -73,6 +74,7 @@
    private long deviceDataLogTime = System.currentTimeMillis();
    private ExecutorService executor = Executors.newFixedThreadPool(9999);
    private StationV5SegmentExecutor segmentExecutor;
    private final RecentStationArrivalTracker recentArrivalTracker = new RecentStationArrivalTracker();
    public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
        this.deviceConfig = deviceConfig;
@@ -173,6 +175,7 @@
                    stationProtocol.setWeight(statusEntity.getWeight());
                    stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
                    stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
                    recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
                }
                if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
@@ -235,6 +238,11 @@
    }
    @Override
    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        return recentArrivalTracker.hasRecentArrival(stationId, taskNo);
    }
    @Override
    public StationCommand getCommand(StationCommandType commandType,
                                     Integer taskNo,
                                     Integer stationId,
src/main/java/com/zy/core/thread/support/RecentStationArrivalTracker.java
New file
@@ -0,0 +1,82 @@
package com.zy.core.thread.support;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
public class RecentStationArrivalTracker {
    static final long DEFAULT_RETAIN_MS = 10_000L;
    private final long retainMs;
    private final LongSupplier nowSupplier;
    private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>();
    private final AtomicLong lastCleanupAt = new AtomicLong(0L);
    public RecentStationArrivalTracker() {
        this(DEFAULT_RETAIN_MS, System::currentTimeMillis);
    }
    public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) {
        this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs;
        this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier;
    }
    public void observe(Integer stationId, Integer taskNo, boolean loading) {
        if (!loading) {
            return;
        }
        record(stationId, taskNo);
    }
    public void record(Integer stationId, Integer taskNo) {
        Long key = buildKey(stationId, taskNo);
        if (key == null) {
            return;
        }
        long now = nowSupplier.getAsLong();
        arrivalTimeMap.put(key, now);
        cleanupIfNeeded(now);
    }
    public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
        Long key = buildKey(stationId, taskNo);
        if (key == null) {
            return false;
        }
        long now = nowSupplier.getAsLong();
        Long arrivalAt = arrivalTimeMap.get(key);
        if (arrivalAt == null) {
            cleanupIfNeeded(now);
            return false;
        }
        if (now - arrivalAt > retainMs) {
            arrivalTimeMap.remove(key, arrivalAt);
            cleanupIfNeeded(now);
            return false;
        }
        cleanupIfNeeded(now);
        return true;
    }
    private void cleanupIfNeeded(long now) {
        long lastCleanup = lastCleanupAt.get();
        if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) {
            return;
        }
        for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) {
            if (entry == null || now - entry.getValue() <= retainMs) {
                continue;
            }
            arrivalTimeMap.remove(entry.getKey(), entry.getValue());
        }
    }
    private Long buildKey(Integer stationId, Integer taskNo) {
        if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) {
            return null;
        }
        return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL);
    }
}
src/test/java/com/zy/asrs/task/WrkAnalysisStationArrivalScannerTest.java
New file
@@ -0,0 +1,143 @@
package com.zy.asrs.task;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zy.asrs.entity.BasStation;
import com.zy.asrs.entity.WrkMast;
import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.WrkAnalysisService;
import com.zy.asrs.service.WrkMastService;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.enums.SlaveType;
import com.zy.core.enums.StationCommandType;
import com.zy.core.enums.WrkStsType;
import com.zy.core.model.CommandResponse;
import com.zy.core.model.command.StationCommand;
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.StationOperateProcessUtils;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class WrkAnalysisStationArrivalScannerTest {
    @Test
    void scanInboundStationArrival_completesTaskWhenRecentArrivalWasObserved() {
        WrkMastService wrkMastService = mock(WrkMastService.class);
        BasStationService basStationService = mock(BasStationService.class);
        WrkAnalysisService wrkAnalysisService = mock(WrkAnalysisService.class);
        StationOperateProcessUtils stationOperateProcessUtils = mock(StationOperateProcessUtils.class);
        WrkAnalysisStationArrivalScanner scanner = new WrkAnalysisStationArrivalScanner(
                wrkMastService,
                basStationService,
                wrkAnalysisService,
                stationOperateProcessUtils
        );
        WrkMast wrkMast = new WrkMast();
        wrkMast.setWrkNo(1001);
        wrkMast.setIoType(1);
        wrkMast.setWrkSts(WrkStsType.INBOUND_STATION_RUN.sts);
        wrkMast.setStaNo(12);
        BasStation basStation = new BasStation();
        basStation.setStationId(12);
        basStation.setDeviceNo(3);
        when(wrkMastService.list(any(QueryWrapper.class))).thenReturn(List.of(wrkMast));
        when(basStationService.getOne(any())).thenReturn(basStation);
        when(wrkAnalysisService.completeInboundStationRun(any(WrkMast.class), any(Date.class))).thenReturn(true);
        ArrivalAwareStationThread stationThread = new ArrivalAwareStationThread(true);
        StationProtocol stationProtocol = new StationProtocol();
        stationProtocol.setStationId(12);
        stationProtocol.setTaskNo(0);
        stationProtocol.setLoading(false);
        stationThread.putStatus(stationProtocol);
        SlaveConnection.put(SlaveType.Devp, 3, stationThread);
        try {
            scanner.scanInboundStationArrival();
        } finally {
            SlaveConnection.remove(SlaveType.Devp, 3);
        }
        verify(wrkAnalysisService).completeInboundStationRun(any(WrkMast.class), any(Date.class));
    }
    private static class ArrivalAwareStationThread implements StationThread {
        private final boolean recentArrival;
        private final Map<Integer, StationProtocol> statusMap = new HashMap<>();
        private ArrivalAwareStationThread(boolean recentArrival) {
            this.recentArrival = recentArrival;
        }
        private void putStatus(StationProtocol stationProtocol) {
            statusMap.put(stationProtocol.getStationId(), stationProtocol);
        }
        @Override
        public List<StationProtocol> getStatus() {
            return Collections.emptyList();
        }
        @Override
        public Map<Integer, StationProtocol> getStatusMap() {
            return statusMap;
        }
        public boolean hasRecentArrival(Integer stationId, Integer taskNo) {
            return recentArrival;
        }
        @Override
        public StationCommand getCommand(StationCommandType commandType, Integer taskNo, Integer stationId, Integer targetStationId, Integer palletSize) {
            return null;
        }
        @Override
        public boolean clearPath(Integer taskNo) {
            return false;
        }
        @Override
        public CommandResponse sendCommand(StationCommand command) {
            return null;
        }
        @Override
        public CommandResponse sendOriginCommand(String address, short[] data) {
            return null;
        }
        @Override
        public byte[] readOriginCommand(String address, int length) {
            return new byte[0];
        }
        @Override
        public void run() {
        }
        @Override
        public boolean connect() {
            return true;
        }
        @Override
        public void close() {
        }
    }
}
src/test/java/com/zy/core/thread/support/RecentStationArrivalTrackerTest.java
New file
@@ -0,0 +1,25 @@
package com.zy.core.thread.support;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class RecentStationArrivalTrackerTest {
    @Test
    void hasRecentArrival_returnsTrueOnlyWithinRetentionWindow() {
        AtomicLong now = new AtomicLong(1_000L);
        RecentStationArrivalTracker tracker = new RecentStationArrivalTracker(1_000L, now::get);
        tracker.record(12, 1001);
        assertTrue(tracker.hasRecentArrival(12, 1001));
        now.addAndGet(1_001L);
        assertFalse(tracker.hasRecentArrival(12, 1001));
    }
}