package com.zy.integration.iot.handler.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException; import com.core.common.Cools; import com.zy.integration.iot.biz.IotInstructionService; import com.zy.integration.iot.handler.IotInboundMessageHandler; import com.zy.integration.iot.publish.IotPublishService; import com.zy.iot.service.IotDbConfigService; import com.zy.iot.entity.IotFeedbackMessage; import com.zy.iot.entity.IotInstructionMessage; import com.zy.iot.entity.IotTopicConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * MQTT 入站分发器。 * 按 topic 识别 stow/pick/feedback,并在接单后立即尝试回发 feedback。 */ @Slf4j @Service public class IotInboundMessageHandlerImpl implements IotInboundMessageHandler { @Autowired private IotDbConfigService iotDbConfigService; @Autowired private IotInstructionService iotInstructionService; @Autowired private IotPublishService iotPublishService; /** * MQTT 回调层只做 topic 分发和报文反序列化,业务动作仍下沉到 service。 */ @Override public void handleMessage(String topic, String payload) { if (!iotDbConfigService.isMqttEnabled() || Cools.isEmpty(topic) || payload == null) { return; } try { IotTopicConfig topics = iotDbConfigService.getEffectiveTopics(); Long recordId = null; if (topic.equals(topics.getEgressStow())) { IotInstructionMessage stow = parseJsonPayload(payload, topic, IotInstructionMessage.class); if (stow == null) { return; } recordId = iotInstructionService.handleStowInstruction(stow, topic, payload); } else if (topic.equals(topics.getEgressPick())) { IotInstructionMessage pick = parseJsonPayload(payload, topic, IotInstructionMessage.class); if (pick == null) { return; } recordId = iotInstructionService.handlePickInstruction(pick, topic, payload); } else if (topic.equals(topics.getEgressFeedback())) { IotFeedbackMessage ack = parseJsonPayload(payload, topic, IotFeedbackMessage.class); if (ack == null) { return; } iotInstructionService.handleFeedbackAck(ack, topic, payload); } else { log.warn("ignore unknown iot topic={}, payload={}", topic, payload); } if (recordId != null) { // feedback 先写本地记录,再立即尝试发送;传输失败由定时任务补偿。 iotPublishService.publishRecordNow(recordId); } } catch (Exception e) { log.error("handle iot inbound message failed, topic={}, payload={}", topic, payload, e); } } private T parseJsonPayload(String payload, String topic, Class type) { try { return JSON.parseObject(payload, type); } catch (JSONException e) { log.warn("IoT MQTT 非合法 JSON topic={} payload={}", topic, payload, e); return null; } } }