| | |
| | | import org.springframework.data.redis.core.RedisCallback; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.data.redis.core.StringRedisTemplate; |
| | | import org.springframework.data.redis.core.script.DefaultRedisScript; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.CollectionUtils; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | |
| | | } |
| | | redisTemplate.execute((RedisCallback<Void>) connection -> null); |
| | | return true; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | public boolean trySetStringIfAbsent(String key, String value, long timeSeconds) { |
| | | if (key == null || value == null) { |
| | | return false; |
| | | } |
| | | try { |
| | | Boolean result; |
| | | if (timeSeconds > 0) { |
| | | result = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeSeconds, TimeUnit.SECONDS); |
| | | } else { |
| | | result = stringRedisTemplate.opsForValue().setIfAbsent(key, value); |
| | | } |
| | | return Boolean.TRUE.equals(result); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | public boolean compareAndDelete(String key, String expectedValue) { |
| | | if (key == null || expectedValue == null) { |
| | | return false; |
| | | } |
| | | try { |
| | | DefaultRedisScript<Long> script = new DefaultRedisScript<>(); |
| | | script.setScriptText( |
| | | "if redis.call('get', KEYS[1]) == ARGV[1] then " + |
| | | "return redis.call('del', KEYS[1]) " + |
| | | "else return 0 end" |
| | | ); |
| | | script.setResultType(Long.class); |
| | | Long result = stringRedisTemplate.execute(script, Collections.singletonList(key), expectedValue); |
| | | return result != null && result > 0; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | return false; |
| | |
| | | |
| | | |
| | | } |
| | | |
| | |
| | | |
| | | private void clearStartupRuntimeLocks() { |
| | | redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key); |
| | | java.util.Set<String> stationSendLockKeys = redisUtil.scanKeys(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key + ":", 2048); |
| | | if (stationSendLockKeys != null && !stationSendLockKeys.isEmpty()) { |
| | | redisUtil.del(stationSendLockKeys.toArray(new String[0])); |
| | | } |
| | | // News.info("系统启动时已清理输送站命令执行锁,key={}", RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key); |
| | | } |
| | | |
| | |
| | | STATION_OUT_EXECUTE_LIMIT("station_out_execute_limit_"), |
| | | STATION_OUT_PENDING_DISPATCH_("station_out_pending_dispatch_"), |
| | | STATION_OUT_ORDER_DISPATCH_LIMIT_("station_out_order_dispatch_limit_"), |
| | | STATION_RUN_BLOCK_DIRECT_REASSIGN_LIMIT_("station_run_block_direct_reassign_limit_"), |
| | | STATION_OUT_EXECUTE_COMPLETE_LIMIT("station_out_execute_complete_limit_"), |
| | | CHECK_STATION_RUN_BLOCK_LIMIT_("check_station_run_block_limit_"), |
| | | STATION_RUN_BLOCK_REROUTE_STATE_("station_run_block_reroute_state_"), |
| | |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ThreadFactory; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.UUID; |
| | | |
| | | /** |
| | | * 输送站连接驱动 |
| | |
| | | |
| | | private static final ZyStationFakeSegConnect zyStationFakeSegConnect = new ZyStationFakeSegConnect(); |
| | | private static final ZyStationV4FakeSegConnect zyStationV4FakeSegConnect = new ZyStationV4FakeSegConnect(); |
| | | private static final long SEND_LOCK_WARN_MS = 3_000L; |
| | | private static final long SEND_LOCK_WARN_MS = 500L; |
| | | private static final long SEND_COST_WARN_MS = 5_000L; |
| | | private static final long SEND_LOCK_POLL_MS = 20L; |
| | | private static final long SEND_LOCK_EXPIRE_SECONDS = 15L; |
| | | |
| | | private volatile boolean connected = false; |
| | | private volatile boolean connecting = false; |
| | |
| | | if (!connected || connecting || connectApi == null) { |
| | | return new CommandResponse(false, "设备未连接,命令下发失败"); |
| | | } |
| | | String lockKey = buildStationExecuteLockKey(); |
| | | String lockToken = UUID.randomUUID().toString(); |
| | | long lockWaitStart = System.currentTimeMillis(); |
| | | int waitRounds = 0; |
| | | while (true) { |
| | | Object lock = redisUtil.get(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key); |
| | | if(lock != null) { |
| | | if (redisUtil != null) { |
| | | while (true) { |
| | | if (redisUtil.trySetStringIfAbsent(lockKey, lockToken, SEND_LOCK_EXPIRE_SECONDS)) { |
| | | break; |
| | | } |
| | | waitRounds++; |
| | | try { |
| | | Thread.sleep(500); |
| | | }catch (Exception e) { |
| | | Thread.sleep(SEND_LOCK_POLL_MS); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }else { |
| | | redisUtil.set(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key, "lock", 60 * 5); |
| | | break; |
| | | } |
| | | } |
| | | long lockWaitCost = System.currentTimeMillis() - lockWaitStart; |
| | | if (lockWaitCost >= SEND_LOCK_WARN_MS) { |
| | | log.warn("输送命令等待全局发送锁超时,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}", |
| | | log.warn("输送命令等待设备发送锁超时,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, waitMs={}, waitRounds={}, lockKey={}", |
| | | deviceConfig == null ? null : deviceConfig.getDeviceNo(), |
| | | command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getStationId(), |
| | | command == null ? null : command.getTargetStaNo(), |
| | | lockWaitCost, |
| | | waitRounds); |
| | | waitRounds, |
| | | lockKey); |
| | | } |
| | | long sendStart = System.currentTimeMillis(); |
| | | try { |
| | | return connectApi.sendCommand(deviceConfig.getDeviceNo(), command); |
| | | } finally { |
| | | redisUtil.del(RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key); |
| | | releaseDeviceSendLock(lockKey, lockToken); |
| | | long sendCostMs = System.currentTimeMillis() - sendStart; |
| | | if (sendCostMs >= SEND_COST_WARN_MS) { |
| | | log.warn("输送命令底层发送耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, sendCostMs={}", |
| | |
| | | } |
| | | } |
| | | |
| | | private String buildStationExecuteLockKey() { |
| | | Integer deviceNo = deviceConfig == null ? null : deviceConfig.getDeviceNo(); |
| | | return RedisKeyType.STATION_EXECUTE_COMMAND_LOCK.key + ":" + deviceNo; |
| | | } |
| | | |
| | | private void releaseDeviceSendLock(String lockKey, String lockToken) { |
| | | if (redisUtil == null || lockKey == null) { |
| | | return; |
| | | } |
| | | try { |
| | | redisUtil.compareAndDelete(lockKey, lockToken); |
| | | } catch (Exception e) { |
| | | log.warn("释放输送设备发送锁失败,lockKey={}", lockKey, e); |
| | | } |
| | | } |
| | | |
| | | public CommandResponse sendOriginCommand(String address, short[] data) { |
| | | ZyStationConnectApi connectApi = zyStationConnectApi; |
| | | if (!connected || connecting || connectApi == null) { |
| New file |
| | |
| | | package com.zy.core.task; |
| | | |
| | | import com.zy.asrs.entity.BasStationOpt; |
| | | import com.zy.asrs.service.BasStationOptService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.concurrent.ArrayBlockingQueue; |
| | | |
| | | @Slf4j |
| | | @Component |
| | | public class BasStationOptAsyncPublisher { |
| | | |
| | | private static final String LANE_NAME = "bas-station-opt-save"; |
| | | private static final String TASK_NAME = "publish-bas-station-opt"; |
| | | private static final long MIN_INTERVAL_MS = 0L; |
| | | private static final int DEFAULT_QUEUE_CAPACITY = 2048; |
| | | |
| | | private final ArrayBlockingQueue<BasStationOpt> pendingQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); |
| | | |
| | | @Autowired |
| | | private MainProcessTaskSubmitter mainProcessTaskSubmitter; |
| | | @Autowired |
| | | private BasStationOptService basStationOptService; |
| | | |
| | | public boolean publish(BasStationOpt basStationOpt) { |
| | | if (basStationOpt == null) { |
| | | return true; |
| | | } |
| | | if (!pendingQueue.offer(basStationOpt)) { |
| | | log.error("BasStationOpt async publish queue full, fallback to sync save, taskNo={}, stationId={}, targetStationId={}", |
| | | basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId()); |
| | | return false; |
| | | } |
| | | mainProcessTaskSubmitter.submitSerialTask(LANE_NAME, TASK_NAME, MIN_INTERVAL_MS, this::drain); |
| | | return true; |
| | | } |
| | | |
| | | private void drain() { |
| | | while (true) { |
| | | BasStationOpt basStationOpt = pendingQueue.poll(); |
| | | if (basStationOpt == null) { |
| | | return; |
| | | } |
| | | try { |
| | | basStationOptService.save(basStationOpt); |
| | | } catch (Exception e) { |
| | | log.error("BasStationOpt async publish error, fallback to sync save next time, taskNo={}, stationId={}, targetStationId={}", |
| | | basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), e); |
| | | try { |
| | | basStationOptService.save(basStationOpt); |
| | | } catch (Exception ex) { |
| | | log.error("BasStationOpt sync fallback save error, taskNo={}, stationId={}, targetStationId={}", |
| | | basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), ex); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | import com.zy.core.network.ZyStationConnectDriver; |
| | | import com.zy.core.network.entity.ZyStationStatusEntity; |
| | | import com.zy.core.service.StationTaskLoopService; |
| | | import com.zy.core.task.BasStationOptAsyncPublisher; |
| | | import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider; |
| | | import com.zy.core.thread.impl.v5.StationV5RunBlockReroutePlanner; |
| | | import com.zy.core.thread.impl.v5.StationV5SegmentExecutor; |
| | | import com.zy.core.thread.impl.v5.StationV5StatusReader; |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Objects; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ArrayBlockingQueue; |
| | | import java.util.concurrent.RejectedExecutionException; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | @Data |
| | | @Slf4j |
| | | public class ZyStationV5Thread implements Runnable, com.zy.core.thread.StationThread { |
| | | |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128; |
| | | private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize"; |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512; |
| | | private static final int EXECUTOR_QUEUE_WARN_THRESHOLD = 20; |
| | | private static final int EXECUTOR_ACTIVE_WARN_THRESHOLD = 48; |
| | | private static final long SEGMENT_EXECUTE_WARN_MS = 10_000L; |
| | | private static final int QUEUE_DRAIN_BATCH_SIZE = 32; |
| | | private static final long QUEUE_IDLE_SLEEP_MS = 20L; |
| | | |
| | | private DeviceConfig deviceConfig; |
| | | private RedisUtil redisUtil; |
| | | private ZyStationConnectDriver zyStationConnectDriver; |
| | | private final ExecutorService executor; |
| | | private final ThreadPoolExecutor executor; |
| | | private StationV5SegmentExecutor segmentExecutor; |
| | | private final RecentStationArrivalTracker recentArrivalTracker; |
| | | private final StationV5StatusReader statusReader; |
| | | private final StationV5RunBlockReroutePlanner runBlockReroutePlanner; |
| | | private final int executorQueueCapacity; |
| | | private final BasStationOptAsyncPublisher basStationOptAsyncPublisher; |
| | | |
| | | public ZyStationV5Thread(DeviceConfig deviceConfig, RedisUtil redisUtil) { |
| | | this.deviceConfig = deviceConfig; |
| | | this.redisUtil = redisUtil; |
| | | int poolSize = resolveSegmentExecutorPoolSize(redisUtil); |
| | | this.executor = Executors.newFixedThreadPool(poolSize); |
| | | StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class); |
| | | int poolSize = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE : configProvider.getSegmentExecutorPoolSize(); |
| | | this.executorQueueCapacity = configProvider == null ? DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY : configProvider.getSegmentExecutorQueueCapacity(); |
| | | this.executor = new ThreadPoolExecutor( |
| | | poolSize, |
| | | poolSize, |
| | | 0L, |
| | | TimeUnit.MILLISECONDS, |
| | | new ArrayBlockingQueue<>(executorQueueCapacity) |
| | | ); |
| | | this.recentArrivalTracker = new RecentStationArrivalTracker(redisUtil); |
| | | this.segmentExecutor = new StationV5SegmentExecutor(deviceConfig, redisUtil, this::sendCommand); |
| | | this.statusReader = new StationV5StatusReader(deviceConfig, redisUtil, recentArrivalTracker); |
| | | this.runBlockReroutePlanner = new StationV5RunBlockReroutePlanner(redisUtil); |
| | | log.info("初始化V5输送线程池,deviceNo={}, poolSize={}", deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize); |
| | | this.basStationOptAsyncPublisher = SpringUtils.getBean(BasStationOptAsyncPublisher.class); |
| | | log.info("初始化V5输送线程池,deviceNo={}, poolSize={}, queueCapacity={}", |
| | | deviceConfig == null ? null : deviceConfig.getDeviceNo(), poolSize, executorQueueCapacity); |
| | | } |
| | | |
| | | @Override |
| | |
| | | Thread processThread = new Thread(() -> { |
| | | while (true) { |
| | | try { |
| | | pollAndDispatchQueuedCommand(); |
| | | Thread.sleep(100); |
| | | int dispatchedCount = pollAndDispatchQueuedCommandBatch(); |
| | | if (dispatchedCount <= 0) { |
| | | Thread.sleep(QUEUE_IDLE_SLEEP_MS); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("StationV5Process Fail", e); |
| | | } |
| | |
| | | return statusReader.getTaskNoList(); |
| | | } |
| | | |
| | | private void pollAndDispatchQueuedCommand() { |
| | | Task task = MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | if (task == null || task.getStep() == null || task.getStep() != 2) { |
| | | return; |
| | | private int pollAndDispatchQueuedCommandBatch() { |
| | | int dispatchedCount = 0; |
| | | while (dispatchedCount < QUEUE_DRAIN_BATCH_SIZE) { |
| | | if (isExecutorQueueAtWatermark()) { |
| | | logExecutorAbnormal("executor-watermark", null); |
| | | break; |
| | | } |
| | | Task task = MessageQueue.peek(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | if (task == null || task.getStep() == null || task.getStep() != 2) { |
| | | break; |
| | | } |
| | | StationCommand command = (StationCommand) task.getData(); |
| | | logExecutorAbnormal("queue-peek", command); |
| | | if (!submitSegmentCommand(command)) { |
| | | logExecutorAbnormal("submit-rejected", command); |
| | | break; |
| | | } |
| | | MessageQueue.poll(SlaveType.Devp, deviceConfig.getDeviceNo()); |
| | | dispatchedCount++; |
| | | } |
| | | StationCommand command = (StationCommand) task.getData(); |
| | | logExecutorAbnormal("queue-poll", command); |
| | | submitSegmentCommand(command); |
| | | return dispatchedCount; |
| | | } |
| | | |
| | | private void submitSegmentCommand(StationCommand command) { |
| | | private boolean submitSegmentCommand(StationCommand command) { |
| | | if (command == null || executor == null || segmentExecutor == null) { |
| | | return; |
| | | return false; |
| | | } |
| | | executor.submit(() -> { |
| | | long start = System.currentTimeMillis(); |
| | | try { |
| | | segmentExecutor.execute(command); |
| | | } finally { |
| | | long costMs = System.currentTimeMillis() - start; |
| | | if (costMs >= SEGMENT_EXECUTE_WARN_MS) { |
| | | log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}", |
| | | deviceConfig.getDeviceNo(), |
| | | command.getTaskNo(), |
| | | command.getStationId(), |
| | | command.getTargetStaNo(), |
| | | costMs); |
| | | logExecutorAbnormal("segment-finish-slow", command); |
| | | try { |
| | | executor.execute(() -> { |
| | | long start = System.currentTimeMillis(); |
| | | try { |
| | | segmentExecutor.execute(command); |
| | | } finally { |
| | | long costMs = System.currentTimeMillis() - start; |
| | | if (costMs >= SEGMENT_EXECUTE_WARN_MS) { |
| | | log.warn("V5输送命令分段执行耗时过长,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, costMs={}", |
| | | deviceConfig.getDeviceNo(), |
| | | command.getTaskNo(), |
| | | command.getStationId(), |
| | | command.getTargetStaNo(), |
| | | costMs); |
| | | logExecutorAbnormal("segment-finish-slow", command); |
| | | } |
| | | } |
| | | } |
| | | }); |
| | | }); |
| | | } catch (RejectedExecutionException e) { |
| | | log.error("V5输送线程池拒绝执行,保留设备队列积压,deviceNo={}, taskNo={}, stationId={}, targetStaNo={}, activeCount={}, queuedCount={}, queueCapacity={}", |
| | | deviceConfig.getDeviceNo(), |
| | | command.getTaskNo(), |
| | | command.getStationId(), |
| | | command.getTargetStaNo(), |
| | | executor.getActiveCount(), |
| | | executor.getQueue() == null ? 0 : executor.getQueue().size(), |
| | | executorQueueCapacity); |
| | | return false; |
| | | } |
| | | logExecutorAbnormal("after-submit", command); |
| | | return true; |
| | | } |
| | | |
| | | private boolean isExecutorQueueAtWatermark() { |
| | | if (executor == null || executor.getQueue() == null) { |
| | | return false; |
| | | } |
| | | return executor.getQueue().size() >= executorQueueCapacity; |
| | | } |
| | | |
| | | @Override |
| | |
| | | commandResponse != null && Boolean.TRUE.equals(commandResponse.getResult()) ? 1 : 0, |
| | | JSON.toJSONString(commandResponse) |
| | | ); |
| | | optService.save(basStationOpt); |
| | | if (basStationOptAsyncPublisher == null || !basStationOptAsyncPublisher.publish(basStationOpt)) { |
| | | optService.save(basStationOpt); |
| | | } |
| | | } |
| | | return commandResponse; |
| | | } |
| | | |
| | | private void logExecutorAbnormal(String scene, StationCommand command) { |
| | | if (!(executor instanceof ThreadPoolExecutor)) { |
| | | if (executor == null) { |
| | | return; |
| | | } |
| | | ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; |
| | | int activeCount = threadPoolExecutor.getActiveCount(); |
| | | int queuedCount = threadPoolExecutor.getQueue() == null ? 0 : threadPoolExecutor.getQueue().size(); |
| | | int activeCount = executor.getActiveCount(); |
| | | int queuedCount = executor.getQueue() == null ? 0 : executor.getQueue().size(); |
| | | if (activeCount < EXECUTOR_ACTIVE_WARN_THRESHOLD && queuedCount < EXECUTOR_QUEUE_WARN_THRESHOLD) { |
| | | return; |
| | | } |
| | |
| | | command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getStationId(), |
| | | command == null ? null : command.getTargetStaNo(), |
| | | threadPoolExecutor.getPoolSize(), |
| | | executor.getPoolSize(), |
| | | activeCount, |
| | | queuedCount, |
| | | threadPoolExecutor.getCompletedTaskCount()); |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | private int resolveSegmentExecutorPoolSize(RedisUtil redisUtil) { |
| | | if (redisUtil == null) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | try { |
| | | Object systemConfigMapObj = redisUtil.get(com.zy.core.enums.RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (!(systemConfigMapObj instanceof HashMap)) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj; |
| | | String poolSizeText = systemConfigMap.get(CFG_SEGMENT_EXECUTOR_POOL_SIZE); |
| | | if (poolSizeText == null || poolSizeText.trim().isEmpty()) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | int configured = Integer.parseInt(poolSizeText.trim()); |
| | | if (configured < 16) { |
| | | return 16; |
| | | } |
| | | return Math.min(configured, 512); |
| | | } catch (Exception ignore) { |
| | | return DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE; |
| | | } |
| | | executor.getCompletedTaskCount()); |
| | | } |
| | | |
| | | @Override |
| | |
| | | import com.zy.core.model.protocol.StationProtocol; |
| | | import com.zy.core.move.StationMoveCoordinator; |
| | | import com.zy.core.trace.StationTaskTraceRegistry; |
| | | import com.zy.core.thread.impl.v5.StationV5RuntimeConfigProvider; |
| | | import com.zy.core.thread.support.StationTaskLocationRegistry; |
| | | import com.zy.system.entity.Config; |
| | | import com.zy.system.service.ConfigService; |
| | | |
| | |
| | | private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio"; |
| | | private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d; |
| | | private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L; |
| | | private static final long TASK_LOCATION_STALE_MS = 2_000L; |
| | | |
| | | private final DeviceConfig deviceConfig; |
| | | private final RedisUtil redisUtil; |
| | |
| | | } |
| | | |
| | | private double loadSegmentAdvanceRatio() { |
| | | if (isV5ThreadImpl()) { |
| | | StationV5RuntimeConfigProvider configProvider = SpringUtils.getBean(StationV5RuntimeConfigProvider.class); |
| | | if (configProvider != null) { |
| | | return configProvider.getSegmentAdvanceRatio(); |
| | | } |
| | | return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; |
| | | } |
| | | try { |
| | | ConfigService configService = SpringUtils.getBean(ConfigService.class); |
| | | if (configService == null) { |
| | |
| | | } |
| | | |
| | | private StationProtocol findCurrentStationByTask(Integer taskNo) { |
| | | if (isV5ThreadImpl()) { |
| | | return findCurrentStationByTaskFromRegistry(taskNo); |
| | | } |
| | | try { |
| | | com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class); |
| | | if (deviceConfigService == null) { |
| | |
| | | return null; |
| | | } |
| | | |
| | | private StationProtocol findCurrentStationByTaskFromRegistry(Integer taskNo) { |
| | | StationTaskLocationRegistry registry = SpringUtils.getBean(StationTaskLocationRegistry.class); |
| | | if (registry == null) { |
| | | return null; |
| | | } |
| | | StationTaskLocationRegistry.TaskLocationSnapshot snapshot = registry.findActive(taskNo, TASK_LOCATION_STALE_MS); |
| | | if (snapshot == null || !snapshot.isLoading()) { |
| | | return null; |
| | | } |
| | | StationProtocol stationProtocol = new StationProtocol(); |
| | | stationProtocol.setTaskNo(snapshot.getTaskNo()); |
| | | stationProtocol.setStationId(snapshot.getStationId()); |
| | | stationProtocol.setRunBlock(snapshot.isRunBlock()); |
| | | stationProtocol.setLoading(true); |
| | | return stationProtocol; |
| | | } |
| | | |
| | | private boolean isV5ThreadImpl() { |
| | | return deviceConfig != null && "ZyStationV5Thread".equals(deviceConfig.getThreadImpl()); |
| | | } |
| | | |
| | | private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) { |
| | | List<StationTaskTraceSegmentVo> result = new ArrayList<>(); |
| | | if (segmentCommands == null) { |
| New file |
| | |
| | | package com.zy.core.thread.impl.v5; |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
| | | import com.core.common.Cools; |
| | | import com.zy.common.utils.RedisUtil; |
| | | import com.zy.core.enums.RedisKeyType; |
| | | import com.zy.system.entity.Config; |
| | | import com.zy.system.service.ConfigService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.HashMap; |
| | | |
| | | @Slf4j |
| | | @Component |
| | | public class StationV5RuntimeConfigProvider { |
| | | |
| | | private static final String CFG_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio"; |
| | | private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize"; |
| | | private static final String CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY = "stationV5SegmentExecutorQueueCapacity"; |
| | | private static final String CFG_CONFIG_REFRESH_SECONDS = "stationCommandConfigRefreshSeconds"; |
| | | |
| | | private static final double DEFAULT_SEGMENT_ADVANCE_RATIO = 0.3d; |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128; |
| | | private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512; |
| | | private static final int DEFAULT_CONFIG_REFRESH_SECONDS = 30; |
| | | |
| | | @Autowired(required = false) |
| | | private RedisUtil redisUtil; |
| | | @Autowired(required = false) |
| | | private ConfigService configService; |
| | | |
| | | private volatile CacheSnapshot cacheSnapshot = new CacheSnapshot( |
| | | DEFAULT_SEGMENT_ADVANCE_RATIO, |
| | | DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE, |
| | | DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY, |
| | | DEFAULT_CONFIG_REFRESH_SECONDS, |
| | | 0L |
| | | ); |
| | | |
| | | public double getSegmentAdvanceRatio() { |
| | | return loadSnapshot().segmentAdvanceRatio; |
| | | } |
| | | |
| | | public int getSegmentExecutorPoolSize() { |
| | | return loadSnapshot().segmentExecutorPoolSize; |
| | | } |
| | | |
| | | public int getSegmentExecutorQueueCapacity() { |
| | | return loadSnapshot().segmentExecutorQueueCapacity; |
| | | } |
| | | |
| | | private CacheSnapshot loadSnapshot() { |
| | | CacheSnapshot snapshot = cacheSnapshot; |
| | | long now = System.currentTimeMillis(); |
| | | if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) { |
| | | return snapshot; |
| | | } |
| | | synchronized (this) { |
| | | snapshot = cacheSnapshot; |
| | | now = System.currentTimeMillis(); |
| | | if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) { |
| | | return snapshot; |
| | | } |
| | | CacheSnapshot refreshed = refreshSnapshot(now); |
| | | cacheSnapshot = refreshed; |
| | | return refreshed; |
| | | } |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | private CacheSnapshot refreshSnapshot(long now) { |
| | | HashMap<String, String> systemConfigMap = null; |
| | | try { |
| | | Object systemConfigMapObj = redisUtil == null ? null : redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); |
| | | if (systemConfigMapObj instanceof HashMap) { |
| | | systemConfigMap = (HashMap<String, String>) systemConfigMapObj; |
| | | } |
| | | } catch (Exception e) { |
| | | log.warn("加载 V5 输送运行时配置缓存失败,fallback to db. reason=redis-read-error", e); |
| | | } |
| | | |
| | | int refreshSeconds = normalizeRefreshSeconds(readConfigText(systemConfigMap, CFG_CONFIG_REFRESH_SECONDS)); |
| | | double segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigText(systemConfigMap, CFG_SEGMENT_ADVANCE_RATIO)); |
| | | int poolSize = normalizePoolSize(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_POOL_SIZE)); |
| | | int queueCapacity = normalizeQueueCapacity(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY)); |
| | | |
| | | if (systemConfigMap == null) { |
| | | segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigTextFromDb(CFG_SEGMENT_ADVANCE_RATIO), segmentAdvanceRatio); |
| | | poolSize = normalizePoolSize(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_POOL_SIZE), poolSize); |
| | | queueCapacity = normalizeQueueCapacity(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY), queueCapacity); |
| | | refreshSeconds = normalizeRefreshSeconds(readConfigTextFromDb(CFG_CONFIG_REFRESH_SECONDS), refreshSeconds); |
| | | } |
| | | return new CacheSnapshot(segmentAdvanceRatio, poolSize, queueCapacity, refreshSeconds, now); |
| | | } |
| | | |
| | | private String readConfigText(HashMap<String, String> systemConfigMap, String code) { |
| | | if (systemConfigMap == null || code == null) { |
| | | return null; |
| | | } |
| | | return systemConfigMap.get(code); |
| | | } |
| | | |
| | | private String readConfigTextFromDb(String code) { |
| | | if (configService == null || code == null) { |
| | | return null; |
| | | } |
| | | try { |
| | | Config config = configService.getOne(new QueryWrapper<Config>().eq("code", code)); |
| | | return config == null ? null : config.getValue(); |
| | | } catch (Exception e) { |
| | | log.warn("加载 V5 输送运行时配置数据库失败,code={}", code, e); |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | private double normalizeSegmentAdvanceRatio(String valueText) { |
| | | return normalizeSegmentAdvanceRatio(valueText, DEFAULT_SEGMENT_ADVANCE_RATIO); |
| | | } |
| | | |
| | | private double normalizeSegmentAdvanceRatio(String valueText, double defaultValue) { |
| | | if (valueText == null) { |
| | | return defaultValue; |
| | | } |
| | | String text = valueText.trim(); |
| | | if (text.isEmpty()) { |
| | | return defaultValue; |
| | | } |
| | | if (text.endsWith("%")) { |
| | | text = text.substring(0, text.length() - 1).trim(); |
| | | } |
| | | try { |
| | | double ratio = Double.parseDouble(text); |
| | | if (ratio > 1d && ratio <= 100d) { |
| | | ratio = ratio / 100d; |
| | | } |
| | | if (ratio < 0d) { |
| | | return 0d; |
| | | } |
| | | if (ratio > 1d) { |
| | | return 1d; |
| | | } |
| | | return ratio; |
| | | } catch (Exception ignore) { |
| | | return defaultValue; |
| | | } |
| | | } |
| | | |
| | | private int normalizePoolSize(String valueText) { |
| | | return normalizePoolSize(valueText, DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE); |
| | | } |
| | | |
| | | private int normalizePoolSize(String valueText, int defaultValue) { |
| | | int configured = parsePositiveInt(valueText, defaultValue); |
| | | if (configured < 16) { |
| | | return 16; |
| | | } |
| | | return Math.min(configured, 512); |
| | | } |
| | | |
| | | private int normalizeQueueCapacity(String valueText) { |
| | | return normalizeQueueCapacity(valueText, DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY); |
| | | } |
| | | |
| | | private int normalizeQueueCapacity(String valueText, int defaultValue) { |
| | | int configured = parsePositiveInt(valueText, defaultValue); |
| | | if (configured < 64) { |
| | | return 64; |
| | | } |
| | | return Math.min(configured, 4096); |
| | | } |
| | | |
| | | private int normalizeRefreshSeconds(String valueText) { |
| | | return normalizeRefreshSeconds(valueText, DEFAULT_CONFIG_REFRESH_SECONDS); |
| | | } |
| | | |
| | | private int normalizeRefreshSeconds(String valueText, int defaultValue) { |
| | | int configured = parsePositiveInt(valueText, defaultValue); |
| | | if (configured < 5) { |
| | | return 5; |
| | | } |
| | | return Math.min(configured, 300); |
| | | } |
| | | |
| | | private int parsePositiveInt(String valueText, int defaultValue) { |
| | | if (Cools.isEmpty(valueText)) { |
| | | return defaultValue; |
| | | } |
| | | try { |
| | | return Integer.parseInt(valueText.trim()); |
| | | } catch (Exception ignore) { |
| | | return defaultValue; |
| | | } |
| | | } |
| | | |
| | | private static class CacheSnapshot { |
| | | private final double segmentAdvanceRatio; |
| | | private final int segmentExecutorPoolSize; |
| | | private final int segmentExecutorQueueCapacity; |
| | | private final int refreshSeconds; |
| | | private final long loadedAtMs; |
| | | |
| | | private CacheSnapshot(double segmentAdvanceRatio, |
| | | int segmentExecutorPoolSize, |
| | | int segmentExecutorQueueCapacity, |
| | | int refreshSeconds, |
| | | long loadedAtMs) { |
| | | this.segmentAdvanceRatio = segmentAdvanceRatio; |
| | | this.segmentExecutorPoolSize = segmentExecutorPoolSize; |
| | | this.segmentExecutorQueueCapacity = segmentExecutorQueueCapacity; |
| | | this.refreshSeconds = refreshSeconds; |
| | | this.loadedAtMs = loadedAtMs; |
| | | } |
| | | } |
| | | } |
| | |
| | | import com.zy.core.task.DeviceAsyncLogPublisher; |
| | | import com.zy.core.thread.support.RecentStationArrivalTracker; |
| | | import com.zy.core.thread.support.StationErrLogSupport; |
| | | import com.zy.core.thread.support.StationTaskLocationRegistry; |
| | | |
| | | import java.text.MessageFormat; |
| | | import java.util.ArrayList; |
| | |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | public class StationV5StatusReader { |
| | | |
| | |
| | | private final RedisUtil redisUtil; |
| | | private final RecentStationArrivalTracker recentArrivalTracker; |
| | | private final DeviceAsyncLogPublisher devpAsyncLogPublisher; |
| | | private final StationTaskLocationRegistry stationTaskLocationRegistry; |
| | | private final List<StationProtocol> statusList = new ArrayList<>(); |
| | | private final Map<Integer, StationProtocol> statusMap = new HashMap<>(); |
| | | private volatile List<Integer> taskNoList = new ArrayList<>(); |
| | | private boolean initialized = false; |
| | | private long deviceDataLogTime = System.currentTimeMillis(); |
| | |
| | | this.redisUtil = redisUtil; |
| | | this.recentArrivalTracker = recentArrivalTracker; |
| | | this.devpAsyncLogPublisher = SpringUtils.getBean(DeviceAsyncLogPublisher.class); |
| | | this.stationTaskLocationRegistry = SpringUtils.getBean(StationTaskLocationRegistry.class); |
| | | } |
| | | |
| | | public void readStatus(ZyStationConnectDriver zyStationConnectDriver) { |
| | |
| | | StationProtocol stationProtocol = new StationProtocol(); |
| | | stationProtocol.setStationId(entity.getStationId()); |
| | | statusList.add(stationProtocol); |
| | | statusMap.put(entity.getStationId(), stationProtocol); |
| | | } |
| | | initialized = true; |
| | | } |
| | | |
| | | int deviceLogCollectTime = initialized ? Utils.getDeviceLogCollectTime() : 200; |
| | | List<ZyStationStatusEntity> zyStationStatusEntities = zyStationConnectDriver.getStatus(); |
| | | if (zyStationStatusEntities == null || zyStationStatusEntities.isEmpty()) { |
| | | return; |
| | | } |
| | | LinkedHashSet<Integer> taskNoSet = new LinkedHashSet<>(); |
| | | LinkedHashSet<Integer> loadingTaskNoSet = new LinkedHashSet<>(); |
| | | long observeAt = System.currentTimeMillis(); |
| | | for (ZyStationStatusEntity statusEntity : zyStationStatusEntities) { |
| | | for (StationProtocol stationProtocol : statusList) { |
| | | if (stationProtocol.getStationId().equals(statusEntity.getStationId())) { |
| | | stationProtocol.setTaskNo(statusEntity.getTaskNo()); |
| | | stationProtocol.setTargetStaNo(statusEntity.getTargetStaNo()); |
| | | stationProtocol.setAutoing(statusEntity.isAutoing()); |
| | | stationProtocol.setLoading(statusEntity.isLoading()); |
| | | stationProtocol.setInEnable(statusEntity.isInEnable()); |
| | | stationProtocol.setOutEnable(statusEntity.isOutEnable()); |
| | | stationProtocol.setEmptyMk(statusEntity.isEmptyMk()); |
| | | stationProtocol.setFullPlt(statusEntity.isFullPlt()); |
| | | stationProtocol.setPalletHeight(statusEntity.getPalletHeight()); |
| | | stationProtocol.setError(statusEntity.getError()); |
| | | stationProtocol.setErrorMsg(statusEntity.getErrorMsg()); |
| | | stationProtocol.setBarcode(statusEntity.getBarcode()); |
| | | stationProtocol.setRunBlock(statusEntity.isRunBlock()); |
| | | stationProtocol.setEnableIn(statusEntity.isEnableIn()); |
| | | stationProtocol.setWeight(statusEntity.getWeight()); |
| | | stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx()); |
| | | stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems()); |
| | | stationProtocol.setIoMode(statusEntity.getIoMode()); |
| | | stationProtocol.setInBarcodeError(statusEntity.isInBarcodeError()); |
| | | if (statusEntity.getTaskNo() != null && statusEntity.getTaskNo() > 0) { |
| | | taskNoSet.add(statusEntity.getTaskNo()); |
| | | } |
| | | if (statusEntity.getTaskBufferItems() != null) { |
| | | statusEntity.getTaskBufferItems().forEach(item -> { |
| | | Integer bufferTaskNo = item == null ? null : item.getTaskNo(); |
| | | if (bufferTaskNo != null && bufferTaskNo > 0) { |
| | | taskNoSet.add(bufferTaskNo); |
| | | } |
| | | }); |
| | | } |
| | | recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading()); |
| | | if (statusEntity == null || statusEntity.getStationId() == null) { |
| | | continue; |
| | | } |
| | | StationProtocol stationProtocol = statusMap.get(statusEntity.getStationId()); |
| | | if (stationProtocol == null) { |
| | | continue; |
| | | } |
| | | stationProtocol.setTaskNo(statusEntity.getTaskNo()); |
| | | stationProtocol.setTargetStaNo(statusEntity.getTargetStaNo()); |
| | | stationProtocol.setAutoing(statusEntity.isAutoing()); |
| | | stationProtocol.setLoading(statusEntity.isLoading()); |
| | | stationProtocol.setInEnable(statusEntity.isInEnable()); |
| | | stationProtocol.setOutEnable(statusEntity.isOutEnable()); |
| | | stationProtocol.setEmptyMk(statusEntity.isEmptyMk()); |
| | | stationProtocol.setFullPlt(statusEntity.isFullPlt()); |
| | | stationProtocol.setPalletHeight(statusEntity.getPalletHeight()); |
| | | stationProtocol.setError(statusEntity.getError()); |
| | | stationProtocol.setErrorMsg(statusEntity.getErrorMsg()); |
| | | stationProtocol.setBarcode(statusEntity.getBarcode()); |
| | | stationProtocol.setRunBlock(statusEntity.isRunBlock()); |
| | | stationProtocol.setEnableIn(statusEntity.isEnableIn()); |
| | | stationProtocol.setWeight(statusEntity.getWeight()); |
| | | stationProtocol.setTaskWriteIdx(statusEntity.getTaskWriteIdx()); |
| | | stationProtocol.setTaskBufferItems(statusEntity.getTaskBufferItems()); |
| | | stationProtocol.setIoMode(statusEntity.getIoMode()); |
| | | stationProtocol.setInBarcodeError(statusEntity.isInBarcodeError()); |
| | | if (statusEntity.getTaskNo() != null && statusEntity.getTaskNo() > 0) { |
| | | taskNoSet.add(statusEntity.getTaskNo()); |
| | | if (statusEntity.isLoading()) { |
| | | loadingTaskNoSet.add(statusEntity.getTaskNo()); |
| | | } |
| | | |
| | | if (!Cools.isEmpty(stationProtocol.getSystemWarning())) { |
| | | if (stationProtocol.isAutoing() && !stationProtocol.isLoading()) { |
| | | stationProtocol.setSystemWarning(""); |
| | | } |
| | | if (statusEntity.getTaskBufferItems() != null) { |
| | | statusEntity.getTaskBufferItems().forEach(item -> { |
| | | Integer bufferTaskNo = item == null ? null : item.getTaskNo(); |
| | | if (bufferTaskNo != null && bufferTaskNo > 0) { |
| | | taskNoSet.add(bufferTaskNo); |
| | | } |
| | | } |
| | | }); |
| | | } |
| | | recentArrivalTracker.observe(statusEntity.getStationId(), statusEntity.getTaskNo(), statusEntity.isLoading()); |
| | | syncTaskLocation(statusEntity, observeAt); |
| | | if (!Cools.isEmpty(stationProtocol.getSystemWarning()) |
| | | && stationProtocol.isAutoing() |
| | | && !stationProtocol.isLoading()) { |
| | | stationProtocol.setSystemWarning(""); |
| | | } |
| | | } |
| | | taskNoList = new ArrayList<>(taskNoSet); |
| | | if (stationTaskLocationRegistry != null) { |
| | | stationTaskLocationRegistry.cleanupByDevice(deviceConfig.getDeviceNo(), loadingTaskNoSet); |
| | | } |
| | | |
| | | OutputQueue.DEVP.offer(MessageFormat.format("【{0}】[id:{1}] <<<<< 实时数据更新成功", |
| | | DateUtils.convert(new Date()), deviceConfig.getDeviceNo())); |
| | |
| | | public List<Integer> getTaskNoList() { |
| | | return taskNoList; |
| | | } |
| | | |
| | | private void syncTaskLocation(ZyStationStatusEntity statusEntity, long observeAt) { |
| | | if (stationTaskLocationRegistry == null || statusEntity == null) { |
| | | return; |
| | | } |
| | | Integer taskNo = statusEntity.getTaskNo(); |
| | | if (taskNo != null && taskNo > 0 && statusEntity.isLoading()) { |
| | | stationTaskLocationRegistry.update( |
| | | taskNo, |
| | | deviceConfig.getDeviceNo(), |
| | | statusEntity.getStationId(), |
| | | true, |
| | | statusEntity.isRunBlock(), |
| | | observeAt |
| | | ); |
| | | return; |
| | | } |
| | | stationTaskLocationRegistry.remove(taskNo, deviceConfig.getDeviceNo(), statusEntity.getStationId()); |
| | | } |
| | | } |
| New file |
| | |
| | | package com.zy.core.thread.support; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.Map; |
| | | import java.util.Objects; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | @Slf4j |
| | | @Component |
| | | public class StationTaskLocationRegistry { |
| | | |
| | | private static final long DEFAULT_STALE_THRESHOLD_MS = 2_000L; |
| | | private static final long LOOKUP_WARN_INTERVAL_MS = 5_000L; |
| | | |
| | | private final ConcurrentHashMap<Integer, TaskLocationSnapshot> loadingTaskLocationMap = new ConcurrentHashMap<>(); |
| | | private final ConcurrentHashMap<Integer, Long> lookupWarnAtMap = new ConcurrentHashMap<>(); |
| | | |
| | | public void update(Integer taskNo, |
| | | Integer deviceNo, |
| | | Integer stationId, |
| | | boolean loading, |
| | | boolean runBlock, |
| | | long updateTime) { |
| | | if (taskNo == null || taskNo <= 0) { |
| | | return; |
| | | } |
| | | if (!loading || deviceNo == null || stationId == null) { |
| | | remove(taskNo, null, null); |
| | | return; |
| | | } |
| | | loadingTaskLocationMap.put(taskNo, new TaskLocationSnapshot(taskNo, deviceNo, stationId, true, runBlock, updateTime)); |
| | | } |
| | | |
| | | public void remove(Integer taskNo, Integer deviceNo, Integer stationId) { |
| | | if (taskNo == null || taskNo <= 0) { |
| | | return; |
| | | } |
| | | loadingTaskLocationMap.computeIfPresent(taskNo, (key, snapshot) -> { |
| | | if (snapshot == null) { |
| | | return null; |
| | | } |
| | | if (deviceNo != null && !Objects.equals(deviceNo, snapshot.getDeviceNo())) { |
| | | return snapshot; |
| | | } |
| | | if (stationId != null && !Objects.equals(stationId, snapshot.getStationId())) { |
| | | return snapshot; |
| | | } |
| | | return null; |
| | | }); |
| | | } |
| | | |
| | | public TaskLocationSnapshot findActive(Integer taskNo) { |
| | | return findActive(taskNo, DEFAULT_STALE_THRESHOLD_MS); |
| | | } |
| | | |
| | | public TaskLocationSnapshot findActive(Integer taskNo, long staleThresholdMs) { |
| | | if (taskNo == null || taskNo <= 0) { |
| | | return null; |
| | | } |
| | | TaskLocationSnapshot snapshot = loadingTaskLocationMap.get(taskNo); |
| | | if (snapshot == null) { |
| | | warnLookup(taskNo, "miss", null); |
| | | return null; |
| | | } |
| | | long ageMs = Math.max(0L, System.currentTimeMillis() - snapshot.getUpdateTime()); |
| | | if (ageMs > staleThresholdMs) { |
| | | warnLookup(taskNo, "stale", ageMs); |
| | | return null; |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | public void cleanupByDevice(Integer deviceNo, Iterable<Integer> activeTaskNoList) { |
| | | if (deviceNo == null) { |
| | | return; |
| | | } |
| | | ConcurrentHashMap<Integer, Boolean> activeMap = new ConcurrentHashMap<>(); |
| | | if (activeTaskNoList != null) { |
| | | for (Integer taskNo : activeTaskNoList) { |
| | | if (taskNo != null && taskNo > 0) { |
| | | activeMap.put(taskNo, Boolean.TRUE); |
| | | } |
| | | } |
| | | } |
| | | for (Map.Entry<Integer, TaskLocationSnapshot> entry : loadingTaskLocationMap.entrySet()) { |
| | | TaskLocationSnapshot snapshot = entry.getValue(); |
| | | if (snapshot == null || !Objects.equals(deviceNo, snapshot.getDeviceNo())) { |
| | | continue; |
| | | } |
| | | if (!activeMap.containsKey(entry.getKey())) { |
| | | loadingTaskLocationMap.remove(entry.getKey(), snapshot); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void warnLookup(Integer taskNo, String reason, Long ageMs) { |
| | | long now = System.currentTimeMillis(); |
| | | Long lastWarnAt = lookupWarnAtMap.get(taskNo); |
| | | if (lastWarnAt != null && now - lastWarnAt < LOOKUP_WARN_INTERVAL_MS) { |
| | | return; |
| | | } |
| | | lookupWarnAtMap.put(taskNo, now); |
| | | if (ageMs == null) { |
| | | log.warn("task-location-registry miss, taskNo={}", taskNo); |
| | | return; |
| | | } |
| | | log.warn("task-location-registry stale, taskNo={}, ageMs={}", taskNo, ageMs); |
| | | } |
| | | |
| | | public static class TaskLocationSnapshot { |
| | | private final Integer taskNo; |
| | | private final Integer deviceNo; |
| | | private final Integer stationId; |
| | | private final boolean loading; |
| | | private final boolean runBlock; |
| | | private final long updateTime; |
| | | |
| | | public TaskLocationSnapshot(Integer taskNo, |
| | | Integer deviceNo, |
| | | Integer stationId, |
| | | boolean loading, |
| | | boolean runBlock, |
| | | long updateTime) { |
| | | this.taskNo = taskNo; |
| | | this.deviceNo = deviceNo; |
| | | this.stationId = stationId; |
| | | this.loading = loading; |
| | | this.runBlock = runBlock; |
| | | this.updateTime = updateTime; |
| | | } |
| | | |
| | | public Integer getTaskNo() { |
| | | return taskNo; |
| | | } |
| | | |
| | | public Integer getDeviceNo() { |
| | | return deviceNo; |
| | | } |
| | | |
| | | public Integer getStationId() { |
| | | return stationId; |
| | | } |
| | | |
| | | public boolean isLoading() { |
| | | return loading; |
| | | } |
| | | |
| | | public boolean isRunBlock() { |
| | | return runBlock; |
| | | } |
| | | |
| | | public long getUpdateTime() { |
| | | return updateTime; |
| | | } |
| | | } |
| | | } |
| | |
| | | return tryAcquireLock(RedisKeyType.STATION_OUT_ORDER_DISPATCH_LIMIT_.key + wrkNo + "_" + stationId, seconds); |
| | | } |
| | | |
| | | public boolean tryAcquireRunBlockDirectReassignLock(Integer wrkNo, Integer stationId, int seconds) { |
| | | if (wrkNo == null || wrkNo <= 0 || stationId == null) { |
| | | return true; |
| | | } |
| | | return tryAcquireLock(RedisKeyType.STATION_RUN_BLOCK_DIRECT_REASSIGN_LIMIT_.key + wrkNo + "_" + stationId, seconds); |
| | | } |
| | | |
| | | public void signalSegmentReset(Integer taskNo, long waitMs) { |
| | | if (redisUtil == null || taskNo == null || taskNo <= 0) { |
| | | return; |
| | |
| | | public class StationRerouteProcessor { |
| | | private static final int OUT_ORDER_DISPATCH_LIMIT_SECONDS = 2; |
| | | private static final long STATION_MOVE_RESET_WAIT_MS = 1000L; |
| | | private static final int RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS = 15 * 60; |
| | | |
| | | @Autowired |
| | | private BasDevpService basDevpService; |
| | |
| | | if (basDevp == null || stationThread == null || stationProtocol == null || wrkMast == null) { |
| | | return; |
| | | } |
| | | if (isDirectReassignContextStale(stationProtocol, wrkMast)) { |
| | | return; |
| | | } |
| | | int currentTaskBufferCommandCount = countCurrentTaskBufferCommands( |
| | | stationProtocol.getTaskBufferItems(), |
| | | stationProtocol.getTaskNo() |
| | | ); |
| | | if (currentTaskBufferCommandCount > 0) { |
| | | News.info("输送站点运行堵塞重分配已跳过,缓存区仍存在当前任务命令。站点号={},工作号={},当前任务命令数={}", |
| | | News.info("输送站点运行堵塞重分配检测到旧分段命令残留,将先重置本地分段状态后继续重发。站点号={},工作号={},当前任务命令数={}", |
| | | stationProtocol.getStationId(), |
| | | stationProtocol.getTaskNo(), |
| | | currentTaskBufferCommandCount); |
| | | } |
| | | if (!stationDispatchRuntimeStateSupport.tryAcquireRunBlockDirectReassignLock( |
| | | wrkMast.getWrkNo(), |
| | | stationProtocol.getStationId(), |
| | | RUN_BLOCK_DIRECT_REASSIGN_LIMIT_SECONDS)) { |
| | | News.info("输送站点运行堵塞重分配已跳过,15分钟内不允许重复申请。站点号={},工作号={}", |
| | | stationProtocol.getStationId(), |
| | | wrkMast.getWrkNo()); |
| | | return; |
| | | } |
| | | String response = wmsOperateUtils.applyReassignTaskLocNo(wrkMast.getWrkNo(), stationProtocol.getStationId()); |
| | |
| | | if (!wrkMastService.updateById(wrkMast)) { |
| | | return; |
| | | } |
| | | stationDispatchRuntimeStateSupport.signalSegmentReset(wrkMast.getWrkNo(), STATION_MOVE_RESET_WAIT_MS); |
| | | boolean offered = offerDevpCommandWithDedup(basDevp.getDevpNo(), command, "checkStationRunBlock_direct"); |
| | | if (!offered) { |
| | | return; |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isDirectReassignContextStale(StationProtocol stationProtocol, WrkMast wrkMast) { |
| | | if (stationProtocol == null || wrkMast == null || stationMoveCoordinator == null) { |
| | | return false; |
| | | } |
| | | Integer taskNo = wrkMast.getWrkNo(); |
| | | Integer triggerStationId = stationProtocol.getStationId(); |
| | | if (taskNo == null || taskNo <= 0 || triggerStationId == null) { |
| | | return false; |
| | | } |
| | | StationMoveSession session = stationMoveCoordinator.loadSession(taskNo); |
| | | if (session == null || !session.isActive()) { |
| | | return false; |
| | | } |
| | | Integer currentStationId = session.getCurrentStationId(); |
| | | if (currentStationId == null || Objects.equals(currentStationId, triggerStationId)) { |
| | | return false; |
| | | } |
| | | News.info("输送站点运行堵塞重分配已跳过,任务已离开触发站点。触发站点={},当前站点={},工作号={}", |
| | | triggerStationId, |
| | | currentStationId, |
| | | taskNo); |
| | | return true; |
| | | } |
| | | |
| | | private int countCurrentTaskBufferCommands(List<StationTaskBufferItem> taskBufferItems, Integer currentTaskNo) { |
| | | if (taskBufferItems == null || taskBufferItems.isEmpty() || currentTaskNo == null || currentTaskNo <= 0) { |
| | | return 0; |