| | |
| | | private final Function<StationCommand, CommandResponse> commandSender; |
| | | private final StationSegmentPlanner segmentPlanner = new StationSegmentPlanner(); |
| | | |
| | | private enum SegmentSendResult { |
| | | DISPATCHED, |
| | | CANCELLED, |
| | | RETRY |
| | | } |
| | | |
| | | public StationSegmentExecutor(DeviceConfig deviceConfig, |
| | | RedisUtil redisUtil, |
| | | Function<StationCommand, CommandResponse> commandSender) { |
| | |
| | | boolean firstRun = true; |
| | | while (true) { |
| | | try { |
| | | if (!isRouteActive(original.getTaskNo(), original.getRouteVersion())) { |
| | | if (!isRouteDispatchable(original.getTaskNo(), original.getRouteVersion())) { |
| | | if (traceRegistry != null) { |
| | | traceRegistry.markCancelled(original.getTaskNo(), traceVersion, lastCurrentStationId, |
| | | buildDetails("reason", "route_version_replaced", "routeVersion", original.getRouteVersion())); |
| | |
| | | Integer traceVersion, |
| | | Integer currentStationId) { |
| | | while (true) { |
| | | if (!isRouteActive(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) { |
| | | if (traceRegistry != null && command != null) { |
| | | traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId, |
| | | buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion())); |
| | | } |
| | | markCancelled(command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getRouteVersion(), |
| | | currentStationId, |
| | | "route_version_replaced"); |
| | | SegmentSendResult sendResult = executeLockedSegmentSend(command, traceRegistry, traceVersion, currentStationId); |
| | | if (sendResult == SegmentSendResult.CANCELLED) { |
| | | return false; |
| | | } |
| | | if (isTaskMoveReset(command == null ? null : command.getTaskNo())) { |
| | | if (traceRegistry != null && command != null) { |
| | | traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId, |
| | | buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion())); |
| | | } |
| | | markCancelled(command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getRouteVersion(), |
| | | currentStationId, |
| | | "redis_cancel_signal"); |
| | | return false; |
| | | } |
| | | CommandResponse commandResponse = commandSender.apply(command); |
| | | if (commandResponse == null) { |
| | | if (sendResult == SegmentSendResult.RETRY) { |
| | | sleepQuietly(200L); |
| | | continue; |
| | | } |
| | | if (commandResponse.getResult()) { |
| | | markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion()); |
| | | return true; |
| | | } |
| | | sleepQuietly(200L); |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | private SegmentSendResult executeLockedSegmentSend(StationCommand command, |
| | | StationTaskTraceRegistry traceRegistry, |
| | | Integer traceVersion, |
| | | Integer currentStationId) { |
| | | Integer taskNo = command == null ? null : command.getTaskNo(); |
| | | StationMoveCoordinator moveCoordinator = loadMoveCoordinator(); |
| | | if (moveCoordinator != null) { |
| | | // 分段发送的最终检查和实际下发需要与 reroute 共用任务锁。 |
| | | // 这样切路线程一旦进入 CANCEL_PENDING/RESET,旧路线就不能再穿过最后这一步发到设备侧。 |
| | | return moveCoordinator.withTaskDispatchLock(taskNo, |
| | | () -> doSendSegment(command, traceRegistry, traceVersion, currentStationId)); |
| | | } |
| | | return doSendSegment(command, traceRegistry, traceVersion, currentStationId); |
| | | } |
| | | |
| | | private SegmentSendResult doSendSegment(StationCommand command, |
| | | StationTaskTraceRegistry traceRegistry, |
| | | Integer traceVersion, |
| | | Integer currentStationId) { |
| | | if (!isRouteDispatchable(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion())) { |
| | | if (traceRegistry != null && command != null) { |
| | | traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId, |
| | | buildDetails("reason", "route_version_replaced", "routeVersion", command.getRouteVersion())); |
| | | } |
| | | markCancelled(command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getRouteVersion(), |
| | | currentStationId, |
| | | "route_version_replaced"); |
| | | return SegmentSendResult.CANCELLED; |
| | | } |
| | | if (isTaskMoveReset(command == null ? null : command.getTaskNo())) { |
| | | if (traceRegistry != null && command != null) { |
| | | traceRegistry.markCancelled(command.getTaskNo(), traceVersion, currentStationId, |
| | | buildDetails("reason", "redis_cancel_signal", "routeVersion", command.getRouteVersion())); |
| | | } |
| | | markCancelled(command == null ? null : command.getTaskNo(), |
| | | command == null ? null : command.getRouteVersion(), |
| | | currentStationId, |
| | | "redis_cancel_signal"); |
| | | return SegmentSendResult.CANCELLED; |
| | | } |
| | | CommandResponse commandResponse = commandSender.apply(command); |
| | | if (commandResponse == null || !Boolean.TRUE.equals(commandResponse.getResult())) { |
| | | return SegmentSendResult.RETRY; |
| | | } |
| | | markSegmentIssued(command == null ? null : command.getTaskNo(), command == null ? null : command.getRouteVersion()); |
| | | return SegmentSendResult.DISPATCHED; |
| | | } |
| | | |
| | | private double loadSegmentAdvanceRatio() { |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isRouteActive(Integer taskNo, Integer routeVersion) { |
| | | private boolean isRouteDispatchable(Integer taskNo, Integer routeVersion) { |
| | | // Legacy direct-enqueue commands (for example FakeProcess/stationInExecute) |
| | | // do not register a move session and therefore have no routeVersion. |
| | | // They should keep the historical behavior and execute normally. |
| | | if (taskNo == null || routeVersion == null) { |
| | | return true; |
| | | } |
| | | StationMoveCoordinator moveCoordinator = SpringUtils.getBean(StationMoveCoordinator.class); |
| | | return moveCoordinator == null || moveCoordinator.isActiveRoute(taskNo, routeVersion); |
| | | StationMoveCoordinator moveCoordinator = loadMoveCoordinator(); |
| | | return moveCoordinator == null || moveCoordinator.canDispatchRoute(taskNo, routeVersion); |
| | | } |
| | | |
| | | private StationMoveCoordinator loadMoveCoordinator() { |
| | | return SpringUtils.getBean(StationMoveCoordinator.class); |
| | | } |
| | | |
| | | private void markSegmentIssued(Integer taskNo, Integer routeVersion) { |