package com.zy.core.dispatch;
|
|
import com.alibaba.fastjson.JSON;
|
import com.core.common.Cools;
|
import com.zy.common.utils.RedisUtil;
|
import com.zy.core.News;
|
import com.zy.core.cache.MessageQueue;
|
import com.zy.core.enums.RedisKeyType;
|
import com.zy.core.enums.SlaveType;
|
import com.zy.core.model.Task;
|
import com.zy.core.model.command.StationCommand;
|
import com.zy.core.move.StationMoveCoordinator;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
@Component
|
public class StationCommandDispatcher {
|
|
private static final int STATION_COMMAND_DISPATCH_DEDUP_SECONDS = 10;
|
|
@Autowired(required = false)
|
private RedisUtil redisUtil;
|
@Autowired(required = false)
|
private StationMoveCoordinator stationMoveCoordinator;
|
|
public StationCommandDispatcher() {
|
}
|
|
public StationCommandDispatcher(RedisUtil redisUtil, StationMoveCoordinator stationMoveCoordinator) {
|
this.redisUtil = redisUtil;
|
this.stationMoveCoordinator = stationMoveCoordinator;
|
}
|
|
public StationCommandDispatchResult dispatch(Integer deviceNo,
|
StationCommand command,
|
String source,
|
String scene) {
|
String normalizedSource = Cools.isEmpty(source) ? "unknown" : source;
|
String normalizedScene = Cools.isEmpty(scene) ? "default" : scene;
|
if (deviceNo == null || command == null) {
|
return reject("invalid-argument", 0, normalizedSource, normalizedScene, null);
|
}
|
if (!MessageQueue.hasExchange(SlaveType.Devp, deviceNo)) {
|
return reject("queue-not-initialized", 0, normalizedSource, normalizedScene, command);
|
}
|
|
String dedupKey = buildDedupKey(deviceNo, command);
|
if (!Cools.isEmpty(dedupKey) && redisUtil != null && redisUtil.get(dedupKey) != null) {
|
return reject("dedup-suppressed",
|
MessageQueue.size(SlaveType.Devp, deviceNo),
|
normalizedSource,
|
normalizedScene,
|
command);
|
}
|
if (!Cools.isEmpty(dedupKey) && redisUtil != null) {
|
redisUtil.set(dedupKey, "lock", STATION_COMMAND_DISPATCH_DEDUP_SECONDS);
|
}
|
|
boolean offered = MessageQueue.offer(SlaveType.Devp, deviceNo, new Task(2, command));
|
int queueDepth = MessageQueue.size(SlaveType.Devp, deviceNo);
|
if (!offered) {
|
if (!Cools.isEmpty(dedupKey) && redisUtil != null) {
|
redisUtil.del(dedupKey);
|
}
|
return reject("queue-offer-failed", queueDepth, normalizedSource, normalizedScene, command);
|
}
|
|
News.info("输送站点命令入队成功。source={},scene={},deviceNo={},taskNo={},stationId={},targetStaNo={},commandType={},queueDepth={}",
|
normalizedSource,
|
normalizedScene,
|
deviceNo,
|
command.getTaskNo(),
|
command.getStationId(),
|
command.getTargetStaNo(),
|
command.getCommandType(),
|
queueDepth);
|
return StationCommandDispatchResult.accepted("accepted", queueDepth, normalizedSource, normalizedScene);
|
}
|
|
private StationCommandDispatchResult reject(String reason,
|
int queueDepth,
|
String source,
|
String scene,
|
StationCommand command) {
|
News.warn("输送站点命令入队失败。reason={},source={},scene={},deviceNo?=N/A,taskNo={},stationId={},targetStaNo={},commandType={},queueDepth={}",
|
reason,
|
source,
|
scene,
|
command == null ? null : command.getTaskNo(),
|
command == null ? null : command.getStationId(),
|
command == null ? null : command.getTargetStaNo(),
|
command == null ? null : command.getCommandType(),
|
queueDepth);
|
return StationCommandDispatchResult.rejected(reason, queueDepth, source, scene);
|
}
|
|
private String buildDedupKey(Integer deviceNo, StationCommand command) {
|
if (deviceNo == null || command == null) {
|
return "";
|
}
|
return RedisKeyType.STATION_COMMAND_DISPATCH_DEDUP_.key
|
+ deviceNo + "_"
|
+ command.getTaskNo() + "_"
|
+ command.getStationId() + "_"
|
+ buildCommandSignatureHash(command);
|
}
|
|
private String buildCommandSignatureHash(StationCommand command) {
|
if (command == null) {
|
return "";
|
}
|
if (stationMoveCoordinator != null && command.getCommandType() != null
|
&& command.getCommandType().name().startsWith("MOVE")) {
|
String pathSignatureHash = stationMoveCoordinator.buildPathSignatureHash(command);
|
if (!Cools.isEmpty(pathSignatureHash)) {
|
return pathSignatureHash;
|
}
|
}
|
return Integer.toHexString(JSON.toJSONString(command).hashCode());
|
}
|
}
|