package com.zy.core.thread.impl.station;
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.core.common.Cools;
|
import com.core.common.SpringUtils;
|
import com.zy.asrs.domain.vo.StationTaskTraceSegmentVo;
|
import com.zy.asrs.entity.DeviceConfig;
|
import com.zy.common.utils.RedisUtil;
|
import com.zy.core.cache.SlaveConnection;
|
import com.zy.core.enums.RedisKeyType;
|
import com.zy.core.enums.SlaveType;
|
import com.zy.core.enums.StationCommandType;
|
import com.zy.core.model.CommandResponse;
|
import com.zy.core.model.command.StationCommand;
|
import com.zy.core.model.protocol.StationProtocol;
|
import com.zy.core.move.StationMoveCoordinator;
|
import com.zy.core.trace.StationTaskTraceRegistry;
|
import com.zy.system.entity.Config;
|
import com.zy.system.service.ConfigService;
|
|
import java.util.ArrayList;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.function.Function;
|
|
public class StationSegmentExecutor {
|
|
private static final String CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
|
private static final double DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO = 0.3d;
|
private static final long CURRENT_STATION_TIMEOUT_MS = 1000L * 60L;
|
|
private final DeviceConfig deviceConfig;
|
private final RedisUtil redisUtil;
|
private final Function<StationCommand, CommandResponse> commandSender;
|
private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner();
|
|
public StationSegmentExecutor(DeviceConfig deviceConfig,
|
RedisUtil redisUtil,
|
Function<StationCommand, CommandResponse> commandSender) {
|
this.deviceConfig = deviceConfig;
|
this.redisUtil = redisUtil;
|
this.commandSender = commandSender;
|
}
|
|
public void execute(StationCommand original) {
|
if (original == null) {
|
return;
|
}
|
if (original.getCommandType() != StationCommandType.MOVE) {
|
commandSender.apply(original);
|
return;
|
}
|
|
StationSegmentExecutionPlan localPlan = segmentPlanner.buildPlan(original);
|
if (localPlan.getSegmentCommands().isEmpty()) {
|
return;
|
}
|
|
StationTaskTraceRegistry traceRegistry = SpringUtils.getBean(StationTaskTraceRegistry.class);
|
StationTaskTraceRegistry.TraceRegistration traceRegistration = traceRegistry == null
|
? new StationTaskTraceRegistry.TraceRegistration()
|
: traceRegistry.registerPlan(original.getTaskNo(), deviceConfig.getThreadImpl(),
|
original.getStationId(), original.getStationId(), original.getTargetStaNo(),
|
localPlan.getFullPathStationIds(), buildTraceSegments(localPlan.getSegmentCommands()));
|
int traceVersion = traceRegistration.getTraceVersion() == null ? 1 : traceRegistration.getTraceVersion();
|
int pathOffset = traceRegistration.getPathOffset() == null ? 0 : traceRegistration.getPathOffset();
|
bindCommands(localPlan.getSegmentCommands(), traceVersion, pathOffset, original.getRouteVersion());
|
List<Integer> effectiveFullPath = traceRegistration.getFullPathStationIds() == null
|
|| traceRegistration.getFullPathStationIds().isEmpty()
|
? copyIntegerList(localPlan.getFullPathStationIds())
|
: copyIntegerList(traceRegistration.getFullPathStationIds());
|
|
StationCommand firstCommand = localPlan.getSegmentCommands().get(0);
|
if (!sendSegmentWithRetry(firstCommand, traceRegistry, traceVersion, null)) {
|
return;
|
}
|
if (traceRegistry != null) {
|
traceRegistry.markSegmentIssued(original.getTaskNo(), traceVersion, firstCommand,
|
"FIRST_SEGMENT_SENT", "输送任务首段下发成功", buildSegmentDetails(firstCommand));
|
}
|
|
long lastSeenAt = System.currentTimeMillis();
|
int segCursor = 0;
|
Integer lastCurrentStationId = null;
|
boolean firstRun = true;
|
while (true) {
|
try {
|
if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) {
|
if (traceRegistry != null) {
|
traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
|
buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion()));
|
}
|
markCancelled(original.getTaskNo(), original.getRouteVersion(), lastCurrentStationId, "route_version_replaced");
|
break;
|
}
|
|
Object cancel = redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + original.getTaskNo());
|
if (cancel != null) {
|
if (traceRegistry != null) {
|
traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId,
|
buildDetails("reason", "redis_cancel_signal", "routeVersion", original.getRouteVersion()));
|
}
|
markCancelled(original.getTaskNo(), original.getRouteVersion(), lastCurrentStationId, "redis_cancel_signal");
|
break;
|
}
|
|
StationProtocol currentStation = findCurrentStationByTask(original.getTaskNo());
|
if (currentStation == null) {
|
if (System.currentTimeMillis() - lastSeenAt > CURRENT_STATION_TIMEOUT_MS) {
|
if (traceRegistry != null) {
|
traceRegistry.markTimeout(original.getTaskNo(), traceVersion, lastCurrentStationId,
|
buildDetails("timeoutMs", CURRENT_STATION_TIMEOUT_MS, "routeVersion", original.getRouteVersion()));
|
}
|
markTimeout(original.getTaskNo(), original.getRouteVersion(), lastCurrentStationId);
|
break;
|
}
|
Thread.sleep(500L);
|
continue;
|
}
|
|
lastSeenAt = System.currentTimeMillis();
|
Integer previousCurrentStationId = lastCurrentStationId;
|
updateCurrentStation(original.getTaskNo(), original.getRouteVersion(), currentStation.getStationId());
|
if (traceRegistry != null) {
|
traceRegistry.updateProgress(original.getTaskNo(), traceVersion, currentStation.getStationId(),
|
equalsInteger(previousCurrentStationId, currentStation.getStationId()) ? null : "CURRENT_STATION_CHANGE",
|
"输送任务当前位置已更新",
|
buildDetails("stationId", currentStation.getStationId(), "routeVersion", original.getRouteVersion()));
|
}
|
lastCurrentStationId = currentStation.getStationId();
|
if (!firstRun && currentStation.isRunBlock()) {
|
if (traceRegistry != null) {
|
traceRegistry.markBlocked(original.getTaskNo(), traceVersion, currentStation.getStationId(),
|
buildDetails("blockedStationId", currentStation.getStationId(), "routeVersion", original.getRouteVersion()));
|
}
|
markBlocked(original.getTaskNo(), original.getRouteVersion(), currentStation.getStationId());
|
break;
|
}
|
|
int currentIndex = effectiveFullPath.indexOf(currentStation.getStationId());
|
if (currentIndex < 0) {
|
Thread.sleep(500L);
|
firstRun = false;
|
continue;
|
}
|
|
int remaining = effectiveFullPath.size() - currentIndex - 1;
|
if (remaining <= 0) {
|
if (traceRegistry != null) {
|
traceRegistry.markFinished(original.getTaskNo(), traceVersion, currentStation.getStationId(),
|
buildDetails("targetStationId", original.getTargetStaNo(), "routeVersion", original.getRouteVersion()));
|
}
|
markFinished(original.getTaskNo(), original.getRouteVersion(), currentStation.getStationId());
|
break;
|
}
|
|
StationCommand currentSegmentCommand = localPlan.getSegmentCommands().get(segCursor);
|
int currentSegEndIndex = safeIndex(currentSegmentCommand.getSegmentEndIndex());
|
int currentSegStartIndex = segCursor == 0
|
? 0
|
: safeIndex(localPlan.getSegmentCommands().get(segCursor - 1).getSegmentEndIndex());
|
int segLen = currentSegEndIndex - currentSegStartIndex + 1;
|
int remainingSegment = Math.max(0, currentSegEndIndex - currentIndex);
|
int thresholdSegment = (int) Math.ceil(segLen * loadSegmentAdvanceRatio());
|
thresholdSegment = Math.max(1, thresholdSegment);
|
if (remainingSegment <= thresholdSegment
|
&& segCursor < localPlan.getSegmentCommands().size() - 1) {
|
StationCommand nextCommand = localPlan.getSegmentCommands().get(segCursor + 1);
|
if (sendSegmentWithRetry(nextCommand, traceRegistry, traceVersion, currentStation.getStationId())) {
|
segCursor++;
|
if (traceRegistry != null) {
|
traceRegistry.markSegmentIssued(original.getTaskNo(), traceVersion, nextCommand,
|
"NEXT_SEGMENT_SENT", "输送任务下一段已提前下发",
|
buildSegmentDetails(nextCommand));
|
}
|
} else {
|
break;
|
}
|
}
|
Thread.sleep(500L);
|
firstRun = false;
|
} catch (Exception ignore) {
|
break;
|
}
|
}
|
}
|
|
private boolean sendSegmentWithRetry(StationCommand command,
|
StationTaskTraceRegistry traceRegistry,
|
Integer traceVersion,
|
Integer currentStationId) {
|
while (true) {
|
if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) {
|
if (traceRegistry != null && command != null) {
|
traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
|
buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion()));
|
}
|
markCancelled(command == null ? null : command.getTaskNo(),
|
command == null ? null : command.getRouteVersion(),
|
currentStationId,
|
"route_version_replaced");
|
return false;
|
}
|
if (isTaskMoveReset(command == null ? null : command.getTaskNo())) {
|
if (traceRegistry != null && command != null) {
|
traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId,
|
buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion()));
|
}
|
markCancelled(command == null ? null : command.getTaskNo(),
|
command == null ? null : command.getRouteVersion(),
|
currentStationId,
|
"redis_cancel_signal");
|
return false;
|
}
|
CommandResponse commandResponse = commandSender.apply(command);
|
if (commandResponse == null) {
|
sleepQuietly(200L);
|
continue;
|
}
|
if (commandResponse.getResult()) {
|
markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion());
|
return true;
|
}
|
sleepQuietly(200L);
|
}
|
}
|
|
private double loadSegmentAdvanceRatio() {
|
try {
|
ConfigService configService = SpringUtils.getBean(ConfigService.class);
|
if (configService == null) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
Config config = configService.getOne(new QueryWrapper<Config>()
|
.eq("code", CFG_STATION_COMMAND_SEGMENT_ADVANCE_RATIO));
|
if (config == null || Cools.isEmpty(config.getValue())) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
return normalizeSegmentAdvanceRatio(config.getValue());
|
} catch (Exception ignore) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
}
|
|
private double normalizeSegmentAdvanceRatio(String valueText) {
|
if (valueText == null) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
String text = valueText.trim();
|
if (text.isEmpty()) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
if (text.endsWith("%")) {
|
text = text.substring(0, text.length() - 1).trim();
|
}
|
try {
|
double ratio = Double.parseDouble(text);
|
if (ratio > 1d && ratio <= 100d) {
|
ratio = ratio / 100d;
|
}
|
if (ratio < 0d) {
|
return 0d;
|
}
|
if (ratio > 1d) {
|
return 1d;
|
}
|
return ratio;
|
} catch (Exception ignore) {
|
return DEFAULT_STATION_COMMAND_SEGMENT_ADVANCE_RATIO;
|
}
|
}
|
|
private boolean isTaskMoveReset(Integer taskNo) {
|
if (taskNo == null || redisUtil == null) {
|
return false;
|
}
|
Object cancel = redisUtil.get(RedisKeyType.DEVICE_STATION_MOVE_RESET.key + taskNo);
|
return cancel != null;
|
}
|
|
private StationProtocol findCurrentStationByTask(Integer taskNo) {
|
try {
|
com.zy.asrs.service.DeviceConfigService deviceConfigService = SpringUtils.getBean(com.zy.asrs.service.DeviceConfigService.class);
|
if (deviceConfigService == null) {
|
return null;
|
}
|
List<DeviceConfig> devpList = deviceConfigService.list(new QueryWrapper<DeviceConfig>()
|
.eq("device_type", String.valueOf(SlaveType.Devp)));
|
for (DeviceConfig dc : devpList) {
|
com.zy.core.thread.StationThread thread = (com.zy.core.thread.StationThread) SlaveConnection.get(SlaveType.Devp, dc.getDeviceNo());
|
if (thread == null) {
|
continue;
|
}
|
Map<Integer, StationProtocol> statusMap = thread.getStatusMap();
|
if (statusMap == null || statusMap.isEmpty()) {
|
continue;
|
}
|
for (StationProtocol protocol : statusMap.values()) {
|
if (protocol.getTaskNo() != null && protocol.getTaskNo().equals(taskNo) && protocol.isLoading()) {
|
return protocol;
|
}
|
}
|
}
|
} catch (Exception ignore) {
|
return null;
|
}
|
return null;
|
}
|
|
private List<StationTaskTraceSegmentVo> buildTraceSegments(List<StationCommand> segmentCommands) {
|
List<StationTaskTraceSegmentVo> result = new ArrayList<>();
|
if (segmentCommands == null) {
|
return result;
|
}
|
for (StationCommand command : segmentCommands) {
|
if (command == null) {
|
continue;
|
}
|
StationTaskTraceSegmentVo item = new StationTaskTraceSegmentVo();
|
item.setSegmentNo(command.getSegmentNo());
|
item.setSegmentCount(command.getSegmentCount());
|
item.setStationId(command.getStationId());
|
item.setTargetStationId(command.getTargetStaNo());
|
item.setSegmentStartIndex(command.getSegmentStartIndex());
|
item.setSegmentEndIndex(command.getSegmentEndIndex());
|
item.setSegmentPath(copyIntegerList(command.getNavigatePath()));
|
item.setIssued(Boolean.FALSE);
|
result.add(item);
|
}
|
return result;
|
}
|
|
private void bindCommands(List<StationCommand> segmentCommands, int traceVersion, int pathOffset, Integer routeVersion) {
|
if (segmentCommands == null) {
|
return;
|
}
|
for (StationCommand command : segmentCommands) {
|
if (command == null) {
|
continue;
|
}
|
command.setTraceVersion(traceVersion);
|
command.setRouteVersion(routeVersion);
|
if (command.getSegmentStartIndex() != null) {
|
command.setSegmentStartIndex(command.getSegmentStartIndex() + pathOffset);
|
}
|
if (command.getSegmentEndIndex() != null) {
|
command.setSegmentEndIndex(command.getSegmentEndIndex() + pathOffset);
|
}
|
}
|
}
|
|
private Map<String, Object> buildSegmentDetails(StationCommand command) {
|
Map<String, Object> details = new LinkedHashMap<>();
|
if (command != null) {
|
details.put("segmentNo", command.getSegmentNo());
|
details.put("segmentCount", command.getSegmentCount());
|
details.put("segmentPath", copyIntegerList(command.getNavigatePath()));
|
details.put("segmentStartIndex", command.getSegmentStartIndex());
|
details.put("segmentEndIndex", command.getSegmentEndIndex());
|
details.put("traceVersion", command.getTraceVersion());
|
details.put("routeVersion", command.getRouteVersion());
|
}
|
return details;
|
}
|
|
private Map<String, Object> buildDetails(Object... keyValues) {
|
Map<String, Object> details = new LinkedHashMap<>();
|
if (keyValues == null) {
|
return details;
|
}
|
for (int i = 0; i + 1 < keyValues.length; i += 2) {
|
Object key = keyValues[i];
|
if (key != null) {
|
details.put(String.valueOf(key), keyValues[i + 1]);
|
}
|
}
|
return details;
|
}
|
|
private List<Integer> copyIntegerList(List<Integer> source) {
|
List<Integer> result = new ArrayList<>();
|
if (source != null) {
|
result.addAll(source);
|
}
|
return result;
|
}
|
|
private int safeIndex(Integer value) {
|
return value == null ? -1 : value;
|
}
|
|
private boolean equalsInteger(Integer a, Integer b) {
|
return a != null && a.equals(b);
|
}
|
|
private void sleepQuietly(long millis) {
|
try {
|
Thread.sleep(millis);
|
} catch (Exception ignore) {
|
}
|
}
|
|
private boolean isRouteActive(Integer taskNo, Integer routeVersion) {
|
// Legacy direct-enqueue commands (for example FakeProcess/stationInExecute)
|
// do not register a move session and therefore have no routeVersion.
|
// They should keep the historical behavior and execute normally.
|
if (taskNo == null || routeVersion == null) {
|
return true;
|
}
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion);
|
}
|
|
private void markSegmentIssued(Integer taskNo, Integer routeVersion) {
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
if (moveCoordinator != null) {
|
moveCoordinator.markSegmentIssued(taskNo, routeVersion);
|
}
|
}
|
|
private void updateCurrentStation(Integer taskNo, Integer routeVersion, Integer currentStationId) {
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
if (moveCoordinator != null) {
|
moveCoordinator.updateCurrentStation(taskNo, routeVersion, currentStationId);
|
}
|
}
|
|
private void markCancelled(Integer taskNo, Integer routeVersion, Integer currentStationId, String cancelReason) {
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
if (moveCoordinator != null) {
|
moveCoordinator.markCancelled(taskNo, routeVersion, currentStationId, cancelReason);
|
}
|
}
|
|
private void markBlocked(Integer taskNo, Integer routeVersion, Integer currentStationId) {
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
if (moveCoordinator != null) {
|
moveCoordinator.markBlocked(taskNo, routeVersion, currentStationId);
|
}
|
}
|
|
private void markTimeout(Integer taskNo, Integer routeVersion, Integer currentStationId) {
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
if (moveCoordinator != null) {
|
moveCoordinator.markTimeout(taskNo, routeVersion, currentStationId);
|
}
|
}
|
|
private void markFinished(Integer taskNo, Integer routeVersion, Integer currentStationId) {
|
StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class);
|
if (moveCoordinator != null) {
|
moveCoordinator.markFinished(taskNo, routeVersion, currentStationId);
|
}
|
}
|
}
|