#
vincentlu
2026-03-25 bdf160e2e17a6fcb013da88a7eb8366596c489c5
#
10个文件已添加
4个文件已修改
1个文件已删除
447 ■■■■ 已修改文件
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/config/MqttProperties.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/HkMqttDemoController.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/MqttDemoController.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/EmqxMqttClient.java 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkMessageDispatcher.java 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkMessageHandler.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkTopicBuilder.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkTopicInfo.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/ConnectionMessageHandler.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/FactsheetMessageHandler.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/StateMessageHandler.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/VisualizationMessageHandler.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/type/HkSubTopicType.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/resources/application.yml 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/target/classes/application.yml 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/config/MqttProperties.java
@@ -9,26 +9,30 @@
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private boolean enabled;
    private boolean enabled = true;
    private String brokerUrl;
    private String brokerUrl = "tcp://127.0.0.1:1883";
    private String clientId;
    private String clientId = "zy-acs-hk-latent-${random.uuid}";
    private String username;
    private String username = "rcs";
    private String password;
    private String password = "xltys1995";
    private boolean cleanSession;
    private boolean cleanSession = true;
    private boolean automaticReconnect;
    private boolean automaticReconnect = true;
    private int connectionTimeout;
    private int connectionTimeout = 10;
    private int keepAliveInterval;
    private int keepAliveInterval = 20;
    private int qos;
    private int qos = 1;
    private String demoTopic = "rcs/hk/latent/demo";
    private String interfaceName = "VDA";
    private String majorVersion = "V2.0.0";
    private String manufacturer = "HikRobot";
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/HkMqttDemoController.java
New file
@@ -0,0 +1,49 @@
package com.zy.acs.hk.latent.controller;
import com.zy.acs.hk.latent.mqtt.EmqxMqttClient;
import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType;
import com.zy.acs.hk.latent.mqtt.HkTopicBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.LinkedHashMap;
import java.util.Map;
@RestController
@RequestMapping("/demo/mqtt")
public class HkMqttDemoController {
    @Autowired
    private EmqxMqttClient emqxMqttClient;
    @Autowired
    private HkTopicBuilder hkTopicBuilder;
    @PostMapping("/publish")
    public Map<String, Object> publish(@RequestParam String agvNo,
                                       @RequestParam String subTopic,
                                       @RequestParam String message) {
        HkSubTopicType subTopicEnum = HkSubTopicType.fromCode(subTopic);
        if (subTopicEnum == null) {
            throw new IllegalArgumentException("unsupported subTopic: " + subTopic);
        }
        if (!StringUtils.hasText(agvNo)) {
            throw new IllegalArgumentException("agvNo can not be blank");
        }
        String topic = hkTopicBuilder.buildTopic(agvNo, subTopicEnum.getCode());
        emqxMqttClient.publish(topic, message);
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("success", true);
        result.put("topic", topic);
        result.put("agvNo", agvNo);
        result.put("subTopic", subTopicEnum.getCode());
        result.put("message", message);
        return result;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/controller/MqttDemoController.java
File was deleted
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/EmqxMqttClient.java
@@ -22,6 +22,12 @@
    @Autowired
    private MqttProperties mqttProperties;
    @Autowired
    private HkMessageDispatcher hkMessageDispatcher;
    @Autowired
    private HkTopicBuilder hkTopicBuilder;
    @PostConstruct
    public void connect() {
        if (!mqttProperties.isEnabled()) {
@@ -40,7 +46,7 @@
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    log.info("mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI);
                    subscribeDemoTopic();
                    subscribeTopics();
                }
                @Override
@@ -52,6 +58,7 @@
                public void messageArrived(String topic, MqttMessage message) {
                    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                    log.info("mqtt message arrived, topic={}, qos={}, payload={}", topic, message.getQos(), payload);
                    hkMessageDispatcher.dispatch(topic, payload);
                }
                @Override
@@ -62,11 +69,32 @@
            MqttConnectOptions options = buildConnectOptions();
            mqttClient.connect(options);
            subscribeDemoTopic();
        } catch (MqttException e) {
            throw new IllegalStateException("connect emqx failed", e);
        }
    }
    private MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        if (StringUtils.hasText(mqttProperties.getUsername())) {
            options.setUserName(mqttProperties.getUsername());
        }
        if (StringUtils.hasText(mqttProperties.getPassword())) {
            options.setPassword(mqttProperties.getPassword().toCharArray());
        }
        return options;
    }
    public boolean isConnected() {
        return mqttClient != null && mqttClient.isConnected();
    }
    // ------------------------------------------------------------------------
    public void publish(String topic, String payload) {
        if (!StringUtils.hasText(topic)) {
@@ -86,40 +114,21 @@
        }
    }
    public boolean isConnected() {
        return mqttClient != null && mqttClient.isConnected();
    }
    private MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        if (StringUtils.hasText(mqttProperties.getUsername())) {
            options.setUserName(mqttProperties.getUsername());
        }
        if (StringUtils.hasText(mqttProperties.getPassword())) {
            options.setPassword(mqttProperties.getPassword().toCharArray());
        }
        return options;
    }
    private void subscribeDemoTopic() {
    private void subscribeTopics() {
        if (!isConnected()) {
            return;
        }
        String topic = mqttProperties.getDemoTopic();
        if (!StringUtils.hasText(topic)) {
            return;
        }
        try {
            mqttClient.subscribe(topic, mqttProperties.getQos());
            log.info("mqtt subscribed demo topic={}", topic);
        } catch (MqttException e) {
            throw new IllegalStateException("subscribe demo topic failed", e);
        for (String topicPattern : hkTopicBuilder.buildSubscribeTopics()) {
            if (!StringUtils.hasText(topicPattern)) {
                continue;
            }
            try {
                mqttClient.subscribe(topicPattern, mqttProperties.getQos());
                log.info("mqtt subscribed topic={}", topicPattern);
            } catch (MqttException e) {
                throw new IllegalStateException("subscribe topic failed: " + topicPattern, e);
            }
        }
    }
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkMessageDispatcher.java
New file
@@ -0,0 +1,72 @@
package com.zy.acs.hk.latent.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@Slf4j
@Component
public class HkMessageDispatcher {
    private final Map<String, HkMessageHandler> handlerMap = new HashMap<>();
    @Autowired
    private List<HkMessageHandler> hkMessageHandlers;
    @PostConstruct
    public void init() {
        for (HkMessageHandler hkMessageHandler : hkMessageHandlers) {
            handlerMap.put(hkMessageHandler.supportSubTopic().toLowerCase(Locale.ROOT), hkMessageHandler);
        }
    }
    public void dispatch(String topic, String payload) {
        HkTopicInfo topicInfo = this.parseTopic(topic);
        if (topicInfo == null) {
            log.warn("ignore mqtt topic, unsupported topic format, topic={}", topic);
            return;
        }
        if (!StringUtils.hasText(topicInfo.getAgvNo())) {
            log.warn("ignore mqtt topic, agvNo is blank, topic={}", topic);
            return;
        }
        HkMessageHandler hkMessageHandler = handlerMap.get(topicInfo.getSubTopic().toLowerCase(Locale.ROOT));
        if (hkMessageHandler == null) {
            log.warn("ignore mqtt topic, unsupported subTopic={}, topic={}", topicInfo.getSubTopic(), topic);
            return;
        }
        hkMessageHandler.handle(topicInfo, payload);
    }
    public HkTopicInfo parseTopic(String topic) {
        if (!StringUtils.hasText(topic)) {
            return null;
        }
        String[] parts = topic.split("/");
        if (parts.length != 5) {
            return null;
        }
        HkTopicInfo topicInfo = new HkTopicInfo();
        topicInfo.setRawTopic(topic);
        topicInfo.setInterfaceName(parts[0]);
        topicInfo.setMajorVersion(parts[1]);
        topicInfo.setManufacturer(parts[2]);
        topicInfo.setAgvNo(parts[3]);
        topicInfo.setSubTopic(parts[4]);
        return topicInfo;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkMessageHandler.java
New file
@@ -0,0 +1,8 @@
package com.zy.acs.hk.latent.mqtt;
public interface HkMessageHandler {
    String supportSubTopic();
    void handle(HkTopicInfo topicInfo, String payload);
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkTopicBuilder.java
New file
@@ -0,0 +1,32 @@
package com.zy.acs.hk.latent.mqtt;
import com.zy.acs.hk.latent.config.MqttProperties;
import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class HkTopicBuilder {
    @Autowired
    private MqttProperties mqttProperties;
    public List<String> buildSubscribeTopics() {
        List<String> topics = new ArrayList<>();
        for (HkSubTopicType subTopicEnum : HkSubTopicType.values()) {
            topics.add(buildTopic("+", subTopicEnum.getCode()));
        }
        return topics;
    }
    public String buildTopic(String agvNo, String subTopic) {
        return mqttProperties.getInterfaceName()
                + "/" + mqttProperties.getMajorVersion()
                + "/" + mqttProperties.getManufacturer()
                + "/" + agvNo
                + "/" + subTopic;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/HkTopicInfo.java
New file
@@ -0,0 +1,19 @@
package com.zy.acs.hk.latent.mqtt;
import lombok.Data;
@Data
public class HkTopicInfo {
    private String rawTopic;
    private String interfaceName;
    private String majorVersion;
    private String manufacturer;
    private String agvNo;
    private String subTopic;
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/ConnectionMessageHandler.java
New file
@@ -0,0 +1,23 @@
package com.zy.acs.hk.latent.mqtt.handler;
import com.zy.acs.hk.latent.mqtt.HkMessageHandler;
import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType;
import com.zy.acs.hk.latent.mqtt.HkTopicInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConnectionMessageHandler implements HkMessageHandler {
    @Override
    public String supportSubTopic() {
        return HkSubTopicType.CONNECTION.getCode();
    }
    @Override
    public void handle(HkTopicInfo topicInfo, String payload) {
        log.info("handle connection message, agvNo={}, topic={}, payload={}",
                topicInfo.getAgvNo(), topicInfo.getRawTopic(), payload);
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/FactsheetMessageHandler.java
New file
@@ -0,0 +1,23 @@
package com.zy.acs.hk.latent.mqtt.handler;
import com.zy.acs.hk.latent.mqtt.HkMessageHandler;
import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType;
import com.zy.acs.hk.latent.mqtt.HkTopicInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class FactsheetMessageHandler implements HkMessageHandler {
    @Override
    public String supportSubTopic() {
        return HkSubTopicType.FACTSHEET.getCode();
    }
    @Override
    public void handle(HkTopicInfo topicInfo, String payload) {
        log.info("handle factsheet message, agvNo={}, topic={}, payload={}",
                topicInfo.getAgvNo(), topicInfo.getRawTopic(), payload);
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/StateMessageHandler.java
New file
@@ -0,0 +1,23 @@
package com.zy.acs.hk.latent.mqtt.handler;
import com.zy.acs.hk.latent.mqtt.HkMessageHandler;
import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType;
import com.zy.acs.hk.latent.mqtt.HkTopicInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class StateMessageHandler implements HkMessageHandler {
    @Override
    public String supportSubTopic() {
        return HkSubTopicType.STATE.getCode();
    }
    @Override
    public void handle(HkTopicInfo topicInfo, String payload) {
        log.info("handle state message, agvNo={}, topic={}, payload={}",
                topicInfo.getAgvNo(), topicInfo.getRawTopic(), payload);
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/handler/VisualizationMessageHandler.java
New file
@@ -0,0 +1,23 @@
package com.zy.acs.hk.latent.mqtt.handler;
import com.zy.acs.hk.latent.mqtt.HkMessageHandler;
import com.zy.acs.hk.latent.mqtt.type.HkSubTopicType;
import com.zy.acs.hk.latent.mqtt.HkTopicInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class VisualizationMessageHandler implements HkMessageHandler {
    @Override
    public String supportSubTopic() {
        return HkSubTopicType.VISUALIZATION.getCode();
    }
    @Override
    public void handle(HkTopicInfo topicInfo, String payload) {
        log.info("handle visualization message, agvNo={}, topic={}, payload={}",
                topicInfo.getAgvNo(), topicInfo.getRawTopic(), payload);
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/mqtt/type/HkSubTopicType.java
New file
@@ -0,0 +1,32 @@
package com.zy.acs.hk.latent.mqtt.type;
import lombok.Getter;
import org.springframework.util.StringUtils;
@Getter
public enum HkSubTopicType {
    STATE("state"),
    VISUALIZATION("visualization"),
    CONNECTION("connection"),
    FACTSHEET("factsheet");
    private final String code;
    HkSubTopicType(String code) {
        this.code = code;
    }
    public static HkSubTopicType fromCode(String code) {
        if (!StringUtils.hasText(code)) {
            return null;
        }
        for (HkSubTopicType subTopicEnum : values()) {
            if (subTopicEnum.code.equalsIgnoreCase(code)) {
                return subTopicEnum;
            }
        }
        return null;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/resources/application.yml
@@ -16,3 +16,6 @@
  connection-timeout: 10
  keep-alive-interval: 20
  qos: 1
  interface-name: VDA
  major-version: V2.0.0
  manufacturer: HikRobot
zy-acs-hk/zy-acs-hk-latent/target/classes/application.yml
@@ -16,4 +16,6 @@
  connection-timeout: 10
  keep-alive-interval: 20
  qos: 1
  demo-topic: rcs/hk/latent/demo
  interface-name: VDA
  major-version: V2.0.0
  manufacturer: HikRobot