package com.zy.integration.iot.handler.impl; import com.alibaba.fastjson.JSON; import com.core.common.Cools; import com.zy.integration.iot.biz.IotInstructionService; import com.zy.integration.iot.handler.IotInboundMessageHandler; import com.zy.integration.iot.publish.IotPublishService; import com.zy.iot.config.IotProperties; import com.zy.iot.entity.IotFeedbackMessage; import com.zy.iot.entity.IotInstructionMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * MQTT 入站分发器。 * 按 topic 识别 stow/pick/feedback,并在接单后立即尝试回发 feedback。 */ @Slf4j @Service public class IotInboundMessageHandlerImpl implements IotInboundMessageHandler { @Autowired private IotProperties iotProperties; @Autowired private IotInstructionService iotInstructionService; @Autowired private IotPublishService iotPublishService; /** * MQTT 回调层只做 topic 分发和报文反序列化,业务动作仍下沉到 service。 */ @Override public void handleMessage(String topic, String payload) { if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) { return; } try { Long recordId = null; if (topic.equals(iotProperties.getTopics().getEgressStow())) { recordId = iotInstructionService.handleStowInstruction(JSON.parseObject(payload, IotInstructionMessage.class), topic, payload); } else if (topic.equals(iotProperties.getTopics().getEgressPick())) { recordId = iotInstructionService.handlePickInstruction(JSON.parseObject(payload, IotInstructionMessage.class), topic, payload); } else if (topic.equals(iotProperties.getTopics().getEgressFeedback())) { iotInstructionService.handleFeedbackAck(JSON.parseObject(payload, IotFeedbackMessage.class), topic, payload); } else { log.warn("ignore unknown iot topic={}, payload={}", topic, payload); } if (recordId != null) { // feedback 先写本地记录,再立即尝试发送;传输失败由定时任务补偿。 iotPublishService.publishRecordNow(recordId); } } catch (Exception e) { log.error("handle iot inbound message failed, topic={}, payload={}", topic, payload, e); } } }