| | |
| | | /** |
| | | * Settings holder bound to configuration keys under the {@code mqtt} prefix. |
| | | * |
| | | * NOTE(review): most fields below are listed twice, once bare and once with an initializer. |
| | | * That looks like the before/after lines of one change rendered together; a class cannot |
| | | * declare the same field twice, so confirm which declaration is the current one. |
| | | */ |
| | | @ConfigurationProperties(prefix = "mqtt") |
| | | public class MqttProperties { |
| | | |
| | | // Checked through isEnabled() at the start of the client's connect(). |
| | | private boolean enabled; |
| | | private boolean enabled = true; |
| | | |
| | | // Broker address; the initializer points at a broker on the local host, port 1883. |
| | | private String brokerUrl; |
| | | private String brokerUrl = "tcp://127.0.0.1:1883"; |
| | | |
| | | // NOTE(review): "${random.uuid}" sits inside a plain Java string literal here. Such placeholders |
| | | // are normally resolved in property sources, not in field initializers, so this default is |
| | | // presumably used verbatim (identical client id on every instance) - confirm. |
| | | private String clientId; |
| | | private String clientId = "zy-acs-hk-latent-${random.uuid}"; |
| | | |
| | | // Applied to the connect options only when it contains text. |
| | | private String username; |
| | | private String username = "rcs"; |
| | | |
| | | // Applied to the connect options only when it contains text. |
| | | // NOTE(review): a credential is committed as a source default; consider supplying it only |
| | | // through external configuration or a secret store. |
| | | private String password; |
| | | private String password = "xltys1995"; |
| | | |
| | | // Forwarded to MqttConnectOptions.setCleanSession. |
| | | private boolean cleanSession; |
| | | private boolean cleanSession = true; |
| | | |
| | | // Forwarded to MqttConnectOptions.setAutomaticReconnect. |
| | | private boolean automaticReconnect; |
| | | private boolean automaticReconnect = true; |
| | | |
| | | // Forwarded to MqttConnectOptions.setConnectionTimeout (presumably seconds - confirm). |
| | | private int connectionTimeout; |
| | | private int connectionTimeout = 10; |
| | | |
| | | // Forwarded to MqttConnectOptions.setKeepAliveInterval (presumably seconds - confirm). |
| | | private int keepAliveInterval; |
| | | private int keepAliveInterval = 20; |
| | | |
| | | // QoS level passed to every subscribe call. |
| | | private int qos; |
| | | private int qos = 1; |
| | | |
| | | // Topic read by subscribeDemoTopic(). |
| | | // NOTE(review): may be the removed side of the change that introduces interfaceName - confirm. |
| | | private String demoTopic = "rcs/hk/latent/demo"; |
| | | // First level of every topic produced by HkTopicBuilder.buildTopic. |
| | | private String interfaceName = "VDA"; |
| | | |
| | | // Second level of every topic produced by HkTopicBuilder.buildTopic. |
| | | private String majorVersion = "V2.0.0"; |
| | | |
| | | // Third level of every topic produced by HkTopicBuilder.buildTopic. |
| | | private String manufacturer = "HikRobot"; |
| | | |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.controller; |
| | | |
| | | import com.zy.acs.hk.latent.mqtt.EmqxMqttClient; |
| | | import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType; |
| | | import com.zy.acs.hk.latent.mqtt.HkTopicBuilder; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.util.StringUtils; |
| | | import org.springframework.web.bind.annotation.PostMapping; |
| | | import org.springframework.web.bind.annotation.RequestMapping; |
| | | import org.springframework.web.bind.annotation.RequestParam; |
| | | import org.springframework.web.bind.annotation.RestController; |
| | | |
| | | import java.util.LinkedHashMap; |
| | | import java.util.Map; |
| | | |
| | | @RestController |
| | | @RequestMapping("/demo/mqtt") |
| | | public class HkMqttDemoController { |
| | | |
| | | @Autowired |
| | | private EmqxMqttClient emqxMqttClient; |
| | | |
| | | @Autowired |
| | | private HkTopicBuilder hkTopicBuilder; |
| | | |
| | | @PostMapping("/publish") |
| | | public Map<String, Object> publish(@RequestParam String agvNo, |
| | | @RequestParam String subTopic, |
| | | @RequestParam String message) { |
| | | HkSubTopicType subTopicEnum = HkSubTopicType.fromCode(subTopic); |
| | | if (subTopicEnum == null) { |
| | | throw new IllegalArgumentException("unsupported subTopic: " + subTopic); |
| | | } |
| | | if (!StringUtils.hasText(agvNo)) { |
| | | throw new IllegalArgumentException("agvNo can not be blank"); |
| | | } |
| | | |
| | | String topic = hkTopicBuilder.buildTopic(agvNo, subTopicEnum.getCode()); |
| | | emqxMqttClient.publish(topic, message); |
| | | |
| | | Map<String, Object> result = new LinkedHashMap<>(); |
| | | result.put("success", true); |
| | | result.put("topic", topic); |
| | | result.put("agvNo", agvNo); |
| | | result.put("subTopic", subTopicEnum.getCode()); |
| | | result.put("message", message); |
| | | return result; |
| | | } |
| | | } |
| | |
| | | @Autowired |
| | | private MqttProperties mqttProperties; |
| | | |
| | | @Autowired |
| | | private HkMessageDispatcher hkMessageDispatcher; |
| | | |
| | | @Autowired |
| | | private HkTopicBuilder hkTopicBuilder; |
| | | |
| | | @PostConstruct |
| | | public void connect() { |
| | | if (!mqttProperties.isEnabled()) { |
| | |
| | | /** |
| | | * Logs the completed connection (reconnect flag and server URI) and then triggers topic subscription. |
| | | */ |
| | | @Override |
| | | public void connectComplete(boolean reconnect, String serverURI) { |
| | | log.info("mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI); |
| | | // NOTE(review): both subscribe calls are listed here; they may be the old and the new line of a |
| | | // single change rendered together - confirm which one is current. |
| | | subscribeDemoTopic(); |
| | | subscribeTopics(); |
| | | } |
| | | |
| | | @Override |
| | |
| | | public void messageArrived(String topic, MqttMessage message) { |
| | | String payload = new String(message.getPayload(), StandardCharsets.UTF_8); |
| | | log.info("mqtt message arrived, topic={}, qos={}, payload={}", topic, message.getQos(), payload); |
| | | hkMessageDispatcher.dispatch(topic, payload); |
| | | } |
| | | |
| | | @Override |
| | |
| | | |
| | | MqttConnectOptions options = buildConnectOptions(); |
| | | mqttClient.connect(options); |
| | | subscribeDemoTopic(); |
| | | } catch (MqttException e) { |
| | | throw new IllegalStateException("connect emqx failed", e); |
| | | } |
| | | } |
| | | |
| | | private MqttConnectOptions buildConnectOptions() { |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect()); |
| | | options.setCleanSession(mqttProperties.isCleanSession()); |
| | | options.setConnectionTimeout(mqttProperties.getConnectionTimeout()); |
| | | options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); |
| | | |
| | | if (StringUtils.hasText(mqttProperties.getUsername())) { |
| | | options.setUserName(mqttProperties.getUsername()); |
| | | } |
| | | if (StringUtils.hasText(mqttProperties.getPassword())) { |
| | | options.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | } |
| | | return options; |
| | | } |
| | | |
| | | /** |
| | |  * Reports whether a client instance exists and currently holds a connection. |
| | |  * |
| | |  * @return {@code false} when no client has been created, otherwise the client's own connection state |
| | |  */ |
| | | public boolean isConnected() { |
| | |     if (mqttClient == null) { |
| | |         return false; |
| | |     } |
| | |     return mqttClient.isConnected(); |
| | | } |
| | | |
| | | // ------------------------------------------------------------------------ |
| | | |
| | | public void publish(String topic, String payload) { |
| | | if (!StringUtils.hasText(topic)) { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Reports whether a client instance exists and currently holds a connection. |
| | |  * |
| | |  * @return {@code false} when no client has been created, otherwise the client's own connection state |
| | |  */ |
| | | public boolean isConnected() { |
| | |     if (mqttClient == null) { |
| | |         return false; |
| | |     } |
| | |     return mqttClient.isConnected(); |
| | | } |
| | | |
| | | private MqttConnectOptions buildConnectOptions() { |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect()); |
| | | options.setCleanSession(mqttProperties.isCleanSession()); |
| | | options.setConnectionTimeout(mqttProperties.getConnectionTimeout()); |
| | | options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); |
| | | |
| | | if (StringUtils.hasText(mqttProperties.getUsername())) { |
| | | options.setUserName(mqttProperties.getUsername()); |
| | | } |
| | | if (StringUtils.hasText(mqttProperties.getPassword())) { |
| | | options.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | } |
| | | return options; |
| | | } |
| | | |
| | | // NOTE(review): this span lists two method headers (subscribeDemoTopic, subscribeTopics) and two |
| | | // subscription bodies back to back, and its braces do not balance. It looks like the old and new |
| | | // version of one method rendered together - confirm which lines are current before editing. |
| | | // |
| | | // Visible behavior: nothing is subscribed while the client is not connected. The single-topic |
| | | // variant subscribes to the configured demo topic; the loop variant subscribes to every non-blank |
| | | // pattern returned by hkTopicBuilder.buildSubscribeTopics(). Both use the configured QoS and wrap |
| | | // an MqttException in an IllegalStateException. |
| | | private void subscribeDemoTopic() { |
| | | private void subscribeTopics() { |
| | | if (!isConnected()) { |
| | | return; |
| | | } |
| | | String topic = mqttProperties.getDemoTopic(); |
| | | if (!StringUtils.hasText(topic)) { |
| | | return; |
| | | } |
| | | |
| | | try { |
| | | mqttClient.subscribe(topic, mqttProperties.getQos()); |
| | | log.info("mqtt subscribed demo topic={}", topic); |
| | | } catch (MqttException e) { |
| | | throw new IllegalStateException("subscribe demo topic failed", e); |
| | | for (String topicPattern : hkTopicBuilder.buildSubscribeTopics()) { |
| | | // Blank patterns are skipped rather than treated as an error. |
| | | if (!StringUtils.hasText(topicPattern)) { |
| | | continue; |
| | | } |
| | | try { |
| | | mqttClient.subscribe(topicPattern, mqttProperties.getQos()); |
| | | log.info("mqtt subscribed topic={}", topicPattern); |
| | | } catch (MqttException e) { |
| | | // The first failing pattern aborts the loop; later patterns are not attempted. |
| | | throw new IllegalStateException("subscribe topic failed: " + topicPattern, e); |
| | | } |
| | | } |
| | | } |
| | | |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Locale; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | |  * Routes inbound MQTT messages to the {@link HkMessageHandler} registered for the topic's sub-topic. |
| | |  * |
| | |  * <p>Handlers are indexed once at startup by their lower-cased sub-topic code, so lookups are |
| | |  * case-insensitive. |
| | |  */ |
| | | @Slf4j |
| | | @Component |
| | | public class HkMessageDispatcher { |
| | | |
| | |     /** Levels of a supported topic: interfaceName/majorVersion/manufacturer/agvNo/subTopic. */ |
| | |     private static final int TOPIC_LEVELS = 5; |
| | | |
| | |     private final Map<String, HkMessageHandler> handlerMap = new HashMap<>(); |
| | | |
| | |     @Autowired |
| | |     private List<HkMessageHandler> hkMessageHandlers; |
| | | |
| | |     /** |
| | |      * Builds the sub-topic to handler index from the injected handlers. |
| | |      * |
| | |      * @throws IllegalStateException if a handler reports a blank sub-topic, or if two handlers |
| | |      *                               claim the same sub-topic (previously the later one silently won) |
| | |      */ |
| | |     @PostConstruct |
| | |     public void init() { |
| | |         for (HkMessageHandler hkMessageHandler : hkMessageHandlers) { |
| | |             String subTopic = hkMessageHandler.supportSubTopic(); |
| | |             if (!StringUtils.hasText(subTopic)) { |
| | |                 throw new IllegalStateException( |
| | |                         "handler declares blank subTopic: " + hkMessageHandler.getClass().getName()); |
| | |             } |
| | | |
| | |             String key = subTopic.toLowerCase(Locale.ROOT); |
| | |             HkMessageHandler previous = handlerMap.putIfAbsent(key, hkMessageHandler); |
| | |             if (previous != null) { |
| | |                 // Failing fast keeps routing independent of bean ordering. |
| | |                 throw new IllegalStateException("duplicate handler for subTopic '" + key + "': " |
| | |                         + previous.getClass().getName() + " and " + hkMessageHandler.getClass().getName()); |
| | |             } |
| | |         } |
| | |     } |
| | | |
| | |     /** |
| | |      * Parses {@code topic} and forwards {@code payload} to the matching handler. |
| | |      * |
| | |      * <p>Messages with an unsupported topic format, a blank AGV number or an unregistered |
| | |      * sub-topic are logged at warn level and dropped. |
| | |      * |
| | |      * @param topic   full topic the message arrived on |
| | |      * @param payload message body, passed to the handler unchanged |
| | |      */ |
| | |     public void dispatch(String topic, String payload) { |
| | |         HkTopicInfo topicInfo = this.parseTopic(topic); |
| | |         if (topicInfo == null) { |
| | |             log.warn("ignore mqtt topic, unsupported topic format, topic={}", topic); |
| | |             return; |
| | |         } |
| | | |
| | |         if (!StringUtils.hasText(topicInfo.getAgvNo())) { |
| | |             log.warn("ignore mqtt topic, agvNo is blank, topic={}", topic); |
| | |             return; |
| | |         } |
| | | |
| | |         HkMessageHandler hkMessageHandler = handlerMap.get(topicInfo.getSubTopic().toLowerCase(Locale.ROOT)); |
| | |         if (hkMessageHandler == null) { |
| | |             log.warn("ignore mqtt topic, unsupported subTopic={}, topic={}", topicInfo.getSubTopic(), topic); |
| | |             return; |
| | |         } |
| | | |
| | |         hkMessageHandler.handle(topicInfo, payload); |
| | |     } |
| | | |
| | |     /** |
| | |      * Splits a topic into its five levels. |
| | |      * |
| | |      * @param topic topic string to parse |
| | |      * @return the parsed levels, or {@code null} when the topic is blank or does not have exactly five levels |
| | |      */ |
| | |     public HkTopicInfo parseTopic(String topic) { |
| | |         if (!StringUtils.hasText(topic)) { |
| | |             return null; |
| | |         } |
| | | |
| | |         // Limit -1 keeps trailing empty levels; without it "a/b/c/d/e/" would be counted as five |
| | |         // levels although it has six. |
| | |         String[] parts = topic.split("/", -1); |
| | |         if (parts.length != TOPIC_LEVELS) { |
| | |             return null; |
| | |         } |
| | | |
| | |         HkTopicInfo topicInfo = new HkTopicInfo(); |
| | |         topicInfo.setRawTopic(topic); |
| | |         topicInfo.setInterfaceName(parts[0]); |
| | |         topicInfo.setMajorVersion(parts[1]); |
| | |         topicInfo.setManufacturer(parts[2]); |
| | |         topicInfo.setAgvNo(parts[3]); |
| | |         topicInfo.setSubTopic(parts[4]); |
| | |         return topicInfo; |
| | |     } |
| | | |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt; |
| | | |
| | | /** |
| | | * Contract for components that process the messages of one sub-topic. |
| | | */ |
| | | public interface HkMessageHandler { |
| | | |
| | | /** |
| | | * Returns the sub-topic code this handler is registered under; the dispatcher lower-cases it for lookup. |
| | | */ |
| | | String supportSubTopic(); |
| | | |
| | | /** |
| | | * Processes one message. |
| | | * |
| | | * @param topicInfo parsed levels of the topic the message arrived on |
| | | * @param payload message body as text |
| | | */ |
| | | void handle(HkTopicInfo topicInfo, String payload); |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt; |
| | | |
| | | import com.zy.acs.hk.latent.config.MqttProperties; |
| | | import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | /** |
| | |  * Assembles MQTT topic strings from the configured prefix levels plus an AGV number and sub-topic. |
| | |  */ |
| | | @Component |
| | | public class HkTopicBuilder { |
| | | |
| | |     @Autowired |
| | |     private MqttProperties mqttProperties; |
| | | |
| | |     /** |
| | |      * Produces one subscription pattern per {@link HkSubTopicType}, using {@code +} in the AGV level. |
| | |      * |
| | |      * @return mutable list of patterns in enum declaration order |
| | |      */ |
| | |     public List<String> buildSubscribeTopics() { |
| | |         final HkSubTopicType[] subTopicTypes = HkSubTopicType.values(); |
| | |         final List<String> patterns = new ArrayList<>(subTopicTypes.length); |
| | |         for (int i = 0; i < subTopicTypes.length; i++) { |
| | |             patterns.add(buildTopic("+", subTopicTypes[i].getCode())); |
| | |         } |
| | |         return patterns; |
| | |     } |
| | | |
| | |     /** |
| | |      * Joins interface name, major version, manufacturer, {@code agvNo} and {@code subTopic} with {@code /}. |
| | |      * |
| | |      * @param agvNo    value placed in the fourth topic level |
| | |      * @param subTopic value placed in the fifth topic level |
| | |      * @return the assembled topic string |
| | |      */ |
| | |     public String buildTopic(String agvNo, String subTopic) { |
| | |         return String.join("/", |
| | |                 mqttProperties.getInterfaceName(), |
| | |                 mqttProperties.getMajorVersion(), |
| | |                 mqttProperties.getManufacturer(), |
| | |                 agvNo, |
| | |                 subTopic); |
| | |     } |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt; |
| | | |
| | | import lombok.Data; |
| | | |
| | | /** |
| | | * Parsed form of a five-level topic, as filled in by HkMessageDispatcher.parseTopic. |
| | | */ |
| | | @Data |
| | | public class HkTopicInfo { |
| | | |
| | | // The complete, unsplit topic string. |
| | | private String rawTopic; |
| | | |
| | | // Topic level 1. |
| | | private String interfaceName; |
| | | |
| | | // Topic level 2. |
| | | private String majorVersion; |
| | | |
| | | // Topic level 3. |
| | | private String manufacturer; |
| | | |
| | | // Topic level 4. |
| | | private String agvNo; |
| | | |
| | | // Topic level 5; used by the dispatcher to look up the handler. |
| | | private String subTopic; |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt.handler; |
| | | |
| | | import com.zy.acs.hk.latent.mqtt.HkMessageHandler; |
| | | import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType; |
| | | import com.zy.acs.hk.latent.mqtt.HkTopicInfo; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | |  * Handler for messages on the {@code CONNECTION} sub-topic; it currently only logs them. |
| | |  */ |
| | | @Slf4j |
| | | @Component |
| | | public class ConnectionMessageHandler implements HkMessageHandler { |
| | | |
| | |     /** Returns the code of {@link HkSubTopicType#CONNECTION}. */ |
| | |     @Override |
| | |     public String supportSubTopic() { |
| | |         final HkSubTopicType handled = HkSubTopicType.CONNECTION; |
| | |         return handled.getCode(); |
| | |     } |
| | | |
| | |     /** Logs the AGV number, raw topic and payload at info level. */ |
| | |     @Override |
| | |     public void handle(HkTopicInfo topicInfo, String payload) { |
| | |         final String agvNo = topicInfo.getAgvNo(); |
| | |         final String rawTopic = topicInfo.getRawTopic(); |
| | |         log.info("handle connection message, agvNo={}, topic={}, payload={}", agvNo, rawTopic, payload); |
| | |     } |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt.handler; |
| | | |
| | | import com.zy.acs.hk.latent.mqtt.HkMessageHandler; |
| | | import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType; |
| | | import com.zy.acs.hk.latent.mqtt.HkTopicInfo; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | |  * Handler for messages on the {@code FACTSHEET} sub-topic; it currently only logs them. |
| | |  */ |
| | | @Slf4j |
| | | @Component |
| | | public class FactsheetMessageHandler implements HkMessageHandler { |
| | | |
| | |     /** Returns the code of {@link HkSubTopicType#FACTSHEET}. */ |
| | |     @Override |
| | |     public String supportSubTopic() { |
| | |         final HkSubTopicType handled = HkSubTopicType.FACTSHEET; |
| | |         return handled.getCode(); |
| | |     } |
| | | |
| | |     /** Logs the AGV number, raw topic and payload at info level. */ |
| | |     @Override |
| | |     public void handle(HkTopicInfo topicInfo, String payload) { |
| | |         final String agvNo = topicInfo.getAgvNo(); |
| | |         final String rawTopic = topicInfo.getRawTopic(); |
| | |         log.info("handle factsheet message, agvNo={}, topic={}, payload={}", agvNo, rawTopic, payload); |
| | |     } |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt.handler; |
| | | |
| | | import com.zy.acs.hk.latent.mqtt.HkMessageHandler; |
| | | import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType; |
| | | import com.zy.acs.hk.latent.mqtt.HkTopicInfo; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | |  * Handler for messages on the {@code STATE} sub-topic; it currently only logs them. |
| | |  */ |
| | | @Slf4j |
| | | @Component |
| | | public class StateMessageHandler implements HkMessageHandler { |
| | | |
| | |     /** Returns the code of {@link HkSubTopicType#STATE}. */ |
| | |     @Override |
| | |     public String supportSubTopic() { |
| | |         final HkSubTopicType handled = HkSubTopicType.STATE; |
| | |         return handled.getCode(); |
| | |     } |
| | | |
| | |     /** Logs the AGV number, raw topic and payload at info level. */ |
| | |     @Override |
| | |     public void handle(HkTopicInfo topicInfo, String payload) { |
| | |         final String agvNo = topicInfo.getAgvNo(); |
| | |         final String rawTopic = topicInfo.getRawTopic(); |
| | |         log.info("handle state message, agvNo={}, topic={}, payload={}", agvNo, rawTopic, payload); |
| | |     } |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt.handler; |
| | | |
| | | import com.zy.acs.hk.latent.mqtt.HkMessageHandler; |
| | | import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType; |
| | | import com.zy.acs.hk.latent.mqtt.HkTopicInfo; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | |  * Handler for messages on the {@code VISUALIZATION} sub-topic; it currently only logs them. |
| | |  */ |
| | | @Slf4j |
| | | @Component |
| | | public class VisualizationMessageHandler implements HkMessageHandler { |
| | | |
| | |     /** Returns the code of {@link HkSubTopicType#VISUALIZATION}. */ |
| | |     @Override |
| | |     public String supportSubTopic() { |
| | |         final HkSubTopicType handled = HkSubTopicType.VISUALIZATION; |
| | |         return handled.getCode(); |
| | |     } |
| | | |
| | |     /** Logs the AGV number, raw topic and payload at info level. */ |
| | |     @Override |
| | |     public void handle(HkTopicInfo topicInfo, String payload) { |
| | |         final String agvNo = topicInfo.getAgvNo(); |
| | |         final String rawTopic = topicInfo.getRawTopic(); |
| | |         log.info("handle visualization message, agvNo={}, topic={}, payload={}", agvNo, rawTopic, payload); |
| | |     } |
| | | } |
| New file |
| | |
| | | package com.zy.acs.hk.latent.mqtt.type; |
| | | |
| | | import lombok.Getter; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | /** |
| | |  * Sub-topic kinds, each carrying the code used as the last topic level. |
| | |  */ |
| | | @Getter |
| | | public enum HkSubTopicType { |
| | | |
| | |     STATE("state"), |
| | |     VISUALIZATION("visualization"), |
| | |     CONNECTION("connection"), |
| | |     FACTSHEET("factsheet"); |
| | | |
| | |     /** Code as it appears in a topic string. */ |
| | |     private final String code; |
| | | |
| | |     HkSubTopicType(String code) { |
| | |         this.code = code; |
| | |     } |
| | | |
| | |     /** |
| | |      * Looks up a constant by its code, ignoring case. |
| | |      * |
| | |      * @param code code to resolve |
| | |      * @return the matching constant, or {@code null} when {@code code} has no text or matches nothing |
| | |      */ |
| | |     public static HkSubTopicType fromCode(String code) { |
| | |         if (StringUtils.hasText(code)) { |
| | |             for (HkSubTopicType candidate : values()) { |
| | |                 if (candidate.code.equalsIgnoreCase(code)) { |
| | |                     return candidate; |
| | |                 } |
| | |             } |
| | |         } |
| | |         return null; |
| | |     } |
| | | } |
| | |
| | | connection-timeout: 10 |
| | | keep-alive-interval: 20 |
| | | qos: 1 |
| | | interface-name: VDA |
| | | major-version: V2.0.0 |
| | | manufacturer: HikRobot |
| | |
| | | connection-timeout: 10 |
| | | keep-alive-interval: 20 |
| | | qos: 1 |
| | | demo-topic: rcs/hk/latent/demo |
| | | interface-name: VDA |
| | | major-version: V2.0.0 |
| | | manufacturer: HikRobot |