自动化立体仓库 - WMS系统
cl
3 天以前 79eb9321e21e873a373c10480f8066cbc0407eb8
src/main/java/com/zy/integration/iot/client/impl/PahoIotMqttClient.java
@@ -6,6 +6,7 @@
import com.zy.integration.iot.util.IotMqttSslUtils;
import com.zy.iot.config.IotProperties;
import com.zy.iot.entity.IotTopicConfig;
import com.zy.iot.service.IotDbConfigService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
@@ -33,13 +34,15 @@
    @Autowired
    private IotProperties iotProperties;
    @Autowired
    private IotDbConfigService iotDbConfigService;
    @Autowired
    private IotInboundMessageHandler inboundMessageHandler;
    private volatile MqttClient mqttClient;
    @PostConstruct
    public void init() {
        if (!iotProperties.isEnabled()) {
        if (!iotDbConfigService.isMqttEnabled()) {
            return;
        }
        try {
@@ -68,8 +71,33 @@
     * 懒加载建连,首次发送或启动时自动连接并补订阅主题。
     */
    @Override
    public synchronized void reconnectFromDbConfig() {
        iotDbConfigService.refreshCache();
        try {
            if (mqttClient != null) {
                try {
                    if (mqttClient.isConnected()) {
                        mqttClient.disconnect();
                    }
                    mqttClient.close();
                } catch (Exception e) {
                    log.warn("IoT MQTT disconnect before reconnect", e);
                }
                mqttClient = null;
            }
            if (!iotDbConfigService.isMqttEnabled()) {
                log.info("IoT MQTT 已由配置关闭,未建立连接");
                return;
            }
            ensureConnected();
        } catch (Exception e) {
            log.error("IoT MQTT 重连失败", e);
        }
    }
    @Override
    public synchronized void ensureConnected() throws Exception {
        if (!iotProperties.isEnabled()) {
        if (!iotDbConfigService.isMqttEnabled()) {
            return;
        }
        if (!hasRequiredConfig()) {
@@ -84,6 +112,8 @@
        }
        mqttClient.connect(buildOptions());
        subscribeTopics();
        log.info("IoT MQTT 已连接 serverURI={} clientId={} tls={}",
                iotProperties.getServerUri(), iotProperties.getResolvedClientId(), iotProperties.isTlsEnabled());
    }
    /**
@@ -91,7 +121,7 @@
     */
    @Override
    public void publish(String topic, String payload) throws Exception {
        if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) {
        if (!iotDbConfigService.isMqttEnabled() || Cools.isEmpty(topic) || payload == null) {
            return;
        }
        ensureConnected();
@@ -101,6 +131,7 @@
        MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(1);
        mqttMessage.setRetained(false);
        log.info("IoT MQTT 发送 topic={} payload={}", topic, payload);
        mqttClient.publish(topic, mqttMessage);
    }
@@ -135,7 +166,9 @@
            @Override
            public void messageArrived(String topic, MqttMessage message) {
                inboundMessageHandler.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
                String body = new String(message.getPayload(), StandardCharsets.UTF_8);
                log.info("IoT MQTT 接收 topic={} payload={}", topic, body);
                inboundMessageHandler.handleMessage(topic, body);
            }
            @Override
@@ -146,7 +179,10 @@
            public void connectComplete(boolean reconnect, String serverURI) {
                try {
                    subscribeTopics();
                    log.info("iot mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI);
                    if (reconnect) {
                        log.info("IoT MQTT 自动重连成功 serverURI={} clientId={}",
                                serverURI, iotProperties.getResolvedClientId());
                    }
                } catch (Exception e) {
                    log.error("subscribe iot topics failed", e);
                }
@@ -178,7 +214,7 @@
        if (mqttClient == null || !mqttClient.isConnected()) {
            return;
        }
        IotTopicConfig topics = iotProperties.getTopics();
        IotTopicConfig topics = iotDbConfigService.getEffectiveTopics();
        subscribe(topics.getEgressStow());
        subscribe(topics.getEgressPick());
        subscribe(topics.getEgressFeedback());