package com.zy.core.dispatch; import com.alibaba.fastjson.JSON; import com.core.common.Cools; import com.zy.common.utils.RedisUtil; import com.zy.core.News; import com.zy.core.cache.MessageQueue; import com.zy.core.enums.RedisKeyType; import com.zy.core.enums.SlaveType; import com.zy.core.model.Task; import com.zy.core.model.command.StationCommand; import com.zy.core.move.StationMoveCoordinator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class StationCommandDispatcher { private static final int STATION_COMMAND_DISPATCH_DEDUP_SECONDS = 10; @Autowired(required = false) private RedisUtil redisUtil; @Autowired(required = false) private StationMoveCoordinator stationMoveCoordinator; public StationCommandDispatcher() { } public StationCommandDispatcher(RedisUtil redisUtil, StationMoveCoordinator stationMoveCoordinator) { this.redisUtil = redisUtil; this.stationMoveCoordinator = stationMoveCoordinator; } public StationCommandDispatchResult dispatch(Integer deviceNo, StationCommand command, String source, String scene) { String normalizedSource = Cools.isEmpty(source) ? "unknown" : source; String normalizedScene = Cools.isEmpty(scene) ? "default" : scene; if (deviceNo == null || command == null) { return reject("invalid-argument", 0, normalizedSource, normalizedScene, null); } if (!MessageQueue.hasExchange(SlaveType.Devp, deviceNo)) { return reject("queue-not-initialized", 0, normalizedSource, normalizedScene, command); } String dedupKey = buildDedupKey(deviceNo, command); if (!Cools.isEmpty(dedupKey) && redisUtil != null && redisUtil.get(dedupKey) != null) { return reject("dedup-suppressed", MessageQueue.size(SlaveType.Devp, deviceNo), normalizedSource, normalizedScene, command); } if (!Cools.isEmpty(dedupKey) && redisUtil != null) { redisUtil.set(dedupKey, "lock", STATION_COMMAND_DISPATCH_DEDUP_SECONDS); } boolean offered = MessageQueue.offer(SlaveType.Devp, deviceNo, new Task(2, command)); int queueDepth = MessageQueue.size(SlaveType.Devp, deviceNo); if (!offered) { if (!Cools.isEmpty(dedupKey) && redisUtil != null) { redisUtil.del(dedupKey); } return reject("queue-offer-failed", queueDepth, normalizedSource, normalizedScene, command); } News.info("输送站点命令入队成功。source={},scene={},deviceNo={},taskNo={},stationId={},targetStaNo={},commandType={},queueDepth={}", normalizedSource, normalizedScene, deviceNo, command.getTaskNo(), command.getStationId(), command.getTargetStaNo(), command.getCommandType(), queueDepth); return StationCommandDispatchResult.accepted("accepted", queueDepth, normalizedSource, normalizedScene); } private StationCommandDispatchResult reject(String reason, int queueDepth, String source, String scene, StationCommand command) { News.warn("输送站点命令入队失败。reason={},source={},scene={},deviceNo?=N/A,taskNo={},stationId={},targetStaNo={},commandType={},queueDepth={}", reason, source, scene, command == null ? null : command.getTaskNo(), command == null ? null : command.getStationId(), command == null ? null : command.getTargetStaNo(), command == null ? null : command.getCommandType(), queueDepth); return StationCommandDispatchResult.rejected(reason, queueDepth, source, scene); } private String buildDedupKey(Integer deviceNo, StationCommand command) { if (deviceNo == null || command == null) { return ""; } return RedisKeyType.STATION_COMMAND_DISPATCH_DEDUP_.key + deviceNo + "_" + command.getTaskNo() + "_" + command.getStationId() + "_" + buildCommandSignatureHash(command); } private String buildCommandSignatureHash(StationCommand command) { if (command == null) { return ""; } if (stationMoveCoordinator != null && command.getCommandType() != null && command.getCommandType().name().startsWith("MOVE")) { String pathSignatureHash = stationMoveCoordinator.buildPathSignatureHash(command); if (!Cools.isEmpty(pathSignatureHash)) { return pathSignatureHash; } } return Integer.toHexString(JSON.toJSONString(command).hashCode()); } }