From 1a0bdd8df58435ec37e9d8345e67cd092902b5e4 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期六, 04 四月 2026 00:27:18 +0800
Subject: [PATCH] 序号控制
---
src/main/java/com/zy/integration/iot/client/impl/PahoIotMqttClient.java | 48 ++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/src/main/java/com/zy/integration/iot/client/impl/PahoIotMqttClient.java b/src/main/java/com/zy/integration/iot/client/impl/PahoIotMqttClient.java
index 231666b..21b7f8d 100644
--- a/src/main/java/com/zy/integration/iot/client/impl/PahoIotMqttClient.java
+++ b/src/main/java/com/zy/integration/iot/client/impl/PahoIotMqttClient.java
@@ -6,6 +6,7 @@
import com.zy.integration.iot.util.IotMqttSslUtils;
import com.zy.iot.config.IotProperties;
import com.zy.iot.entity.IotTopicConfig;
+import com.zy.iot.service.IotDbConfigService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
@@ -33,13 +34,15 @@
@Autowired
private IotProperties iotProperties;
@Autowired
+ private IotDbConfigService iotDbConfigService;
+ @Autowired
private IotInboundMessageHandler inboundMessageHandler;
private volatile MqttClient mqttClient;
@PostConstruct
public void init() {
- if (!iotProperties.isEnabled()) {
+ if (!iotDbConfigService.isMqttEnabled()) {
return;
}
try {
@@ -68,8 +71,33 @@
* 鎳掑姞杞藉缓杩烇紝棣栨鍙戦�佹垨鍚姩鏃惰嚜鍔ㄨ繛鎺ュ苟琛ヨ闃呬富棰樸��
*/
@Override
+ public synchronized void reconnectFromDbConfig() {
+ iotDbConfigService.refreshCache();
+ try {
+ if (mqttClient != null) {
+ try {
+ if (mqttClient.isConnected()) {
+ mqttClient.disconnect();
+ }
+ mqttClient.close();
+ } catch (Exception e) {
+ log.warn("IoT MQTT disconnect before reconnect", e);
+ }
+ mqttClient = null;
+ }
+ if (!iotDbConfigService.isMqttEnabled()) {
+ log.info("IoT MQTT 宸茬敱閰嶇疆鍏抽棴锛屾湭寤虹珛杩炴帴");
+ return;
+ }
+ ensureConnected();
+ } catch (Exception e) {
+ log.error("IoT MQTT 閲嶈繛澶辫触", e);
+ }
+ }
+
+ @Override
public synchronized void ensureConnected() throws Exception {
- if (!iotProperties.isEnabled()) {
+ if (!iotDbConfigService.isMqttEnabled()) {
return;
}
if (!hasRequiredConfig()) {
@@ -84,6 +112,8 @@
}
mqttClient.connect(buildOptions());
subscribeTopics();
+ log.info("IoT MQTT 宸茶繛鎺� serverURI={} clientId={} tls={}",
+ iotProperties.getServerUri(), iotProperties.getResolvedClientId(), iotProperties.isTlsEnabled());
}
/**
@@ -91,7 +121,7 @@
*/
@Override
public void publish(String topic, String payload) throws Exception {
- if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) {
+ if (!iotDbConfigService.isMqttEnabled() || Cools.isEmpty(topic) || payload == null) {
return;
}
ensureConnected();
@@ -101,6 +131,7 @@
MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttMessage.setQos(1);
mqttMessage.setRetained(false);
+ log.info("IoT MQTT 鍙戦�� topic={} payload={}", topic, payload);
mqttClient.publish(topic, mqttMessage);
}
@@ -135,7 +166,9 @@
@Override
public void messageArrived(String topic, MqttMessage message) {
- inboundMessageHandler.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
+ String body = new String(message.getPayload(), StandardCharsets.UTF_8);
+ log.info("IoT MQTT 鎺ユ敹 topic={} payload={}", topic, body);
+ inboundMessageHandler.handleMessage(topic, body);
}
@Override
@@ -146,7 +179,10 @@
public void connectComplete(boolean reconnect, String serverURI) {
try {
subscribeTopics();
- log.info("iot mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI);
+ if (reconnect) {
+ log.info("IoT MQTT 鑷姩閲嶈繛鎴愬姛 serverURI={} clientId={}",
+ serverURI, iotProperties.getResolvedClientId());
+ }
} catch (Exception e) {
log.error("subscribe iot topics failed", e);
}
@@ -178,7 +214,7 @@
if (mqttClient == null || !mqttClient.isConnected()) {
return;
}
- IotTopicConfig topics = iotProperties.getTopics();
+ IotTopicConfig topics = iotDbConfigService.getEffectiveTopics();
subscribe(topics.getEgressStow());
subscribe(topics.getEgressPick());
subscribe(topics.getEgressFeedback());
--
Gitblit v1.9.1