From b8640dc78123f4be2483feed2f48b9183983f51f Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期一, 13 四月 2026 14:11:54 +0800
Subject: [PATCH] #站点运行优化
---
src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java | 160 ++++++----
src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java | 34 ++
src/main/java/com/zy/core/thread/support/StationTaskLocationRegistry.java | 158 ++++++++++
src/main/java/com/zy/core/thread/impl/v5/StationV5RuntimeConfigProvider.java | 216 ++++++++++++++
src/main/java/com/zy/common/utils/RedisUtil.java | 41 ++
src/main/java/com/zy/core/network/ZyStationConnectDriver.java | 45 ++
src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java | 39 ++
src/main/java/com/zy/core/task/BasStationOptAsyncPublisher.java | 60 ++++
src/main/java/com/zy/core/enums/RedisKeyType.java | 1
src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java | 115 +++++--
src/main/java/com/zy/core/ServerBootstrap.java | 4
src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java | 7
12 files changed, 762 insertions(+), 118 deletions(-)
diff --git a/src/main/java/com/zy/common/utils/RedisUtil.java b/src/main/java/com/zy/common/utils/RedisUtil.java
index a51936c..2c27c76 100644
--- a/src/main/java/com/zy/common/utils/RedisUtil.java
+++ b/src/main/java/com/zy/common/utils/RedisUtil.java
@@ -4,9 +4,11 @@
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -209,6 +211,44 @@
}
redisTemplate.execute((RedisCallback<Void>) connection -> null);
return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public boolean trySetStringIfAbsent(String key, String value, long timeSeconds) {
+ if (key == null || value == null) {
+ return false;
+ }
+ try {
+ Boolean result;
+ if (timeSeconds > 0) {
+ result = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeSeconds, TimeUnit.SECONDS);
+ } else {
+ result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
+ }
+ return Boolean.TRUE.equals(result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public boolean compareAndDelete(String key, String expectedValue) {
+ if (key == null || expectedValue == null) {
+ return false;
+ }
+ try {
+ DefaultRedisScript<Long> script = new DefaultRedisScript<>();
+ script.setScriptText(
+ "if redis.call('get', KEYS[1]) == ARGV[1] then " +
+ "return redis.call('del', KEYS[1]) " +
+ "else return 0 end"
+ );
+ script.setResultType(Long.class);
+ Long result = stringRedisTemplate.execute(script, Collections.singletonList(key), expectedValue);
+ return result != null && result > 0;
} catch (Exception e) {
e.printStackTrace();
return false;
@@ -705,4 +745,3 @@
}
-
diff --git a/src/main/java/com/zy/core/ServerBootstrap.java b/src/main/java/com/zy/core/ServerBootstrap.java
index 4522b3d..8e4a67e 100644
--- a/src/main/java/com/zy/core/ServerBootstrap.java
+++ b/src/main/java/com/zy/core/ServerBootstrap.java
@@ -47,6 +47,10 @@
private void clearStartupRuntimeLocks() {
redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
+ java.util.Set<String> stationSendLockKeys = redisUtil.scanKeys(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key + ":", 2048);
+ if (stationSendLockKeys != null && !stationSendLockKeys.isEmpty()) {
+ redisUtil.del(stationSendLockKeys.toArray(new String[0]));
+ }
// News.info("绯荤粺鍚姩鏃跺凡娓呯悊杈撻�佺珯鍛戒护鎵ц閿侊紝key={}", RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
}
diff --git a/src/main/java/com/zy/core/enums/RedisKeyType.java b/src/main/java/com/zy/core/enums/RedisKeyType.java
index 37609ce..d8aa37f 100644
--- a/src/main/java/com/zy/core/enums/RedisKeyType.java
+++ b/src/main/java/com/zy/core/enums/RedisKeyType.java
@@ -48,6 +48,7 @@
STATION_OUT_EXECUTE_LIMIT("station_out_execute_limit_"),
STATION_OUT_PENDING_DISPATCH_("station_out_pending_dispatch_"),
STATION_OUT_ORDER_DISPATCH_LIMIT_("station_out_order_dispatch_limit_"),
+ STATION_RUN_BLOCK_DIRECT_REASSIGN_LIMIT_("station_run_block_direct_reassign_limit_"),
STATION_OUT_EXECUTE_COMPLETE_LIMIT("station_out_execute_complete_limit_"),
CHECK_STATION_RUN_BLOCK_LIMIT_("check_station_run_block_limit_"),
STATION_RUN_BLOCK_REROUTE_STATE_("station_run_block_reroute_state_"),
diff --git a/src/main/java/com/zy/core/network/ZyStationConnectDriver.java b/src/main/java/com/zy/core/network/ZyStationConnectDriver.java
index e2fac59..c6c86f2 100644
--- a/src/main/java/com/zy/core/network/ZyStationConnectDriver.java
+++ b/src/main/java/com/zy/core/network/ZyStationConnectDriver.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.UUID;
/**
* 杈撻�佺珯杩炴帴椹卞姩
@@ -29,8 +30,10 @@
private static final ZyStationFakeSegConnect zyStationFakeSegConnect = new ZyStationFakeSegConnect();
private static final ZyStationV4FakeSegConnect zyStationV4FakeSegConnect = new ZyStationV4FakeSegConnect();
- private static final long SEND_LOCK_WARN_MS = 3_000L;
+ private static final long SEND_LOCK_WARN_MS = 500L;
private static final long SEND_COST_WARN_MS = 5_000L;
+ private static final long SEND_LOCK_POLL_MS = 20L;
+ private static final long SEND_LOCK_EXPIRE_SECONDS = 15L;
private volatile boolean connected = false;
private volatile boolean connecting = false;
@@ -162,37 +165,39 @@
if (!connected || connecting || connectApi == null) {
return new CommandResponse(false, "璁惧鏈繛鎺ワ紝鍛戒护涓嬪彂澶辫触");
}
+ String lockKey = buildStationExecuteLockKey();
+ String lockToken = UUID.randomUUID().toString();
long lockWaitStart = System.currentTimeMillis();
int waitRounds = 0;
- while (true) {
- Object lock = redisUtil.get(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
- if(lock != null) {
+ if (redisUtil != null) {
+ while (true) {
+ if (redisUtil.trySetStringIfAbsent(lockKey, lockToken, SEND_LOCK_EXPIRE_SECONDS)) {
+ break;
+ }
waitRounds++;
try {
- Thread.sleep(500);
- }catch (Exception e) {
+ Thread.sleep(SEND_LOCK_POLL_MS);
+ } catch (Exception e) {
e.printStackTrace();
}
- }else {
- redisUtil.set(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key, "lock", 60 * 5);
- break;
}
}
long lockWaitCost = System.currentTimeMillis() - lockWaitStart;
if (lockWaitCost >= SEND_LOCK_WARN_MS) {
- log.warn("杈撻�佸懡浠ょ瓑寰呭叏灞�鍙戦�侀攣瓒呮椂锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}",
+ log.warn("杈撻�佸懡浠ょ瓑寰呰澶囧彂閫侀攣瓒呮椂锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}, lockKey={}",
deviceConfig == null ? null : deviceConfig.getDeviceNo(),
command == null ? null : command.getTaskNo(),
command == null ? null : command.getStationId(),
command == null ? null : command.getTargetStaNo(),
lockWaitCost,
- waitRounds);
+ waitRounds,
+ lockKey);
}
long sendStart = System.currentTimeMillis();
try {
return connectApi.sendCommand(deviceConfig.getDeviceNo(), command);
} finally {
- redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key);
+ releaseDeviceSendLock(lockKey, lockToken);
long sendCostMs = System.currentTimeMillis() - sendStart;
if (sendCostMs >= SEND_COST_WARN_MS) {
log.warn("杈撻�佸懡浠ゅ簳灞傚彂閫佽�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, sendCostMs={}",
@@ -205,6 +210,22 @@
}
}
+ private String buildStationExecuteLockKey() {
+ Integer deviceNo = deviceConfig == null ? null : deviceConfig.getDeviceNo();
+ return RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key + ":" + deviceNo;
+ }
+
+ private void releaseDeviceSendLock(String lockKey, String lockToken) {
+ if (redisUtil == null || lockKey == null) {
+ return;
+ }
+ try {
+ redisUtil.compareAndDelete(lockKey, lockToken);
+ } catch (Exception e) {
+ log.warn("閲婃斁杈撻�佽澶囧彂閫侀攣澶辫触锛宭ockKey={}", lockKey, e);
+ }
+ }
+
public CommandResponse sendOriginCommand(String address, short[] data) {
ZyStationConnectApi connectApi = zyStationConnectApi;
if (!connected || connecting || connectApi == null) {
diff --git a/src/main/java/com/zy/core/task/BasStationOptAsyncPublisher.java b/src/main/java/com/zy/core/task/BasStationOptAsyncPublisher.java
new file mode 100644
index 0000000..d6fcc8c
--- /dev/null
+++ b/src/main/java/com/zy/core/task/BasStationOptAsyncPublisher.java
@@ -0,0 +1,60 @@
+package com.zy.core.task;
+
+import com.zy.asrs.entity.BasStationOpt;
+import com.zy.asrs.service.BasStationOptService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+@Slf4j
+@Component
+public class BasStationOptAsyncPublisher {
+
+ private static final String LANE_NAME = "bas-station-opt-save";
+ private static final String TASK_NAME = "publish-bas-station-opt";
+ private static final long MIN_INTERVAL_MS = 0L;
+ private static final int DEFAULT_QUEUE_CAPACITY = 2048;
+
+ private final ArrayBlockingQueue<BasStationOpt> pendingQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ @Autowired
+ private MainProcessTaskSubmitter mainProcessTaskSubmitter;
+ @Autowired
+ private BasStationOptService basStationOptService;
+
+ public boolean publish(BasStationOpt basStationOpt) {
+ if (basStationOpt == null) {
+ return true;
+ }
+ if (!pendingQueue.offer(basStationOpt)) {
+ log.error("BasStationOpt async publish queue full, fallback to sync save, taskNo={}, stationId={}, targetStationId={}",
+ basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId());
+ return false;
+ }
+ mainProcessTaskSubmitter.submitSerialTask(LANE_NAME, TASK_NAME, MIN_INTERVAL_MS, this::drain);
+ return true;
+ }
+
+ private void drain() {
+ while (true) {
+ BasStationOpt basStationOpt = pendingQueue.poll();
+ if (basStationOpt == null) {
+ return;
+ }
+ try {
+ basStationOptService.save(basStationOpt);
+ } catch (Exception e) {
+ log.error("BasStationOpt async publish error, fallback to sync save next time, taskNo={}, stationId={}, targetStationId={}",
+ basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), e);
+ try {
+ basStationOptService.save(basStationOpt);
+ } catch (Exception ex) {
+ log.error("BasStationOpt sync fallback save error, taskNo={}, stationId={}, targetStationId={}",
+ basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), ex);
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
index 3e83c85..d563265 100644
--- a/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
+++ b/src/main/java/com/zy/core/thread/impl/ZyStationV5Thread.java
@@ -21,6 +21,8 @@
import com.zy.core.network.ZyStationConnectDriver;
import com.zy.core.network.entity.ZyStationStatusEntity;
import com.zy.core.service.StationTaskLoopService;
+import com.zy.core.task.BasStationOptAsyncPublisher;
+import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner;
import com.zy.core.thread.impl.v5.StationV5SegmentExecutor;
import com.zy.core.thread.impl.v5.StationV5StatusReader;
@@ -34,39 +36,54 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread {
private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
- private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
+ private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20;
private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48;
private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L;
+ private static final int QUEUE_DRAIN_BATCH_SIZE = 32;
+ private static final long QUEUE_IDLE_SLEEP_MS = 20L;
private DeviceConfig deviceConfig;
private RedisUtil redisUtil;
private ZyStationConnectDriver zyStationConnectDriver;
- private final ExecutorService executor;
+ private final ThreadPoolExecutor executor;
private StationV5SegmentExecutor segmentExecutor;
private final RecentStationArrivalTracker recentArrivalTracker;
private final StationV5StatusReader statusReader;
private final StationV5RunBlockReroutePlanner runBlockReroutePlanner;
+ private final int executorQueueCapacity;
+ private final BasStationOptAsyncPublisher basStationOptAsyncPublisher;
public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) {
this.deviceConfig = deviceConfig;
this.redisUtil = redisUtil;
- int poolSize = resolveSegmentExecutorPoolSize(redisUtil);
- this.executor = Executors.newFixedThreadPool(poolSize);
+ StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
+ int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize();
+ this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity();
+ this.executor = new ThreadPoolExecutor(
+ poolSize,
+ poolSize,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(executorQueueCapacity)
+ );
this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil);
this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand);
this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker);
this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil);
- log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize);
+ this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class);
+ log.info("鍒濆鍖朧5杈撻�佺嚎绋嬫睜锛宒eviceNo={}, poolSize={}, queueCapacity={}",
+ deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity);
}
@Override
@@ -89,8 +106,10 @@
Thread processThread = new Thread(() -> {
while (true) {
try {
- pollAndDispatchQueuedCommand();
- Thread.sleep(100);
+ int dispatchedCount = pollAndDispatchQueuedCommandBatch();
+ if (dispatchedCount <= 0) {
+ Thread.sleep(QUEUE_IDLE_SLEEP_MS);
+ }
} catch (Exception e) {
log.error("StationV5Process Fail", e);
}
@@ -139,38 +158,71 @@
return statusReader.getTaskNoList();
}
- private void pollAndDispatchQueuedCommand() {
- Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
- if (task == null || task.getStep() == null || task.getStep() != 2) {
- return;
+ private int pollAndDispatchQueuedCommandBatch() {
+ int dispatchedCount = 0;
+ while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) {
+ if (isExecutorQueueAtWatermark()) {
+ logExecutorAbnormal("executor-watermark", null);
+ break;
+ }
+ Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo());
+ if (task == null || task.getStep() == null || task.getStep() != 2) {
+ break;
+ }
+ StationCommand command = (StationCommand) task.getData();
+ logExecutorAbnormal("queue-peek", command);
+ if (!submitSegmentCommand(command)) {
+ logExecutorAbnormal("submit-rejected", command);
+ break;
+ }
+ MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo());
+ dispatchedCount++;
}
- StationCommand command = (StationCommand) task.getData();
- logExecutorAbnormal("queue-poll", command);
- submitSegmentCommand(command);
+ return dispatchedCount;
}
- private void submitSegmentCommand(StationCommand command) {
+ private boolean submitSegmentCommand(StationCommand command) {
if (command == null || executor == null || segmentExecutor == null) {
- return;
+ return false;
}
- executor.submit(() -> {
- long start = System.currentTimeMillis();
- try {
- segmentExecutor.execute(command);
- } finally {
- long costMs = System.currentTimeMillis() - start;
- if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
- log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
- deviceConfig.getDeviceNo(),
- command.getTaskNo(),
- command.getStationId(),
- command.getTargetStaNo(),
- costMs);
- logExecutorAbnormal("segment-finish-slow", command);
+ try {
+ executor.execute(() -> {
+ long start = System.currentTimeMillis();
+ try {
+ segmentExecutor.execute(command);
+ } finally {
+ long costMs = System.currentTimeMillis() - start;
+ if (costMs >= SEGMENT_EXECUTE_WARN_MS) {
+ log.warn("V5杈撻�佸懡浠ゅ垎娈垫墽琛岃�楁椂杩囬暱锛宒eviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}",
+ deviceConfig.getDeviceNo(),
+ command.getTaskNo(),
+ command.getStationId(),
+ command.getTargetStaNo(),
+ costMs);
+ logExecutorAbnormal("segment-finish-slow", command);
+ }
}
- }
- });
+ });
+ } catch (RejectedExecutionException e) {
+ log.error("V5杈撻�佺嚎绋嬫睜鎷掔粷鎵ц锛屼繚鐣欒澶囬槦鍒楃Н鍘嬶紝deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}",
+ deviceConfig.getDeviceNo(),
+ command.getTaskNo(),
+ command.getStationId(),
+ command.getTargetStaNo(),
+ executor.getActiveCount(),
+ executor.getQueue() == null ? 0 : executor.getQueue().size(),
+ executorQueueCapacity);
+ return false;
+ }
logExecutorAbnormal("after-submit", command);
+ return true;
+ }
+
+ private boolean isExecutorQueueAtWatermark() {
+ if (executor == null || executor.getQueue() == null) {
+ return false;
+ }
+ return executor.getQueue().size() >= executorQueueCapacity;
}
@Override
@@ -408,18 +460,19 @@
commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0,
JSON.toJSONString(commandResponse)
);
- optService.save(basStationOpt);
+ if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) {
+ optService.save(basStationOpt);
+ }
}
return commandResponse;
}
private void logExecutorAbnormal(String scene, StationCommand command) {
- if (!(executor instanceof ThreadPoolExecutor)) {
+ if (executor == null) {
return;
}
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
- int activeCount = threadPoolExecutor.getActiveCount();
- int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size();
+ int activeCount = executor.getActiveCount();
+ int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size();
if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) {
return;
}
@@ -429,35 +482,10 @@
command == null ? null : command.getTaskNo(),
command == null ? null : command.getStationId(),
command == null ? null : command.getTargetStaNo(),
- threadPoolExecutor.getPoolSize(),
+ executor.getPoolSize(),
activeCount,
queuedCount,
- threadPoolExecutor.getCompletedTaskCount());
- }
-
- @SuppressWarnings("unchecked")
- private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) {
- if (redisUtil == null) {
- return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
- }
- try {
- Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key);
- if (!(systemConfigMapObj instanceof HashMap)) {
- return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
- }
- HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
- String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE);
- if (poolSizeText == null || poolSizeText.trim().isEmpty()) {
- return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
- }
- int configured = Integer.parseInt(poolSizeText.trim());
- if (configured < 16) {
- return 16;
- }
- return Math.min(configured, 512);
- } catch (Exception ignore) {
- return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE;
- }
+ executor.getCompletedTaskCount());
}
@Override
diff --git a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
index 95209a6..7d81825 100644
--- a/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
+++ b/src/main/java/com/zy/core/thread/impl/station/StationSegmentExecutor.java
@@ -15,6 +15,8 @@
import com.zy.core.model.protocol.StationProtocol;
import com.zy.core.move.StationMoveCoordinator;
import com.zy.core.trace.StationTaskTraceRegistry;
+import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider;
+import com.zy.core.thread.support.StationTaskLocationRegistry;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -29,6 +31,7 @@
private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d;
private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L;
+ private static final long TASK_LOCATION_STALE_MS = 2_000L;
private final DeviceConfig deviceConfig;
private final RedisUtil redisUtil;
@@ -310,6 +313,13 @@
}
private double loadSegmentAdvanceRatio() {
+ if (isV5ThreadImpl()) {
+ StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class);
+ if (configProvider != null) {
+ return configProvider.getSegmentAdvanceRatio();
+ }
+ return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
+ }
try {
ConfigService configService = SpringUtils.getBean(ConfigService.class);
if (configService == null) {
@@ -363,6 +373,9 @@
}
private StationProtocol findCurrentStationByTask(Integer taskNo) {
+ if (isV5ThreadImpl()) {
+ return findCurrentStationByTaskFromRegistry(taskNo);
+ }
try {
com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class);
if (deviceConfigService == null) {
@@ -391,6 +404,27 @@
return null;
}
+ private StationProtocol findCurrentStationByTaskFromRegistry(Integer taskNo) {
+ StationTaskLocationRegistry registry = SpringUtils.getBean(StationTaskLocationRegistry.class);
+ if (registry == null) {
+ return null;
+ }
+ StationTaskLocationRegistry.TaskLocationSnapshot snapshot = registry.findActive(taskNo, TASK_LOCATION_STALE_MS);
+ if (snapshot == null || !snapshot.isLoading()) {
+ return null;
+ }
+ StationProtocol stationProtocol = new StationProtocol();
+ stationProtocol.setTaskNo(snapshot.getTaskNo());
+ stationProtocol.setStationId(snapshot.getStationId());
+ stationProtocol.setRunBlock(snapshot.isRunBlock());
+ stationProtocol.setLoading(true);
+ return stationProtocol;
+ }
+
+ private boolean isV5ThreadImpl() {
+ return deviceConfig != null && "ZyStationV5Thread".equals(deviceConfig.getThreadImpl());
+ }
+
private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) {
List<StationTaskTraceSegmentVo> result = new ArrayList<>();
if (segmentCommands == null) {
diff --git a/src/main/java/com/zy/core/thread/impl/v5/StationV5RuntimeConfigProvider.java b/src/main/java/com/zy/core/thread/impl/v5/StationV5RuntimeConfigProvider.java
new file mode 100644
index 0000000..f36129f
--- /dev/null
+++ b/src/main/java/com/zy/core/thread/impl/v5/StationV5RuntimeConfigProvider.java
@@ -0,0 +1,216 @@
+package com.zy.core.thread.impl.v5;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.core.common.Cools;
+import com.zy.common.utils.RedisUtil;
+import com.zy.core.enums.RedisKeyType;
+import com.zy.system.entity.Config;
+import com.zy.system.service.ConfigService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+
+@Slf4j
+@Component
+public class StationV5RuntimeConfigProvider {
+
+ private static final String CFG_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
+ private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
+ private static final String CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY = "stationV5SegmentExecutorQueueCapacity";
+ private static final String CFG_CONFIG_REFRESH_SECONDS = "stationCommandConfigRefreshSeconds";
+
+ private static final double DEFAULT_SEGMENT_ADVANCE_RATIO = 0.3d;
+ private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
+ private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
+ private static final int DEFAULT_CONFIG_REFRESH_SECONDS = 30;
+
+ @Autowired(required = false)
+ private RedisUtil redisUtil;
+ @Autowired(required = false)
+ private ConfigService configService;
+
+ private volatile CacheSnapshot cacheSnapshot = new CacheSnapshot(
+ DEFAULT_SEGMENT_ADVANCE_RATIO,
+ DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE,
+ DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY,
+ DEFAULT_CONFIG_REFRESH_SECONDS,
+ 0L
+ );
+
+ public double getSegmentAdvanceRatio() {
+ return loadSnapshot().segmentAdvanceRatio;
+ }
+
+ public int getSegmentExecutorPoolSize() {
+ return loadSnapshot().segmentExecutorPoolSize;
+ }
+
+ public int getSegmentExecutorQueueCapacity() {
+ return loadSnapshot().segmentExecutorQueueCapacity;
+ }
+
+ private CacheSnapshot loadSnapshot() {
+ CacheSnapshot snapshot = cacheSnapshot;
+ long now = System.currentTimeMillis();
+ if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) {
+ return snapshot;
+ }
+ synchronized (this) {
+ snapshot = cacheSnapshot;
+ now = System.currentTimeMillis();
+ if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) {
+ return snapshot;
+ }
+ CacheSnapshot refreshed = refreshSnapshot(now);
+ cacheSnapshot = refreshed;
+ return refreshed;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private CacheSnapshot refreshSnapshot(long now) {
+ HashMap<String, String> systemConfigMap = null;
+ try {
+ Object systemConfigMapObj = redisUtil == null ? null : redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
+ if (systemConfigMapObj instanceof HashMap) {
+ systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
+ }
+ } catch (Exception e) {
+ log.warn("鍔犺浇 V5 杈撻�佽繍琛屾椂閰嶇疆缂撳瓨澶辫触锛宖allback to db. reason=redis-read-error", e);
+ }
+
+ int refreshSeconds = normalizeRefreshSeconds(readConfigText(systemConfigMap, CFG_CONFIG_REFRESH_SECONDS));
+ double segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigText(systemConfigMap, CFG_SEGMENT_ADVANCE_RATIO));
+ int poolSize = normalizePoolSize(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_POOL_SIZE));
+ int queueCapacity = normalizeQueueCapacity(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY));
+
+ if (systemConfigMap == null) {
+ segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigTextFromDb(CFG_SEGMENT_ADVANCE_RATIO), segmentAdvanceRatio);
+ poolSize = normalizePoolSize(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_POOL_SIZE), poolSize);
+ queueCapacity = normalizeQueueCapacity(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY), queueCapacity);
+ refreshSeconds = normalizeRefreshSeconds(readConfigTextFromDb(CFG_CONFIG_REFRESH_SECONDS), refreshSeconds);
+ }
+ return new CacheSnapshot(segmentAdvanceRatio, poolSize, queueCapacity, refreshSeconds, now);
+ }
+
+ private String readConfigText(HashMap<String, String> systemConfigMap, String code) {
+ if (systemConfigMap == null || code == null) {
+ return null;
+ }
+ return systemConfigMap.get(code);
+ }
+
+ private String readConfigTextFromDb(String code) {
+ if (configService == null || code == null) {
+ return null;
+ }
+ try {
+ Config config = configService.getOne(new QueryWrapper<Config>().eq("code", code));
+ return config == null ? null : config.getValue();
+ } catch (Exception e) {
+ log.warn("鍔犺浇 V5 杈撻�佽繍琛屾椂閰嶇疆鏁版嵁搴撳け璐ワ紝code={}", code, e);
+ return null;
+ }
+ }
+
+ private double normalizeSegmentAdvanceRatio(String valueText) {
+ return normalizeSegmentAdvanceRatio(valueText, DEFAULT_SEGMENT_ADVANCE_RATIO);
+ }
+
+ private double normalizeSegmentAdvanceRatio(String valueText, double defaultValue) {
+ if (valueText == null) {
+ return defaultValue;
+ }
+ String text = valueText.trim();
+ if (text.isEmpty()) {
+ return defaultValue;
+ }
+ if (text.endsWith("%")) {
+ text = text.substring(0, text.length() - 1).trim();
+ }
+ try {
+ double ratio = Double.parseDouble(text);
+ if (ratio > 1d && ratio <= 100d) {
+ ratio = ratio / 100d;
+ }
+ if (ratio < 0d) {
+ return 0d;
+ }
+ if (ratio > 1d) {
+ return 1d;
+ }
+ return ratio;
+ } catch (Exception ignore) {
+ return defaultValue;
+ }
+ }
+
+ private int normalizePoolSize(String valueText) {
+ return normalizePoolSize(valueText, DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE);
+ }
+
+ private int normalizePoolSize(String valueText, int defaultValue) {
+ int configured = parsePositiveInt(valueText, defaultValue);
+ if (configured < 16) {
+ return 16;
+ }
+ return Math.min(configured, 512);
+ }
+
+ private int normalizeQueueCapacity(String valueText) {
+ return normalizeQueueCapacity(valueText, DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY);
+ }
+
+ private int normalizeQueueCapacity(String valueText, int defaultValue) {
+ int configured = parsePositiveInt(valueText, defaultValue);
+ if (configured < 64) {
+ return 64;
+ }
+ return Math.min(configured, 4096);
+ }
+
+ private int normalizeRefreshSeconds(String valueText) {
+ return normalizeRefreshSeconds(valueText, DEFAULT_CONFIG_REFRESH_SECONDS);
+ }
+
+ private int normalizeRefreshSeconds(String valueText, int defaultValue) {
+ int configured = parsePositiveInt(valueText, defaultValue);
+ if (configured < 5) {
+ return 5;
+ }
+ return Math.min(configured, 300);
+ }
+
+ private int parsePositiveInt(String valueText, int defaultValue) {
+ if (Cools.isEmpty(valueText)) {
+ return defaultValue;
+ }
+ try {
+ return Integer.parseInt(valueText.trim());
+ } catch (Exception ignore) {
+ return defaultValue;
+ }
+ }
+
+ private static class CacheSnapshot {
+ private final double segmentAdvanceRatio;
+ private final int segmentExecutorPoolSize;
+ private final int segmentExecutorQueueCapacity;
+ private final int refreshSeconds;
+ private final long loadedAtMs;
+
+ private CacheSnapshot(double segmentAdvanceRatio,
+ int segmentExecutorPoolSize,
+ int segmentExecutorQueueCapacity,
+ int refreshSeconds,
+ long loadedAtMs) {
+ this.segmentAdvanceRatio = segmentAdvanceRatio;
+ this.segmentExecutorPoolSize = segmentExecutorPoolSize;
+ this.segmentExecutorQueueCapacity = segmentExecutorQueueCapacity;
+ this.refreshSeconds = refreshSeconds;
+ this.loadedAtMs = loadedAtMs;
+ }
+ }
+}
diff --git a/src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java b/src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java
index a0e7e35..05fa8d2 100644
--- a/src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java
+++ b/src/main/java/com/zy/core/thread/impl/v5/StationV5StatusReader.java
@@ -20,6 +20,7 @@
import com.zy.core.task.DeviceAsyncLogPublisher;
import com.zy.core.thread.support.RecentStationArrivalTracker;
import com.zy.core.thread.support.StationErrLogSupport;
+import com.zy.core.thread.support.StationTaskLocationRegistry;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
public class StationV5StatusReader {
@@ -34,7 +36,9 @@
private final RedisUtil redisUtil;
private final RecentStationArrivalTracker recentArrivalTracker;
private final DeviceAsyncLogPublisher devpAsyncLogPublisher;
+ private final StationTaskLocationRegistry stationTaskLocationRegistry;
private final List<StationProtocol> statusList = new ArrayList<>();
+ private final Map<Integer, StationProtocol> statusMap = new HashMap<>();
private volatile List<Integer> taskNoList = new ArrayList<>();
private boolean initialized = false;
private long deviceDataLogTime = System.currentTimeMillis();
@@ -46,6 +50,7 @@
this.redisUtil = redisUtil;
this.recentArrivalTracker = recentArrivalTracker;
this.devpAsyncLogPublisher = SpringUtils.getBean(DeviceAsyncLogPublisher.class);
+ this.stationTaskLocationRegistry = SpringUtils.getBean(StationTaskLocationRegistry.class);
}
public void readStatus(ZyStationConnectDriver zyStationConnectDriver) {
@@ -74,57 +79,72 @@
StationProtocol stationProtocol = new StationProtocol();
stationProtocol.setStationId(entity.getStationId());
statusList.add(stationProtocol);
+ statusMap.put(entity.getStationId(), stationProtocol);
}
initialized = true;
}
int deviceLogCollectTime = initialized ? Utils.getDeviceLogCollectTime() : 200;
List<ZyStationStatusEntity> zyStationStatusEntities = zyStationConnectDriver.getStatus();
+ if (zyStationStatusEntities == null || zyStationStatusEntities.isEmpty()) {
+ return;
+ }
LinkedHashSet<Integer> taskNoSet = new LinkedHashSet<>();
+ LinkedHashSet<Integer> loadingTaskNoSet = new LinkedHashSet<>();
+ long observeAt = System.currentTimeMillis();
for (ZyStationStatusEntity statusEntity : zyStationStatusEntities) {
- for (StationProtocol stationProtocol : statusList) {
- if (stationProtocol.getStationId().equals(statusEntity.getStationId())) {
- stationProtocol.setTaskNo(statusEntity.getTaskNo());
- stationProtocol.setTargetStaNo(statusEntity.getTargetStaNo());
- stationProtocol.setAutoing(statusEntity.isAutoing());
- stationProtocol.setLoading(statusEntity.isLoading());
- stationProtocol.setInEnable(statusEntity.isInEnable());
- stationProtocol.setOutEnable(statusEntity.isOutEnable());
- stationProtocol.setEmptyMk(statusEntity.isEmptyMk());
- stationProtocol.setFullPlt(statusEntity.isFullPlt());
- stationProtocol.setPalletHeight(statusEntity.getPalletHeight());
- stationProtocol.setError(statusEntity.getError());
- stationProtocol.setErrorMsg(statusEntity.getErrorMsg());
- stationProtocol.setBarcode(statusEntity.getBarcode());
- stationProtocol.setRunBlock(statusEntity.isRunBlock());
- stationProtocol.setEnableIn(statusEntity.isEnableIn());
- stationProtocol.setWeight(statusEntity.getWeight());
- stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
- stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
- stationProtocol.setIoMode(statusEntity.getIoMode());
- stationProtocol.setInBarcodeError(statusEntity.isInBarcodeError());
- if (statusEntity.getTaskNo() != null && statusEntity.getTaskNo() > 0) {
- taskNoSet.add(statusEntity.getTaskNo());
- }
- if (statusEntity.getTaskBufferItems() != null) {
- statusEntity.getTaskBufferItems().forEach(item -> {
- Integer bufferTaskNo = item == null ? null : item.getTaskNo();
- if (bufferTaskNo != null && bufferTaskNo > 0) {
- taskNoSet.add(bufferTaskNo);
- }
- });
- }
- recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
+ if (statusEntity == null || statusEntity.getStationId() == null) {
+ continue;
+ }
+ StationProtocol stationProtocol = statusMap.get(statusEntity.getStationId());
+ if (stationProtocol == null) {
+ continue;
+ }
+ stationProtocol.setTaskNo(statusEntity.getTaskNo());
+ stationProtocol.setTargetStaNo(statusEntity.getTargetStaNo());
+ stationProtocol.setAutoing(statusEntity.isAutoing());
+ stationProtocol.setLoading(statusEntity.isLoading());
+ stationProtocol.setInEnable(statusEntity.isInEnable());
+ stationProtocol.setOutEnable(statusEntity.isOutEnable());
+ stationProtocol.setEmptyMk(statusEntity.isEmptyMk());
+ stationProtocol.setFullPlt(statusEntity.isFullPlt());
+ stationProtocol.setPalletHeight(statusEntity.getPalletHeight());
+ stationProtocol.setError(statusEntity.getError());
+ stationProtocol.setErrorMsg(statusEntity.getErrorMsg());
+ stationProtocol.setBarcode(statusEntity.getBarcode());
+ stationProtocol.setRunBlock(statusEntity.isRunBlock());
+ stationProtocol.setEnableIn(statusEntity.isEnableIn());
+ stationProtocol.setWeight(statusEntity.getWeight());
+ stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx());
+ stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems());
+ stationProtocol.setIoMode(statusEntity.getIoMode());
+ stationProtocol.setInBarcodeError(statusEntity.isInBarcodeError());
+ if (statusEntity.getTaskNo() != null && statusEntity.getTaskNo() > 0) {
+ taskNoSet.add(statusEntity.getTaskNo());
+ if (statusEntity.isLoading()) {
+ loadingTaskNoSet.add(statusEntity.getTaskNo());
}
-
- if (!Cools.isEmpty(stationProtocol.getSystemWarning())) {
- if (stationProtocol.isAutoing() && !stationProtocol.isLoading()) {
- stationProtocol.setSystemWarning("");
+ }
+ if (statusEntity.getTaskBufferItems() != null) {
+ statusEntity.getTaskBufferItems().forEach(item -> {
+ Integer bufferTaskNo = item == null ? null : item.getTaskNo();
+ if (bufferTaskNo != null && bufferTaskNo > 0) {
+ taskNoSet.add(bufferTaskNo);
}
- }
+ });
+ }
+ recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading());
+ syncTaskLocation(statusEntity, observeAt);
+ if (!Cools.isEmpty(stationProtocol.getSystemWarning())
+ && stationProtocol.isAutoing()
+ && !stationProtocol.isLoading()) {
+ stationProtocol.setSystemWarning("");
}
}
taskNoList = new ArrayList<>(taskNoSet);
+ if (stationTaskLocationRegistry != null) {
+ stationTaskLocationRegistry.cleanupByDevice(deviceConfig.getDeviceNo(), loadingTaskNoSet);
+ }
OutputQueue.DEVP.offer(MessageFormat.format("銆恵0}銆慬id:{1}] <<<<< 瀹炴椂鏁版嵁鏇存柊鎴愬姛",
DateUtils.convert(new Date()), deviceConfig.getDeviceNo()));
@@ -173,4 +193,23 @@
public List<Integer> getTaskNoList() {
return taskNoList;
}
+
+ private void syncTaskLocation(ZyStationStatusEntity statusEntity, long observeAt) {
+ if (stationTaskLocationRegistry == null || statusEntity == null) {
+ return;
+ }
+ Integer taskNo = statusEntity.getTaskNo();
+ if (taskNo != null && taskNo > 0 && statusEntity.isLoading()) {
+ stationTaskLocationRegistry.update(
+ taskNo,
+ deviceConfig.getDeviceNo(),
+ statusEntity.getStationId(),
+ true,
+ statusEntity.isRunBlock(),
+ observeAt
+ );
+ return;
+ }
+ stationTaskLocationRegistry.remove(taskNo, deviceConfig.getDeviceNo(), statusEntity.getStationId());
+ }
}
diff --git a/src/main/java/com/zy/core/thread/support/StationTaskLocationRegistry.java b/src/main/java/com/zy/core/thread/support/StationTaskLocationRegistry.java
new file mode 100644
index 0000000..d90c6f2
--- /dev/null
+++ b/src/main/java/com/zy/core/thread/support/StationTaskLocationRegistry.java
@@ -0,0 +1,158 @@
+package com.zy.core.thread.support;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@Component
+public class StationTaskLocationRegistry {
+
+ private static final long DEFAULT_STALE_THRESHOLD_MS = 2_000L;
+ private static final long LOOKUP_WARN_INTERVAL_MS = 5_000L;
+
+ private final ConcurrentHashMap<Integer, TaskLocationSnapshot> loadingTaskLocationMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, Long> lookupWarnAtMap = new ConcurrentHashMap<>();
+
+ public void update(Integer taskNo,
+ Integer deviceNo,
+ Integer stationId,
+ boolean loading,
+ boolean runBlock,
+ long updateTime) {
+ if (taskNo == null || taskNo <= 0) {
+ return;
+ }
+ if (!loading || deviceNo == null || stationId == null) {
+ remove(taskNo, null, null);
+ return;
+ }
+ loadingTaskLocationMap.put(taskNo, new TaskLocationSnapshot(taskNo, deviceNo, stationId, true, runBlock, updateTime));
+ }
+
+ public void remove(Integer taskNo, Integer deviceNo, Integer stationId) {
+ if (taskNo == null || taskNo <= 0) {
+ return;
+ }
+ loadingTaskLocationMap.computeIfPresent(taskNo, (key, snapshot) -> {
+ if (snapshot == null) {
+ return null;
+ }
+ if (deviceNo != null && !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
+ return snapshot;
+ }
+ if (stationId != null && !Objects.equals(stationId, snapshot.getStationId())) {
+ return snapshot;
+ }
+ return null;
+ });
+ }
+
+ public TaskLocationSnapshot findActive(Integer taskNo) {
+ return findActive(taskNo, DEFAULT_STALE_THRESHOLD_MS);
+ }
+
+ public TaskLocationSnapshot findActive(Integer taskNo, long staleThresholdMs) {
+ if (taskNo == null || taskNo <= 0) {
+ return null;
+ }
+ TaskLocationSnapshot snapshot = loadingTaskLocationMap.get(taskNo);
+ if (snapshot == null) {
+ warnLookup(taskNo, "miss", null);
+ return null;
+ }
+ long ageMs = Math.max(0L, System.currentTimeMillis() - snapshot.getUpdateTime());
+ if (ageMs > staleThresholdMs) {
+ warnLookup(taskNo, "stale", ageMs);
+ return null;
+ }
+ return snapshot;
+ }
+
+ public void cleanupByDevice(Integer deviceNo, Iterable<Integer> activeTaskNoList) {
+ if (deviceNo == null) {
+ return;
+ }
+ ConcurrentHashMap<Integer, Boolean> activeMap = new ConcurrentHashMap<>();
+ if (activeTaskNoList != null) {
+ for (Integer taskNo : activeTaskNoList) {
+ if (taskNo != null && taskNo > 0) {
+ activeMap.put(taskNo, Boolean.TRUE);
+ }
+ }
+ }
+ for (Map.Entry<Integer, TaskLocationSnapshot> entry : loadingTaskLocationMap.entrySet()) {
+ TaskLocationSnapshot snapshot = entry.getValue();
+ if (snapshot == null || !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
+ continue;
+ }
+ if (!activeMap.containsKey(entry.getKey())) {
+ loadingTaskLocationMap.remove(entry.getKey(), snapshot);
+ }
+ }
+ }
+
+ private void warnLookup(Integer taskNo, String reason, Long ageMs) {
+ long now = System.currentTimeMillis();
+ Long lastWarnAt = lookupWarnAtMap.get(taskNo);
+ if (lastWarnAt != null && now - lastWarnAt < LOOKUP_WARN_INTERVAL_MS) {
+ return;
+ }
+ lookupWarnAtMap.put(taskNo, now);
+ if (ageMs == null) {
+ log.warn("task-location-registry miss, taskNo={}", taskNo);
+ return;
+ }
+ log.warn("task-location-registry stale, taskNo={}, ageMs={}", taskNo, ageMs);
+ }
+
+ public static class TaskLocationSnapshot {
+ private final Integer taskNo;
+ private final Integer deviceNo;
+ private final Integer stationId;
+ private final boolean loading;
+ private final boolean runBlock;
+ private final long updateTime;
+
+ public TaskLocationSnapshot(Integer taskNo,
+ Integer deviceNo,
+ Integer stationId,
+ boolean loading,
+ boolean runBlock,
+ long updateTime) {
+ this.taskNo = taskNo;
+ this.deviceNo = deviceNo;
+ this.stationId = stationId;
+ this.loading = loading;
+ this.runBlock = runBlock;
+ this.updateTime = updateTime;
+ }
+
+ public Integer getTaskNo() {
+ return taskNo;
+ }
+
+ public Integer getDeviceNo() {
+ return deviceNo;
+ }
+
+ public Integer getStationId() {
+ return stationId;
+ }
+
+ public boolean isLoading() {
+ return loading;
+ }
+
+ public boolean isRunBlock() {
+ return runBlock;
+ }
+
+ public long getUpdateTime() {
+ return updateTime;
+ }
+ }
+}
diff --git a/src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java b/src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java
index 7328225..5356fd2 100644
--- a/src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java
+++ b/src/main/java/com/zy/core/utils/station/StationDispatchRuntimeStateSupport.java
@@ -137,6 +137,13 @@
return tryAcquireLock(RedisKeyType.STATION_OUT_ORDER_DISPATCH_LIMIT_.key + wrkNo + "_" + stationId, seconds);
}
+ public boolean tryAcquireRunBlockDirectReassignLock(Integer wrkNo, Integer stationId, int seconds) {
+ if (wrkNo == null || wrkNo <= 0 || stationId == null) {
+ return true;
+ }
+ return tryAcquireLock(RedisKeyType.STATION_RUN_BLOCK_DIRECT_REASSIGN_LIMIT_.key + wrkNo + "_" + stationId, seconds);
+ }
+
public void signalSegmentReset(Integer taskNo, long waitMs) {
if (redisUtil == null || taskNo == null || taskNo <= 0) {
return;
diff --git a/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java b/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
index b3f358d..3cf4495 100644
--- a/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
+++ b/src/main/java/com/zy/core/utils/station/StationRerouteProcessor.java
@@ -51,6 +51,7 @@
public class StationRerouteProcessor {
private static final int OUT_ORDER_DISPATCH_LIMIT_SECONDS = 2;
private static final long STATION_MOVE_RESET_WAIT_MS = 1000L;
+ private static final int RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS = 15 * 60;
@Autowired
private BasDevpService basDevpService;
@@ -447,15 +448,26 @@
if (basDevp == null || stationThread == null || stationProtocol == null || wrkMast == null) {
return;
}
+ if (isDirectReassignContextStale(stationProtocol, wrkMast)) {
+ return;
+ }
int currentTaskBufferCommandCount = countCurrentTaskBufferCommands(
stationProtocol.getTaskBufferItems(),
stationProtocol.getTaskNo()
);
if (currentTaskBufferCommandCount > 0) {
- News.info("杈撻�佺珯鐐硅繍琛屽牭濉為噸鍒嗛厤宸茶烦杩囷紝缂撳瓨鍖轰粛瀛樺湪褰撳墠浠诲姟鍛戒护銆傜珯鐐瑰彿={}锛屽伐浣滃彿={}锛屽綋鍓嶄换鍔″懡浠ゆ暟={}",
+ News.info("杈撻�佺珯鐐硅繍琛屽牭濉為噸鍒嗛厤妫�娴嬪埌鏃у垎娈靛懡浠ゆ畫鐣欙紝灏嗗厛閲嶇疆鏈湴鍒嗘鐘舵�佸悗缁х画閲嶅彂銆傜珯鐐瑰彿={}锛屽伐浣滃彿={}锛屽綋鍓嶄换鍔″懡浠ゆ暟={}",
stationProtocol.getStationId(),
stationProtocol.getTaskNo(),
currentTaskBufferCommandCount);
+ }
+ if (!stationDispatchRuntimeStateSupport.tryAcquireRunBlockDirectReassignLock(
+ wrkMast.getWrkNo(),
+ stationProtocol.getStationId(),
+ RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS)) {
+ News.info("杈撻�佺珯鐐硅繍琛屽牭濉為噸鍒嗛厤宸茶烦杩囷紝15鍒嗛挓鍐呬笉鍏佽閲嶅鐢宠銆傜珯鐐瑰彿={}锛屽伐浣滃彿={}",
+ stationProtocol.getStationId(),
+ wrkMast.getWrkNo());
return;
}
String response = wmsOperateUtils.applyReassignTaskLocNo(wrkMast.getWrkNo(), stationProtocol.getStationId());
@@ -534,6 +546,7 @@
if (!wrkMastService.updateById(wrkMast)) {
return;
}
+ stationDispatchRuntimeStateSupport.signalSegmentReset(wrkMast.getWrkNo(), STATION_MOVE_RESET_WAIT_MS);
boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct");
if (!offered) {
return;
@@ -551,6 +564,30 @@
}
}
+ private boolean isDirectReassignContextStale(StationProtocol stationProtocol, WrkMast wrkMast) {
+ if (stationProtocol == null || wrkMast == null || stationMoveCoordinator == null) {
+ return false;
+ }
+ Integer taskNo = wrkMast.getWrkNo();
+ Integer triggerStationId = stationProtocol.getStationId();
+ if (taskNo == null || taskNo <= 0 || triggerStationId == null) {
+ return false;
+ }
+ StationMoveSession session = stationMoveCoordinator.loadSession(taskNo);
+ if (session == null || !session.isActive()) {
+ return false;
+ }
+ Integer currentStationId = session.getCurrentStationId();
+ if (currentStationId == null || Objects.equals(currentStationId, triggerStationId)) {
+ return false;
+ }
+ News.info("杈撻�佺珯鐐硅繍琛屽牭濉為噸鍒嗛厤宸茶烦杩囷紝浠诲姟宸茬寮�瑙﹀彂绔欑偣銆傝Е鍙戠珯鐐�={}锛屽綋鍓嶇珯鐐�={}锛屽伐浣滃彿={}",
+ triggerStationId,
+ currentStationId,
+ taskNo);
+ return true;
+ }
+
private int countCurrentTaskBufferCommands(List<StationTaskBufferItem> taskBufferItems, Integer currentTaskNo) {
if (taskBufferItems == null || taskBufferItems.isEmpty() || currentTaskNo == null || currentTaskNo <= 0) {
return 0;
--
Gitblit v1.9.1