package com.zy.integration.iot.handler.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONException;
|
import com.core.common.Cools;
|
import com.zy.integration.iot.biz.IotInstructionService;
|
import com.zy.integration.iot.handler.IotInboundMessageHandler;
|
import com.zy.integration.iot.publish.IotPublishService;
|
import com.zy.iot.service.IotDbConfigService;
|
import com.zy.iot.entity.IotFeedbackMessage;
|
import com.zy.iot.entity.IotInstructionMessage;
|
import com.zy.iot.entity.IotTopicConfig;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
/**
|
* MQTT 入站分发器。
|
* 按 topic 识别 stow/pick/feedback,并在接单后立即尝试回发 feedback。
|
*/
|
@Slf4j
|
@Service
|
public class IotInboundMessageHandlerImpl implements IotInboundMessageHandler {
|
|
@Autowired
|
private IotDbConfigService iotDbConfigService;
|
@Autowired
|
private IotInstructionService iotInstructionService;
|
@Autowired
|
private IotPublishService iotPublishService;
|
|
/**
|
* MQTT 回调层只做 topic 分发和报文反序列化,业务动作仍下沉到 service。
|
*/
|
@Override
|
public void handleMessage(String topic, String payload) {
|
if (!iotDbConfigService.isMqttEnabled() || Cools.isEmpty(topic) || payload == null) {
|
return;
|
}
|
try {
|
IotTopicConfig topics = iotDbConfigService.getEffectiveTopics();
|
Long recordId = null;
|
if (topic.equals(topics.getEgressStow())) {
|
IotInstructionMessage stow = parseJsonPayload(payload, topic, IotInstructionMessage.class);
|
if (stow == null) {
|
return;
|
}
|
recordId = iotInstructionService.handleStowInstruction(stow, topic, payload);
|
} else if (topic.equals(topics.getEgressPick())) {
|
IotInstructionMessage pick = parseJsonPayload(payload, topic, IotInstructionMessage.class);
|
if (pick == null) {
|
return;
|
}
|
recordId = iotInstructionService.handlePickInstruction(pick, topic, payload);
|
} else if (topic.equals(topics.getEgressFeedback())) {
|
IotFeedbackMessage ack = parseJsonPayload(payload, topic, IotFeedbackMessage.class);
|
if (ack == null) {
|
return;
|
}
|
iotInstructionService.handleFeedbackAck(ack, topic, payload);
|
} else {
|
log.warn("ignore unknown iot topic={}, payload={}", topic, payload);
|
}
|
if (recordId != null) {
|
// feedback 先写本地记录,再立即尝试发送;传输失败由定时任务补偿。
|
iotPublishService.publishRecordNow(recordId);
|
}
|
} catch (Exception e) {
|
log.error("handle iot inbound message failed, topic={}, payload={}", topic, payload, e);
|
}
|
}
|
|
private <T> T parseJsonPayload(String payload, String topic, Class<T> type) {
|
try {
|
return JSON.parseObject(payload, type);
|
} catch (JSONException e) {
|
log.warn("IoT MQTT 非合法 JSON topic={} payload={}", topic, payload, e);
|
return null;
|
}
|
}
|
}
|