| | |
| | | import com.zy.integration.iot.util.IotMqttSslUtils; |
| | | import com.zy.iot.config.IotProperties; |
| | | import com.zy.iot.entity.IotTopicConfig; |
| | | import com.zy.iot.service.IotDbConfigService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
| | | import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; |
| | |
| | | @Autowired |
| | | private IotProperties iotProperties; |
| | | @Autowired |
| | | private IotDbConfigService iotDbConfigService; |
| | | @Autowired |
| | | private IotInboundMessageHandler inboundMessageHandler; |
| | | |
| | | private volatile MqttClient mqttClient; |
| | | |
| | | @PostConstruct |
| | | public void init() { |
| | | if (!iotProperties.isEnabled()) { |
| | | if (!iotDbConfigService.isMqttEnabled()) { |
| | | return; |
| | | } |
| | | try { |
| | |
| | | * 懒加载建连,首次发送或启动时自动连接并补订阅主题。 |
| | | */ |
| | | @Override |
| | | public synchronized void reconnectFromDbConfig() { |
| | | iotDbConfigService.refreshCache(); |
| | | try { |
| | | if (mqttClient != null) { |
| | | try { |
| | | if (mqttClient.isConnected()) { |
| | | mqttClient.disconnect(); |
| | | } |
| | | mqttClient.close(); |
| | | } catch (Exception e) { |
| | | log.warn("IoT MQTT disconnect before reconnect", e); |
| | | } |
| | | mqttClient = null; |
| | | } |
| | | if (!iotDbConfigService.isMqttEnabled()) { |
| | | log.info("IoT MQTT 已由配置关闭,未建立连接"); |
| | | return; |
| | | } |
| | | ensureConnected(); |
| | | } catch (Exception e) { |
| | | log.error("IoT MQTT 重连失败", e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public synchronized void ensureConnected() throws Exception { |
| | | if (!iotProperties.isEnabled()) { |
| | | if (!iotDbConfigService.isMqttEnabled()) { |
| | | return; |
| | | } |
| | | if (!hasRequiredConfig()) { |
| | |
| | | } |
| | | mqttClient.connect(buildOptions()); |
| | | subscribeTopics(); |
| | | log.info("IoT MQTT 已连接 serverURI={} clientId={} tls={}", |
| | | iotProperties.getServerUri(), iotProperties.getResolvedClientId(), iotProperties.isTlsEnabled()); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | @Override |
| | | public void publish(String topic, String payload) throws Exception { |
| | | if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) { |
| | | if (!iotDbConfigService.isMqttEnabled() || Cools.isEmpty(topic) || payload == null) { |
| | | return; |
| | | } |
| | | ensureConnected(); |
| | |
| | | MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); |
| | | mqttMessage.setQos(1); |
| | | mqttMessage.setRetained(false); |
| | | log.info("IoT MQTT 发送 topic={} payload={}", topic, payload); |
| | | mqttClient.publish(topic, mqttMessage); |
| | | } |
| | | |
| | |
| | | |
| | | /** |
| | | * Handles an inbound MQTT message: decodes the payload as UTF-8, logs it, and |
| | | * forwards it to the inbound message handler exactly once. |
| | | * |
| | | * @param topic topic the message arrived on |
| | | * @param message received message whose payload bytes are decoded as UTF-8 text |
| | | */ |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage message) { |
| | | // Decode once and reuse the text for both the log line and the dispatch; |
| | | // dispatching a second time would make the handler process every message twice. |
| | | String body = new String(message.getPayload(), StandardCharsets.UTF_8); |
| | | log.info("IoT MQTT 接收 topic={} payload={}", topic, body); |
| | | inboundMessageHandler.handleMessage(topic, body); |
| | | } |
| | | |
| | | @Override |
| | |
| | | public void connectComplete(boolean reconnect, String serverURI) { |
| | | try { |
| | | subscribeTopics(); |
| | | log.info("iot mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI); |
| | | if (reconnect) { |
| | | log.info("IoT MQTT 自动重连成功 serverURI={} clientId={}", |
| | | serverURI, iotProperties.getResolvedClientId()); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("subscribe iot topics failed", e); |
| | | } |
| | |
| | | if (mqttClient == null || !mqttClient.isConnected()) { |
| | | return; |
| | | } |
| | | IotTopicConfig topics = iotProperties.getTopics(); |
| | | IotTopicConfig topics = iotDbConfigService.getEffectiveTopics(); |
| | | subscribe(topics.getEgressStow()); |
| | | subscribe(topics.getEgressPick()); |
| | | subscribe(topics.getEgressFeedback()); |