package com.zy.core.thread.impl.v5;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.core.common.Cools;
|
import com.core.common.SpringUtils;
|
import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo;
|
import com.zy.asrs.entity.DeviceConfig;
|
import com.zy.common.utils.RedisUtil;
|
import com.zy.core.cache.SlaveConnection;
|
import com.zy.core.enums.RedisKeyType;
|
import com.zy.core.enums.SlaveType;
|
import com.zy.core.enums.StationCommandType;
|
import com.zy.core.model.CommandResponse;
|
import com.zy.core.model.command.StationCommand;
|
import com.zy.core.model.protocol.StationProtocol;
|
import com.zy.core.trace.StationTaskTraceRegistry;
|
import com.zy.system.entity.Config;
|
import com.zy.system.service.ConfigService;
|
|
import java.util.ArrayList;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.function.Function;
|
|
public class StationV5SegmentExecutor {
|
|
private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
|
private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d;
|
private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L;
|
|
private final DeviceConfig deviceConfig;
|
private final RedisUtil redisUtil;
|
private final Function<StationCommand, CommandResponse> commandSender;
|
private final StationV5SegmentPlanner segmentPlanner = new StationV5SegmentPlanner();
|
|
public StationV5SegmentExecutor(DeviceConfig deviceConfig,
|
RedisUtil redisUtil,
|
Function<StationCommand, CommandResponse> commandSender) {
|
this.deviceConfig = deviceConfig;
|
this.redisUtil = redisUtil;
|
this.commandSender = commandSender;
|
}
|
|
public void execute(StationCommand original) {
|
if (original == null) {
|
return;
|
}
|
if (original.getCommandType() != StationCommandType.MOVE) {
|
commandSender.apply(original);
|
return;
|
}
|
|
StationV5SegmentExecutionPlan localPlan = segmentPlanner.buildPlan(original);
|
if (localPlan.getSegmentCommands().isEmpty()) {
|
return;
|
}
|
|
StationTaskTraceRegistry traceRegistry = SpringUtils.getBean(StationTaskTraceRegistry.class);
|
StationTaskTraceRegistry.TraceRegistration traceRegistration = traceRegistry == null
|
? new StationTaskTraceRegistry.TraceRegistration()
|
: traceRegistry.registerPlan(original.getTaskNo(), deviceConfig.getThreadImpl(),
|
original.getStationId(), original.getStationId(), original.getTargetStaNo(),
|
localPlan.getFullPathStationIds(), buildTraceSegments(localPlan.getSegmentCommands()));
|
int traceVersion = traceRegistration.getTraceVersion() == null ? 1 : traceRegistration.getTraceVersion();
|
int pathOffset = traceRegistration.getPathOffset() == null ? 0 : traceRegistration.getPathOffset();
|
bindCommands(localPlan.getSegmentCommands(), traceVersion, pathOffset);
|
List<Integer> effectiveFullPath = traceRegistration.getFullPathStationIds() == null
|
|| traceRegistration.getFullPathStationIds().isEmpty()
|
? copyIntegerList(localPlan.getFullPathStationIds())
|
: copyIntegerList(traceRegistration.getFullPathStationIds());
|
|
StationCommand firstCommand = localPlan.getSegmentCommands().get(0);
|
if (!sendSegmentWithRetry(firstCommand)) {
|
return;
|
}
|
if (traceRegistry != null) {
|
traceRegistry.markSegmentIssued(original.getTaskNo(), traceVersion, firstCommand,
|
"FIRST_SEGMENT_SENT", "输送任务首段下发成功", buildSegmentDetails(firstCommand));
|
}
|
|
long lastSeenAt = System.currentTimeMillis();
|
int segCursor = 0;
|
Integer lastCurrentStationId = null;
|
boolean firstRun = true;
|
double segmentAdvanceRatio = loadSegmentAdvanceRatio();
|
while (true) {
|
try {
|
Object cancel = redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + original.getTaskNo());
|
if (cancel != null) {
|
if (traceRegistry != null) {
|
traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
|
buildDetails("reason", "redis_cancel_signal"));
|
}
|
break;
|
}
|
|
StationProtocol currentStation = findCurrentStationByTask(original.getTaskNo());
|
if (currentStation == null) {
|
if (System.currentTimeMillis() - lastSeenAt > CURRENT_STATION_TIMEOUT_MS) {
|
if (traceRegistry != null) {
|
traceRegistry.markTimeout(original.getTaskNo(), traceVersion, lastCurrentStationId,
|
buildDetails("timeoutMs", CURRENT_STATION_TIMEOUT_MS));
|
}
|
break;
|
}
|
Thread.sleep(500L);
|
continue;
|
}
|
|
lastSeenAt = System.currentTimeMillis();
|
Integer previousCurrentStationId = lastCurrentStationId;
|
if (traceRegistry != null) {
|
traceRegistry.updateProgress(original.getTaskNo(), traceVersion, currentStation.getStationId(),
|
equalsInteger(previousCurrentStationId, currentStation.getStationId()) ? null : "CURRENT_STATION_CHANGE",
|
"输送任务当前位置已更新",
|
buildDetails("stationId", currentStation.getStationId()));
|
}
|
lastCurrentStationId = currentStation.getStationId();
|
if (!firstRun && currentStation.isRunBlock()) {
|
if (traceRegistry != null) {
|
traceRegistry.markBlocked(original.getTaskNo(), traceVersion, currentStation.getStationId(),
|
buildDetails("blockedStationId", currentStation.getStationId()));
|
}
|
break;
|
}
|
|
int currentIndex = effectiveFullPath.indexOf(currentStation.getStationId());
|
if (currentIndex < 0) {
|
Thread.sleep(500L);
|
firstRun = false;
|
continue;
|
}
|
|
int remaining = effectiveFullPath.size() - currentIndex - 1;
|
if (remaining <= 0) {
|
if (traceRegistry != null) {
|
traceRegistry.markFinished(original.getTaskNo(), traceVersion, currentStation.getStationId(),
|
buildDetails("targetStationId", original.getTargetStaNo()));
|
}
|
break;
|
}
|
|
StationCommand currentSegmentCommand = localPlan.getSegmentCommands().get(segCursor);
|
int currentSegEndIndex = safeIndex(currentSegmentCommand.getSegmentEndIndex());
|
int currentSegStartIndex = safeIndex(currentSegmentCommand.getSegmentStartIndex());
|
int segLen = Math.max(1, currentSegEndIndex - currentSegStartIndex + 1);
|
int remainingSegment = Math.max(0, currentSegEndIndex - currentIndex);
|
int thresholdSegment = (int) Math.ceil(segLen * segmentAdvanceRatio);
|
if (remainingSegment <= thresholdSegment && segCursor < localPlan.getSegmentCommands().size() - 1) {
|
StationCommand nextCommand = localPlan.getSegmentCommands().get(segCursor + 1);
|
if (sendSegmentWithRetry(nextCommand)) {
|
segCursor++;
|
if (traceRegistry != null) {
|
traceRegistry.markSegmentIssued(original.getTaskNo(), traceVersion, nextCommand,
|
"NEXT_SEGMENT_SENT", "输送任务下一段已提前下发",
|
buildSegmentDetails(nextCommand));
|
}
|
}
|
}
|
Thread.sleep(500L);
|
firstRun = false;
|
} catch (Exception ignore) {
|
break;
|
}
|
}
|
}
|
|
private boolean sendSegmentWithRetry(StationCommand command) {
|
while (true) {
|
CommandResponse commandResponse = commandSender.apply(command);
|
if (commandResponse == null) {
|
sleepQuietly(200L);
|
continue;
|
}
|
if (commandResponse.getResult()) {
|
return true;
|
}
|
sleepQuietly(200L);
|
}
|
}
|
|
private double loadSegmentAdvanceRatio() {
|
try {
|
ConfigService configService = SpringUtils.getBean(ConfigService.class);
|
if (configService == null) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
Config config = configService.getOne(new QueryWrapper<Config>()
|
.eq("code", CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO));
|
if (config == null || Cools.isEmpty(config.getValue())) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
return normalizeSegmentAdvanceRatio(config.getValue());
|
} catch (Exception ignore) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
}
|
|
private double normalizeSegmentAdvanceRatio(String valueText) {
|
if (valueText == null) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
String text = valueText.trim();
|
if (text.isEmpty()) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
if (text.endsWith("%")) {
|
text = text.substring(0, text.length() - 1).trim();
|
}
|
try {
|
double ratio = Double.parseDouble(text);
|
if (ratio > 1d && ratio <= 100d) {
|
ratio = ratio / 100d;
|
}
|
if (ratio < 0d) {
|
return 0d;
|
}
|
if (ratio > 1d) {
|
return 1d;
|
}
|
return ratio;
|
} catch (Exception ignore) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
}
|
|
private StationProtocol findCurrentStationByTask(Integer taskNo) {
|
try {
|
com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class);
|
if (deviceConfigService == null) {
|
return null;
|
}
|
List<DeviceConfig> devpList = deviceConfigService.list(new QueryWrapper<DeviceConfig>()
|
.eq("device_type", String.valueOf(SlaveType.Devp)));
|
for (DeviceConfig dc : devpList) {
|
com.zy.core.thread.StationThread thread = (com.zy.core.thread.StationThread) SlaveConnection.get(SlaveType.Devp, dc.getDeviceNo());
|
if (thread == null) {
|
continue;
|
}
|
Map<Integer, StationProtocol> statusMap = thread.getStatusMap();
|
if (statusMap == null || statusMap.isEmpty()) {
|
continue;
|
}
|
for (StationProtocol protocol : statusMap.values()) {
|
if (protocol.getTaskNo() != null && protocol.getTaskNo().equals(taskNo) && protocol.isLoading()) {
|
return protocol;
|
}
|
}
|
}
|
} catch (Exception ignore) {
|
return null;
|
}
|
return null;
|
}
|
|
private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) {
|
List<StationTaskTraceSegmentVo> result = new ArrayList<>();
|
if (segmentCommands == null) {
|
return result;
|
}
|
for (StationCommand command : segmentCommands) {
|
if (command == null) {
|
continue;
|
}
|
StationTaskTraceSegmentVo item = new StationTaskTraceSegmentVo();
|
item.setSegmentNo(command.getSegmentNo());
|
item.setSegmentCount(command.getSegmentCount());
|
item.setStationId(command.getStationId());
|
item.setTargetStationId(command.getTargetStaNo());
|
item.setSegmentStartIndex(command.getSegmentStartIndex());
|
item.setSegmentEndIndex(command.getSegmentEndIndex());
|
item.setSegmentPath(copyIntegerList(command.getNavigatePath()));
|
item.setIssued(Boolean.FALSE);
|
result.add(item);
|
}
|
return result;
|
}
|
|
private void bindCommands(List<StationCommand> segmentCommands, int traceVersion, int pathOffset) {
|
if (segmentCommands == null) {
|
return;
|
}
|
for (StationCommand command : segmentCommands) {
|
if (command == null) {
|
continue;
|
}
|
command.setTraceVersion(traceVersion);
|
if (command.getSegmentStartIndex() != null) {
|
command.setSegmentStartIndex(command.getSegmentStartIndex() + pathOffset);
|
}
|
if (command.getSegmentEndIndex() != null) {
|
command.setSegmentEndIndex(command.getSegmentEndIndex() + pathOffset);
|
}
|
}
|
}
|
|
private Map<String, Object> buildSegmentDetails(StationCommand command) {
|
Map<String, Object> details = new LinkedHashMap<>();
|
if (command != null) {
|
details.put("segmentNo", command.getSegmentNo());
|
details.put("segmentCount", command.getSegmentCount());
|
details.put("segmentPath", copyIntegerList(command.getNavigatePath()));
|
details.put("segmentStartIndex", command.getSegmentStartIndex());
|
details.put("segmentEndIndex", command.getSegmentEndIndex());
|
details.put("traceVersion", command.getTraceVersion());
|
}
|
return details;
|
}
|
|
private Map<String, Object> buildDetails(Object... keyValues) {
|
Map<String, Object> details = new LinkedHashMap<>();
|
if (keyValues == null) {
|
return details;
|
}
|
for (int i = 0; i + 1 < keyValues.length; i += 2) {
|
Object key = keyValues[i];
|
if (key != null) {
|
details.put(String.valueOf(key), keyValues[i + 1]);
|
}
|
}
|
return details;
|
}
|
|
private List<Integer> copyIntegerList(List<Integer> source) {
|
List<Integer> result = new ArrayList<>();
|
if (source == null) {
|
return result;
|
}
|
result.addAll(source);
|
return result;
|
}
|
|
private int safeIndex(Integer value) {
|
return value == null ? -1 : value;
|
}
|
|
private boolean equalsInteger(Integer a, Integer b) {
|
return a != null && a.equals(b);
|
}
|
|
private void sleepQuietly(long millis) {
|
try {
|
Thread.sleep(millis);
|
} catch (Exception ignore) {
|
}
|
}
|
}
|