| zy-acs-hk/zy-acs-hk-latent/pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/config/MqttProperties.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/MqttDemoController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/demo/MqttDemoRunner.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/EmqxMqttClient.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hk/zy-acs-hk-latent/src/main/resources/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hk/zy-acs-hk-latent/target/classes/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
zy-acs-hk/zy-acs-hk-latent/pom.xml
@@ -37,6 +37,12 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> </dependencies> <build> zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/config/MqttProperties.java
New file @@ -0,0 +1,119 @@
package com.zy.acs.hk.latent.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * MQTT connection settings bound from the {@code mqtt.*} configuration namespace.
 *
 * <p>Every field carries a default so the bean is usable when the namespace is
 * absent. Registered as a plain {@code @Component}: the class declares no
 * {@code @Bean} methods, so it does not need to be a {@code @Configuration} class.
 */
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    /** Lowest QoS level defined by MQTT. */
    private static final int MIN_QOS = 0;

    /** Highest QoS level defined by MQTT. */
    private static final int MAX_QOS = 2;

    /** Master switch; when {@code false} callers are expected to skip MQTT work. */
    private boolean enabled = true;

    /** Broker URI, e.g. {@code tcp://host:1883}. */
    private String brokerUrl = "tcp://127.0.0.1:1883";

    /** Client identifier; may be left unset. */
    private String clientId;

    /** Broker user name; may be left unset. */
    private String username;

    /** Broker password; may be left unset. Never log this value. */
    private String password;

    private boolean cleanSession = true;

    private boolean automaticReconnect = true;

    /** Connection timeout handed to the MQTT client (Paho interprets it as seconds). */
    private int connectionTimeout = 10;

    /** Keep-alive interval handed to the MQTT client (Paho interprets it as seconds). */
    private int keepAliveInterval = 20;

    /** QoS used for publish and subscribe; always within 0..2. */
    private int qos = 1;

    /** Topic used by the demo publisher and subscriber. */
    private String demoTopic = "rcs/hk/latent/demo";

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isCleanSession() {
        return cleanSession;
    }

    public void setCleanSession(boolean cleanSession) {
        this.cleanSession = cleanSession;
    }

    public boolean isAutomaticReconnect() {
        return automaticReconnect;
    }

    public void setAutomaticReconnect(boolean automaticReconnect) {
        this.automaticReconnect = automaticReconnect;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public int getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(int keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    public int getQos() {
        return qos;
    }

    /**
     * Sets the QoS level.
     *
     * @param qos MQTT quality of service, one of 0, 1 or 2
     * @throws IllegalArgumentException if {@code qos} is outside 0..2, so that a
     *         misconfiguration fails at binding time instead of at the first
     *         publish or subscribe
     */
    public void setQos(int qos) {
        if (qos < MIN_QOS || qos > MAX_QOS) {
            throw new IllegalArgumentException("mqtt qos must be 0, 1 or 2 but was " + qos);
        }
        this.qos = qos;
    }

    public String getDemoTopic() {
        return demoTopic;
    }

    public void setDemoTopic(String demoTopic) {
        this.demoTopic = demoTopic;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/MqttDemoController.java
New file @@ -0,0 +1,39 @@
package com.zy.acs.hk.latent.controller;

import com.zy.acs.hk.latent.config.MqttProperties;
import com.zy.acs.hk.latent.mqtt.EmqxMqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Demo endpoint that publishes an arbitrary text message through {@link EmqxMqttClient}.
 *
 * <p>NOTE(review): the caller chooses the topic freely; restrict or protect this
 * endpoint before exposing it beyond a demo environment.
 */
@RestController
@RequestMapping("/demo/mqtt")
public class MqttDemoController {

    private static final Logger log = LoggerFactory.getLogger(MqttDemoController.class);

    private final EmqxMqttClient emqxMqttClient;

    private final MqttProperties mqttProperties;

    public MqttDemoController(EmqxMqttClient emqxMqttClient, MqttProperties mqttProperties) {
        this.emqxMqttClient = emqxMqttClient;
        this.mqttProperties = mqttProperties;
    }

    /**
     * Publishes {@code message} to {@code topic}, falling back to the configured
     * demo topic when {@code topic} is missing or blank.
     *
     * @param topic   optional target topic
     * @param message payload to publish
     * @return a map with {@code success}, {@code topic} and {@code message}; when the
     *         publish is rejected, {@code success} is {@code false} and an
     *         {@code error} entry carries the reason
     */
    @PostMapping("/publish")
    public Map<String, Object> publish(@RequestParam(required = false) String topic,
                                       @RequestParam String message) {
        String targetTopic = StringUtils.hasText(topic) ? topic : mqttProperties.getDemoTopic();

        boolean published = false;
        String error = null;
        try {
            emqxMqttClient.publish(targetTopic, message);
            published = true;
        } catch (IllegalStateException | IllegalArgumentException e) {
            // The client reports "not connected" / bad arguments through these
            // exception types; surface them in the response instead of an
            // unhandled server error.
            log.warn("mqtt demo publish failed, topic={}", targetTopic, e);
            error = String.valueOf(e.getMessage());
        }

        Map<String, Object> result = new LinkedHashMap<>();
        result.put("success", published);
        result.put("topic", targetTopic);
        result.put("message", message);
        if (!published) {
            result.put("error", error);
        }
        return result;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/demo/MqttDemoRunner.java
New file @@ -0,0 +1,36 @@
package com.zy.acs.hk.latent.demo;

import com.zy.acs.hk.latent.config.MqttProperties;
import com.zy.acs.hk.latent.mqtt.EmqxMqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * Publishes a single demo message to the configured demo topic once the
 * application has started. Does nothing when MQTT is disabled.
 */
@Component
public class MqttDemoRunner implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(MqttDemoRunner.class);

    private final EmqxMqttClient emqxMqttClient;

    private final MqttProperties mqttProperties;

    public MqttDemoRunner(EmqxMqttClient emqxMqttClient, MqttProperties mqttProperties) {
        this.emqxMqttClient = emqxMqttClient;
        this.mqttProperties = mqttProperties;
    }

    /**
     * Sends the demo message.
     *
     * <p>A publish failure is logged and swallowed on purpose: an exception
     * escaping a {@link CommandLineRunner} aborts application startup, and a
     * demo message is not worth taking the service down for.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) {
        if (!mqttProperties.isEnabled()) {
            return;
        }
        String topic = mqttProperties.getDemoTopic();
        String payload = "demo message from latent at " + LocalDateTime.now();
        try {
            emqxMqttClient.publish(topic, payload);
            log.info("mqtt demo published to topic={}", topic);
        } catch (IllegalStateException | IllegalArgumentException e) {
            // Raised by the client for a blank topic, a lost connection or a failed publish.
            log.warn("mqtt demo publish skipped, topic={}", topic, e);
        }
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/EmqxMqttClient.java
New file @@ -0,0 +1,149 @@
package com.zy.acs.hk.latent.mqtt;

import com.zy.acs.hk.latent.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Thin wrapper around a Paho {@link MqttClient}: connects on bean initialisation,
 * subscribes to the configured demo topic, publishes UTF-8 text messages and
 * releases the client on bean destruction.
 */
@Component
public class EmqxMqttClient {

    private static final Logger log = LoggerFactory.getLogger(EmqxMqttClient.class);

    private final MqttProperties mqttProperties;

    // Written by connect()/destroy(), read by callers of publish() and by the
    // Paho callback; volatile so a released client is observed as such.
    private volatile MqttClient mqttClient;

    public EmqxMqttClient(MqttProperties mqttProperties) {
        this.mqttProperties = mqttProperties;
    }

    /**
     * Creates the client, connects to the broker and subscribes to the demo topic.
     * Skipped entirely when MQTT is disabled.
     *
     * @throws IllegalStateException if connecting or the initial subscription fails;
     *         the partially initialised client is released before the exception
     *         propagates
     */
    @PostConstruct
    public void connect() {
        if (!mqttProperties.isEnabled()) {
            log.info("mqtt disabled, skip emqx connect");
            return;
        }
        boolean ready = false;
        try {
            String clientId = mqttProperties.getClientId();
            if (!StringUtils.hasText(clientId)) {
                clientId = "zy-acs-hk-latent-" + UUID.randomUUID();
            }
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(), clientId, new MemoryPersistence());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    log.info("mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI);
                    if (!reconnect) {
                        // connect() performs the initial subscription itself;
                        // subscribing here too would subscribe twice.
                        return;
                    }
                    try {
                        subscribeDemoTopic();
                    } catch (RuntimeException e) {
                        // Do not let the failure escape into the Paho callback thread.
                        log.error("mqtt resubscribe demo topic failed", e);
                    }
                }

                @Override
                public void connectionLost(Throwable cause) {
                    log.error("mqtt connection lost", cause);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                    log.info("mqtt message arrived, topic={}, qos={}, payload={}", topic, message.getQos(), payload);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    log.info("mqtt delivery complete, messageId={}", token.getMessageId());
                }
            });
            mqttClient.connect(buildConnectOptions());
            subscribeDemoTopic();
            ready = true;
        } catch (MqttException e) {
            throw new IllegalStateException("connect emqx failed", e);
        } finally {
            if (!ready) {
                // @PreDestroy is not guaranteed to run for a bean whose
                // initialisation failed, so release the client here.
                releaseClient();
            }
        }
    }

    /**
     * Publishes {@code payload} as UTF-8 bytes to {@code topic} using the configured QoS.
     *
     * @param topic   target topic, must contain text
     * @param payload message text, must not be {@code null}
     * @throws IllegalArgumentException if {@code topic} is blank or {@code payload} is {@code null}
     * @throws IllegalStateException    if the client is not connected or the publish fails
     */
    public void publish(String topic, String payload) {
        if (!StringUtils.hasText(topic)) {
            throw new IllegalArgumentException("topic can not be blank");
        }
        if (payload == null) {
            throw new IllegalArgumentException("payload can not be null");
        }
        MqttClient client = mqttClient;
        if (client == null || !client.isConnected()) {
            throw new IllegalStateException("mqtt client is not connected");
        }
        try {
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
            mqttMessage.setQos(mqttProperties.getQos());
            client.publish(topic, mqttMessage);
            log.info("mqtt message published, topic={}, payload={}", topic, payload);
        } catch (MqttException e) {
            throw new IllegalStateException("publish mqtt message failed", e);
        }
    }

    /**
     * @return {@code true} when a client exists and reports itself connected
     */
    public boolean isConnected() {
        MqttClient client = mqttClient;
        return client != null && client.isConnected();
    }

    /** Translates {@link MqttProperties} into Paho connect options; blank credentials are omitted. */
    private MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        if (StringUtils.hasText(mqttProperties.getUsername())) {
            options.setUserName(mqttProperties.getUsername());
        }
        if (StringUtils.hasText(mqttProperties.getPassword())) {
            options.setPassword(mqttProperties.getPassword().toCharArray());
        }
        return options;
    }

    /**
     * Subscribes to the configured demo topic; a no-op when the client is not
     * connected or no topic is configured.
     *
     * @throws IllegalStateException if the broker rejects the subscription
     */
    private void subscribeDemoTopic() {
        MqttClient client = mqttClient;
        if (client == null || !client.isConnected()) {
            return;
        }
        String topic = mqttProperties.getDemoTopic();
        if (!StringUtils.hasText(topic)) {
            return;
        }
        try {
            client.subscribe(topic, mqttProperties.getQos());
            log.info("mqtt subscribed demo topic={}", topic);
        } catch (MqttException e) {
            throw new IllegalStateException("subscribe demo topic failed", e);
        }
    }

    /** Disconnects and closes the client, if one was created. Failures are logged, not thrown. */
    @PreDestroy
    public void destroy() {
        releaseClient();
    }

    /**
     * Best-effort release of the current client. Disconnect and close are
     * attempted independently so a failing disconnect does not skip close.
     */
    private void releaseClient() {
        MqttClient client = mqttClient;
        mqttClient = null;
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            log.warn("disconnect mqtt client failed", e);
        }
        try {
            client.close();
        } catch (MqttException e) {
            log.warn("close mqtt client failed", e);
        }
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/resources/application.yml
@@ -1,3 +1,19 @@
server:
  port: 8096
spring:
  application:
    name: zy-acs-hk-latent
mqtt:
  enabled: true
  broker-url: tcp://127.0.0.1:1883
  client-id: ${spring.application.name}-${random.uuid}
  # Credentials are resolved from the environment so that no secret is stored
  # in version control. Set MQTT_PASSWORD (and optionally MQTT_USERNAME) in the
  # deployment environment; an empty password means "connect without one".
  username: ${MQTT_USERNAME:rcs}
  password: ${MQTT_PASSWORD:}
  clean-session: true
  automatic-reconnect: true
  connection-timeout: 10
  keep-alive-interval: 20
  qos: 1
  demo-topic: rcs/hk/latent/demo
zy-acs-hk/zy-acs-hk-latent/target/classes/application.yml
@@ -1,3 +1,19 @@
server:
  port: 8096
spring:
  application:
    name: zy-acs-hk-latent
mqtt:
  enabled: true
  broker-url: tcp://127.0.0.1:1883
  client-id: ${spring.application.name}-${random.uuid}
  # Credentials are resolved from the environment so that no secret is stored
  # in version control. NOTE(review): this copy lives under target/classes,
  # which looks like build output and probably should not be committed at all.
  username: ${MQTT_USERNAME:rcs}
  password: ${MQTT_PASSWORD:}
  clean-session: true
  automatic-reconnect: true
  connection-timeout: 10
  keep-alive-interval: 20
  qos: 1
  demo-topic: rcs/hk/latent/demo