package com.zy.core.thread.impl.v5; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.core.common.Cools; import com.core.common.SpringUtils; import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo; import com.zy.asrs.entity.DeviceConfig; import com.zy.common.utils.RedisUtil; import com.zy.core.cache.SlaveConnection; import com.zy.core.enums.RedisKeyType; import com.zy.core.enums.SlaveType; import com.zy.core.enums.StationCommandType; import com.zy.core.model.CommandResponse; import com.zy.core.model.command.StationCommand; import com.zy.core.model.protocol.StationProtocol; import com.zy.core.trace.StationTaskTraceRegistry; import com.zy.system.entity.Config; import com.zy.system.service.ConfigService; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Function; public class StationV5SegmentExecutor { private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio"; private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d; private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L; private final DeviceConfig deviceConfig; private final RedisUtil redisUtil; private final Function commandSender; private final StationV5SegmentPlanner segmentPlanner = new StationV5SegmentPlanner(); public StationV5SegmentExecutor(DeviceConfig deviceConfig, RedisUtil redisUtil, Function commandSender) { this.deviceConfig = deviceConfig; this.redisUtil = redisUtil; this.commandSender = commandSender; } public void execute(StationCommand original) { if (original == null) { return; } if (original.getCommandType() != StationCommandType.MOVE) { commandSender.apply(original); return; } StationV5SegmentExecutionPlan localPlan = segmentPlanner.buildPlan(original); if (localPlan.getSegmentCommands().isEmpty()) { return; } StationTaskTraceRegistry traceRegistry = SpringUtils.getBean(StationTaskTraceRegistry.class); StationTaskTraceRegistry.TraceRegistration traceRegistration = traceRegistry == null ? new StationTaskTraceRegistry.TraceRegistration() : traceRegistry.registerPlan(original.getTaskNo(), deviceConfig.getThreadImpl(), original.getStationId(), original.getStationId(), original.getTargetStaNo(), localPlan.getFullPathStationIds(), buildTraceSegments(localPlan.getSegmentCommands())); int traceVersion = traceRegistration.getTraceVersion() == null ? 1 : traceRegistration.getTraceVersion(); int pathOffset = traceRegistration.getPathOffset() == null ? 0 : traceRegistration.getPathOffset(); bindCommands(localPlan.getSegmentCommands(), traceVersion, pathOffset); List effectiveFullPath = traceRegistration.getFullPathStationIds() == null || traceRegistration.getFullPathStationIds().isEmpty() ? copyIntegerList(localPlan.getFullPathStationIds()) : copyIntegerList(traceRegistration.getFullPathStationIds()); StationCommand firstCommand = localPlan.getSegmentCommands().get(0); if (!sendSegmentWithRetry(firstCommand)) { return; } if (traceRegistry != null) { traceRegistry.markSegmentIssued(original.getTaskNo(), traceVersion, firstCommand, "FIRST_SEGMENT_SENT", "输送任务首段下发成功", buildSegmentDetails(firstCommand)); } long lastSeenAt = System.currentTimeMillis(); int segCursor = 0; Integer lastCurrentStationId = null; boolean firstRun = true; double segmentAdvanceRatio = loadSegmentAdvanceRatio(); while (true) { try { Object cancel = redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + original.getTaskNo()); if (cancel != null) { if (traceRegistry != null) { traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId, buildDetails("reason", "redis_cancel_signal")); } break; } StationProtocol currentStation = findCurrentStationByTask(original.getTaskNo()); if (currentStation == null) { if (System.currentTimeMillis() - lastSeenAt > CURRENT_STATION_TIMEOUT_MS) { if (traceRegistry != null) { traceRegistry.markTimeout(original.getTaskNo(), traceVersion, lastCurrentStationId, buildDetails("timeoutMs", CURRENT_STATION_TIMEOUT_MS)); } break; } Thread.sleep(500L); continue; } lastSeenAt = System.currentTimeMillis(); Integer previousCurrentStationId = lastCurrentStationId; if (traceRegistry != null) { traceRegistry.updateProgress(original.getTaskNo(), traceVersion, currentStation.getStationId(), equalsInteger(previousCurrentStationId, currentStation.getStationId()) ? null : "CURRENT_STATION_CHANGE", "输送任务当前位置已更新", buildDetails("stationId", currentStation.getStationId())); } lastCurrentStationId = currentStation.getStationId(); if (!firstRun && currentStation.isRunBlock()) { if (traceRegistry != null) { traceRegistry.markBlocked(original.getTaskNo(), traceVersion, currentStation.getStationId(), buildDetails("blockedStationId", currentStation.getStationId())); } break; } int currentIndex = effectiveFullPath.indexOf(currentStation.getStationId()); if (currentIndex < 0) { Thread.sleep(500L); firstRun = false; continue; } int remaining = effectiveFullPath.size() - currentIndex - 1; if (remaining <= 0) { if (traceRegistry != null) { traceRegistry.markFinished(original.getTaskNo(), traceVersion, currentStation.getStationId(), buildDetails("targetStationId", original.getTargetStaNo())); } break; } StationCommand currentSegmentCommand = localPlan.getSegmentCommands().get(segCursor); int currentSegEndIndex = safeIndex(currentSegmentCommand.getSegmentEndIndex()); int currentSegStartIndex = safeIndex(currentSegmentCommand.getSegmentStartIndex()); int segLen = Math.max(1, currentSegEndIndex - currentSegStartIndex + 1); int remainingSegment = Math.max(0, currentSegEndIndex - currentIndex); int thresholdSegment = (int) Math.ceil(segLen * segmentAdvanceRatio); if (remainingSegment <= thresholdSegment && segCursor < localPlan.getSegmentCommands().size() - 1) { StationCommand nextCommand = localPlan.getSegmentCommands().get(segCursor + 1); if (sendSegmentWithRetry(nextCommand)) { segCursor++; if (traceRegistry != null) { traceRegistry.markSegmentIssued(original.getTaskNo(), traceVersion, nextCommand, "NEXT_SEGMENT_SENT", "输送任务下一段已提前下发", buildSegmentDetails(nextCommand)); } } } Thread.sleep(500L); firstRun = false; } catch (Exception ignore) { break; } } } private boolean sendSegmentWithRetry(StationCommand command) { while (true) { CommandResponse commandResponse = commandSender.apply(command); if (commandResponse == null) { sleepQuietly(200L); continue; } if (commandResponse.getResult()) { return true; } sleepQuietly(200L); } } private double loadSegmentAdvanceRatio() { try { ConfigService configService = SpringUtils.getBean(ConfigService.class); if (configService == null) { return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; } Config config = configService.getOne(new QueryWrapper() .eq("code", CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO)); if (config == null || Cools.isEmpty(config.getValue())) { return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; } return normalizeSegmentAdvanceRatio(config.getValue()); } catch (Exception ignore) { return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; } } private double normalizeSegmentAdvanceRatio(String valueText) { if (valueText == null) { return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; } String text = valueText.trim(); if (text.isEmpty()) { return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; } if (text.endsWith("%")) { text = text.substring(0, text.length() - 1).trim(); } try { double ratio = Double.parseDouble(text); if (ratio > 1d && ratio <= 100d) { ratio = ratio / 100d; } if (ratio < 0d) { return 0d; } if (ratio > 1d) { return 1d; } return ratio; } catch (Exception ignore) { return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO; } } private StationProtocol findCurrentStationByTask(Integer taskNo) { try { com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class); if (deviceConfigService == null) { return null; } List devpList = deviceConfigService.list(new QueryWrapper() .eq("device_type", String.valueOf(SlaveType.Devp))); for (DeviceConfig dc : devpList) { com.zy.core.thread.StationThread thread = (com.zy.core.thread.StationThread) SlaveConnection.get(SlaveType.Devp, dc.getDeviceNo()); if (thread == null) { continue; } Map statusMap = thread.getStatusMap(); if (statusMap == null || statusMap.isEmpty()) { continue; } for (StationProtocol protocol : statusMap.values()) { if (protocol.getTaskNo() != null && protocol.getTaskNo().equals(taskNo) && protocol.isLoading()) { return protocol; } } } } catch (Exception ignore) { return null; } return null; } private List buildTraceSegments(List segmentCommands) { List result = new ArrayList<>(); if (segmentCommands == null) { return result; } for (StationCommand command : segmentCommands) { if (command == null) { continue; } StationTaskTraceSegmentVo item = new StationTaskTraceSegmentVo(); item.setSegmentNo(command.getSegmentNo()); item.setSegmentCount(command.getSegmentCount()); item.setStationId(command.getStationId()); item.setTargetStationId(command.getTargetStaNo()); item.setSegmentStartIndex(command.getSegmentStartIndex()); item.setSegmentEndIndex(command.getSegmentEndIndex()); item.setSegmentPath(copyIntegerList(command.getNavigatePath())); item.setIssued(Boolean.FALSE); result.add(item); } return result; } private void bindCommands(List segmentCommands, int traceVersion, int pathOffset) { if (segmentCommands == null) { return; } for (StationCommand command : segmentCommands) { if (command == null) { continue; } command.setTraceVersion(traceVersion); if (command.getSegmentStartIndex() != null) { command.setSegmentStartIndex(command.getSegmentStartIndex() + pathOffset); } if (command.getSegmentEndIndex() != null) { command.setSegmentEndIndex(command.getSegmentEndIndex() + pathOffset); } } } private Map buildSegmentDetails(StationCommand command) { Map details = new LinkedHashMap<>(); if (command != null) { details.put("segmentNo", command.getSegmentNo()); details.put("segmentCount", command.getSegmentCount()); details.put("segmentPath", copyIntegerList(command.getNavigatePath())); details.put("segmentStartIndex", command.getSegmentStartIndex()); details.put("segmentEndIndex", command.getSegmentEndIndex()); details.put("traceVersion", command.getTraceVersion()); } return details; } private Map buildDetails(Object... keyValues) { Map details = new LinkedHashMap<>(); if (keyValues == null) { return details; } for (int i = 0; i + 1 < keyValues.length; i += 2) { Object key = keyValues[i]; if (key != null) { details.put(String.valueOf(key), keyValues[i + 1]); } } return details; } private List copyIntegerList(List source) { List result = new ArrayList<>(); if (source == null) { return result; } result.addAll(source); return result; } private int safeIndex(Integer value) { return value == null ? -1 : value; } private boolean equalsInteger(Integer a, Integer b) { return a != null && a.equals(b); } private void sleepQuietly(long millis) { try { Thread.sleep(millis); } catch (Exception ignore) { } } }