| | |
| | | package com.zy.integration.iot.handler.impl; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONException; |
| | | import com.core.common.Cools; |
| | | import com.zy.integration.iot.biz.IotInstructionService; |
| | | import com.zy.integration.iot.handler.IotInboundMessageHandler; |
| | | import com.zy.integration.iot.publish.IotPublishService; |
| | | import com.zy.iot.config.IotProperties; |
| | | import com.zy.iot.service.IotDbConfigService; |
| | | import com.zy.iot.entity.IotFeedbackMessage; |
| | | import com.zy.iot.entity.IotInstructionMessage; |
| | | import com.zy.iot.entity.IotTopicConfig; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | |
| | | public class IotInboundMessageHandlerImpl implements IotInboundMessageHandler { |
| | | |
| | | @Autowired |
| | | private IotProperties iotProperties; |
| | | private IotDbConfigService iotDbConfigService; |
| | | @Autowired |
| | | private IotInstructionService iotInstructionService; |
| | | @Autowired |
| | |
| | | */ |
| | | @Override |
| | | public void handleMessage(String topic, String payload) { |
| | | if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) { |
| | | if (!iotDbConfigService.isMqttEnabled() || Cools.isEmpty(topic) || payload == null) { |
| | | return; |
| | | } |
| | | try { |
| | | IotTopicConfig topics = iotDbConfigService.getEffectiveTopics(); |
| | | Long recordId = null; |
| | | if (topic.equals(iotProperties.getTopics().getEgressStow())) { |
| | | recordId = iotInstructionService.handleStowInstruction(JSON.parseObject(payload, IotInstructionMessage.class), topic, payload); |
| | | } else if (topic.equals(iotProperties.getTopics().getEgressPick())) { |
| | | recordId = iotInstructionService.handlePickInstruction(JSON.parseObject(payload, IotInstructionMessage.class), topic, payload); |
| | | } else if (topic.equals(iotProperties.getTopics().getEgressFeedback())) { |
| | | iotInstructionService.handleFeedbackAck(JSON.parseObject(payload, IotFeedbackMessage.class), topic, payload); |
| | | if (topic.equals(topics.getEgressStow())) { |
| | | IotInstructionMessage stow = parseJsonPayload(payload, topic, IotInstructionMessage.class); |
| | | if (stow == null) { |
| | | return; |
| | | } |
| | | recordId = iotInstructionService.handleStowInstruction(stow, topic, payload); |
| | | } else if (topic.equals(topics.getEgressPick())) { |
| | | IotInstructionMessage pick = parseJsonPayload(payload, topic, IotInstructionMessage.class); |
| | | if (pick == null) { |
| | | return; |
| | | } |
| | | recordId = iotInstructionService.handlePickInstruction(pick, topic, payload); |
| | | } else if (topic.equals(topics.getEgressFeedback())) { |
| | | IotFeedbackMessage ack = parseJsonPayload(payload, topic, IotFeedbackMessage.class); |
| | | if (ack == null) { |
| | | return; |
| | | } |
| | | iotInstructionService.handleFeedbackAck(ack, topic, payload); |
| | | } else { |
| | | log.warn("ignore unknown iot topic={}, payload={}", topic, payload); |
| | | } |
| | |
| | | log.error("handle iot inbound message failed, topic={}, payload={}", topic, payload, e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Deserializes an inbound MQTT payload into the requested message type. |
| | | * Every failure path is logged here and reported to the caller as {@code null}, |
| | | * so callers only need a null check before dispatching. |
| | | * |
| | | * @param payload raw message text to parse |
| | | * @param topic topic the payload arrived on; used only in log output |
| | | * @param type class the JSON is bound to |
| | | * @param <T> type of the parsed message |
| | | * @return the parsed message, or {@code null} when the payload is not valid JSON |
| | | * or the parser produced no object |
| | | */ |
| | | private <T> T parseJsonPayload(String payload, String topic, Class<T> type) { |
| | | T parsed; |
| | | try { |
| | | parsed = JSON.parseObject(payload, type); |
| | | } catch (JSONException e) { |
| | | log.warn("IoT MQTT 非合法 JSON topic={} payload={}", topic, payload, e); |
| | | return null; |
| | | } |
| | | if (parsed == null) { |
| | | // fastjson can return null without throwing (e.g. empty text or a JSON null literal); |
| | | // callers drop the message on null, so log here to keep that drop visible. |
| | | log.warn("IoT MQTT payload parsed to null, topic={} payload={}", topic, payload); |
| | | } |
| | | return parsed; |
| | | } |
| | | } |