#
vincentlu
2026-03-24 568ad48a58cda90612c340fcd65811af9349345b
#
4个文件已添加
3个文件已修改
381 ■■■■■ 已修改文件
zy-acs-hk/zy-acs-hk-latent/pom.xml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/config/MqttProperties.java 119 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/MqttDemoController.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/demo/MqttDemoRunner.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/EmqxMqttClient.java 149 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/resources/application.yml 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/target/classes/application.yml 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/pom.xml
@@ -37,6 +37,12 @@
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
    </dependencies>
    <build>
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/config/MqttProperties.java
New file
@@ -0,0 +1,119 @@
package com.zy.acs.hk.latent.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private boolean enabled = true;
    private String brokerUrl = "tcp://127.0.0.1:1883";
    private String clientId;
    private String username;
    private String password;
    private boolean cleanSession = true;
    private boolean automaticReconnect = true;
    private int connectionTimeout = 10;
    private int keepAliveInterval = 20;
    private int qos = 1;
    private String demoTopic = "rcs/hk/latent/demo";
    public boolean isEnabled() {
        return enabled;
    }
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
    public String getBrokerUrl() {
        return brokerUrl;
    }
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }
    public String getClientId() {
        return clientId;
    }
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public boolean isCleanSession() {
        return cleanSession;
    }
    public void setCleanSession(boolean cleanSession) {
        this.cleanSession = cleanSession;
    }
    public boolean isAutomaticReconnect() {
        return automaticReconnect;
    }
    public void setAutomaticReconnect(boolean automaticReconnect) {
        this.automaticReconnect = automaticReconnect;
    }
    public int getConnectionTimeout() {
        return connectionTimeout;
    }
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }
    public int getKeepAliveInterval() {
        return keepAliveInterval;
    }
    public void setKeepAliveInterval(int keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }
    public int getQos() {
        return qos;
    }
    public void setQos(int qos) {
        this.qos = qos;
    }
    public String getDemoTopic() {
        return demoTopic;
    }
    public void setDemoTopic(String demoTopic) {
        this.demoTopic = demoTopic;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/MqttDemoController.java
New file
@@ -0,0 +1,39 @@
package com.zy.acs.hk.latent.controller;
import com.zy.acs.hk.latent.config.MqttProperties;
import com.zy.acs.hk.latent.mqtt.EmqxMqttClient;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.LinkedHashMap;
import java.util.Map;
@RestController
@RequestMapping("/demo/mqtt")
public class MqttDemoController {
    private final EmqxMqttClient emqxMqttClient;
    private final MqttProperties mqttProperties;
    public MqttDemoController(EmqxMqttClient emqxMqttClient, MqttProperties mqttProperties) {
        this.emqxMqttClient = emqxMqttClient;
        this.mqttProperties = mqttProperties;
    }
    @PostMapping("/publish")
    public Map<String, Object> publish(@RequestParam(required = false) String topic,
                                       @RequestParam String message) {
        String targetTopic = StringUtils.hasText(topic) ? topic : mqttProperties.getDemoTopic();
        emqxMqttClient.publish(targetTopic, message);
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("success", true);
        result.put("topic", targetTopic);
        result.put("message", message);
        return result;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/demo/MqttDemoRunner.java
New file
@@ -0,0 +1,36 @@
package com.zy.acs.hk.latent.demo;
import com.zy.acs.hk.latent.config.MqttProperties;
import com.zy.acs.hk.latent.mqtt.EmqxMqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class MqttDemoRunner implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(MqttDemoRunner.class);
    private final EmqxMqttClient emqxMqttClient;
    private final MqttProperties mqttProperties;
    public MqttDemoRunner(EmqxMqttClient emqxMqttClient, MqttProperties mqttProperties) {
        this.emqxMqttClient = emqxMqttClient;
        this.mqttProperties = mqttProperties;
    }
    @Override
    public void run(String... args) {
        if (!mqttProperties.isEnabled()) {
            return;
        }
        String payload = "demo message from latent at " + LocalDateTime.now();
        emqxMqttClient.publish(mqttProperties.getDemoTopic(), payload);
        log.info("mqtt demo published to topic={}", mqttProperties.getDemoTopic());
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/EmqxMqttClient.java
New file
@@ -0,0 +1,149 @@
package com.zy.acs.hk.latent.mqtt;
import com.zy.acs.hk.latent.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
public class EmqxMqttClient {
    private static final Logger log = LoggerFactory.getLogger(EmqxMqttClient.class);
    private final MqttProperties mqttProperties;
    private MqttClient mqttClient;
    public EmqxMqttClient(MqttProperties mqttProperties) {
        this.mqttProperties = mqttProperties;
    }
    @PostConstruct
    public void connect() {
        if (!mqttProperties.isEnabled()) {
            log.info("mqtt disabled, skip emqx connect");
            return;
        }
        try {
            String clientId = mqttProperties.getClientId();
            if (!StringUtils.hasText(clientId)) {
                clientId = "zy-acs-hk-latent-" + UUID.randomUUID();
            }
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(), clientId, new MemoryPersistence());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    log.info("mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI);
                    subscribeDemoTopic();
                }
                @Override
                public void connectionLost(Throwable cause) {
                    log.error("mqtt connection lost", cause);
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                    log.info("mqtt message arrived, topic={}, qos={}, payload={}", topic, message.getQos(), payload);
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    log.info("mqtt delivery complete, messageId={}", token.getMessageId());
                }
            });
            MqttConnectOptions options = buildConnectOptions();
            mqttClient.connect(options);
            subscribeDemoTopic();
        } catch (MqttException e) {
            throw new IllegalStateException("connect emqx failed", e);
        }
    }
    public void publish(String topic, String payload) {
        if (!StringUtils.hasText(topic)) {
            throw new IllegalArgumentException("topic can not be blank");
        }
        if (!isConnected()) {
            throw new IllegalStateException("mqtt client is not connected");
        }
        try {
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
            mqttMessage.setQos(mqttProperties.getQos());
            mqttClient.publish(topic, mqttMessage);
            log.info("mqtt message published, topic={}, payload={}", topic, payload);
        } catch (MqttException e) {
            throw new IllegalStateException("publish mqtt message failed", e);
        }
    }
    public boolean isConnected() {
        return mqttClient != null && mqttClient.isConnected();
    }
    private MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        if (StringUtils.hasText(mqttProperties.getUsername())) {
            options.setUserName(mqttProperties.getUsername());
        }
        if (StringUtils.hasText(mqttProperties.getPassword())) {
            options.setPassword(mqttProperties.getPassword().toCharArray());
        }
        return options;
    }
    private void subscribeDemoTopic() {
        if (!isConnected()) {
            return;
        }
        String topic = mqttProperties.getDemoTopic();
        if (!StringUtils.hasText(topic)) {
            return;
        }
        try {
            mqttClient.subscribe(topic, mqttProperties.getQos());
            log.info("mqtt subscribed demo topic={}", topic);
        } catch (MqttException e) {
            throw new IllegalStateException("subscribe demo topic failed", e);
        }
    }
    @PreDestroy
    public void destroy() {
        if (mqttClient == null) {
            return;
        }
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.close();
        } catch (MqttException e) {
            log.warn("close mqtt client failed", e);
        }
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/resources/application.yml
@@ -1,3 +1,19 @@
server:
  port: 8096
spring:
  application:
    name: zy-acs-hk-latent
mqtt:
  enabled: true
  broker-url: tcp://127.0.0.1:1883
  client-id: ${spring.application.name}-${random.uuid}
  username: rcs
  password: xltys1995
  clean-session: true
  automatic-reconnect: true
  connection-timeout: 10
  keep-alive-interval: 20
  qos: 1
  demo-topic: rcs/hk/latent/demo
zy-acs-hk/zy-acs-hk-latent/target/classes/application.yml
@@ -1,3 +1,19 @@
server:
  port: 8096
spring:
  application:
    name: zy-acs-hk-latent
mqtt:
  enabled: true
  broker-url: tcp://127.0.0.1:1883
  client-id: ${spring.application.name}-${random.uuid}
  username: rcs
  password: xltys1995
  clean-session: true
  automatic-reconnect: true
  connection-timeout: 10
  keep-alive-interval: 20
  qos: 1
  demo-topic: rcs/hk/latent/demo