package com.zy.integration.iot.client.impl;
|
|
import com.core.common.Cools;
|
import com.zy.integration.iot.client.IotMqttClient;
|
import com.zy.integration.iot.handler.IotInboundMessageHandler;
|
import com.zy.integration.iot.util.IotMqttSslUtils;
|
import com.zy.iot.config.IotProperties;
|
import com.zy.iot.entity.IotTopicConfig;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.PostConstruct;
|
import javax.annotation.PreDestroy;
|
import java.io.File;
|
import java.nio.charset.StandardCharsets;
|
|
/**
|
* IoT MQTT 客户端。
|
* 兼容 AWS IoT 的 mTLS 模式,也兼容局域网模拟 broker 的明文 TCP 模式。
|
*/
|
@Slf4j
|
@Service
|
public class PahoIotMqttClient implements IotMqttClient {
|
|
@Autowired
|
private IotProperties iotProperties;
|
@Autowired
|
private IotInboundMessageHandler inboundMessageHandler;
|
|
private volatile MqttClient mqttClient;
|
|
@PostConstruct
|
public void init() {
|
if (!iotProperties.isEnabled()) {
|
return;
|
}
|
try {
|
ensureConnected();
|
} catch (Exception e) {
|
log.error("initial connect to iot mqtt failed", e);
|
}
|
}
|
|
@PreDestroy
|
public void destroy() {
|
if (mqttClient == null) {
|
return;
|
}
|
try {
|
if (mqttClient.isConnected()) {
|
mqttClient.disconnect();
|
}
|
mqttClient.close();
|
} catch (Exception e) {
|
log.warn("close iot mqtt client failed", e);
|
}
|
}
|
|
/**
|
* 懒加载建连,首次发送或启动时自动连接并补订阅主题。
|
*/
|
@Override
|
public synchronized void ensureConnected() throws Exception {
|
if (!iotProperties.isEnabled()) {
|
return;
|
}
|
if (!hasRequiredConfig()) {
|
log.warn("iot config incomplete, mqtt client will not connect");
|
return;
|
}
|
if (mqttClient == null) {
|
mqttClient = createClient();
|
}
|
if (mqttClient.isConnected()) {
|
return;
|
}
|
mqttClient.connect(buildOptions());
|
subscribeTopics();
|
}
|
|
/**
|
* 统一按 QoS1 发布,兼顾可靠性和实现复杂度。
|
*/
|
@Override
|
public void publish(String topic, String payload) throws Exception {
|
if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) {
|
return;
|
}
|
ensureConnected();
|
if (mqttClient == null || !mqttClient.isConnected()) {
|
throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
|
}
|
MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
|
mqttMessage.setQos(1);
|
mqttMessage.setRetained(false);
|
mqttClient.publish(topic, mqttMessage);
|
}
|
|
@Override
|
public boolean isConnected() {
|
return mqttClient != null && mqttClient.isConnected();
|
}
|
|
private boolean hasRequiredConfig() {
|
if (Cools.isEmpty(iotProperties.getServerUri()) || Cools.isEmpty(iotProperties.getResolvedClientId())) {
|
return false;
|
}
|
if (!iotProperties.isTlsEnabled()) {
|
return true;
|
}
|
return !Cools.isEmpty(iotProperties.getCaCertPath())
|
&& !Cools.isEmpty(iotProperties.getClientCertPath())
|
&& !Cools.isEmpty(iotProperties.getPrivateKeyPath());
|
}
|
|
private MqttClient createClient() throws Exception {
|
MqttClient client = new MqttClient(
|
iotProperties.getServerUri(),
|
iotProperties.getResolvedClientId(),
|
new MqttDefaultFilePersistence(resolvePersistenceDir())
|
);
|
client.setCallback(new MqttCallbackExtended() {
|
@Override
|
public void connectionLost(Throwable cause) {
|
log.warn("iot mqtt connection lost", cause);
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage message) {
|
inboundMessageHandler.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8));
|
}
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken token) {
|
}
|
|
@Override
|
public void connectComplete(boolean reconnect, String serverURI) {
|
try {
|
subscribeTopics();
|
log.info("iot mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI);
|
} catch (Exception e) {
|
log.error("subscribe iot topics failed", e);
|
}
|
}
|
});
|
return client;
|
}
|
|
private MqttConnectOptions buildOptions() throws Exception {
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setAutomaticReconnect(iotProperties.isAutomaticReconnect());
|
options.setCleanSession(iotProperties.isCleanSession());
|
options.setKeepAliveInterval(iotProperties.getKeepAliveSeconds());
|
options.setConnectionTimeout(iotProperties.getConnectionTimeoutSeconds());
|
if (iotProperties.isTlsEnabled()) {
|
options.setSocketFactory(IotMqttSslUtils.buildSocketFactory(
|
iotProperties.getCaCertPath(),
|
iotProperties.getClientCertPath(),
|
iotProperties.getPrivateKeyPath()
|
));
|
}
|
return options;
|
}
|
|
/**
|
* 只订阅 XBPS 下发的 stow/pick 指令和外部 feedback 回执。
|
*/
|
private synchronized void subscribeTopics() throws Exception {
|
if (mqttClient == null || !mqttClient.isConnected()) {
|
return;
|
}
|
IotTopicConfig topics = iotProperties.getTopics();
|
subscribe(topics.getEgressStow());
|
subscribe(topics.getEgressPick());
|
subscribe(topics.getEgressFeedback());
|
}
|
|
private void subscribe(String topic) throws Exception {
|
if (Cools.isEmpty(topic)) {
|
return;
|
}
|
mqttClient.subscribe(topic, 1);
|
}
|
|
private String resolvePersistenceDir() {
|
String configured = iotProperties.getPersistenceDir();
|
String dir = Cools.isEmpty(configured)
|
? System.getProperty("java.io.tmpdir") + File.separator + "asrs-iot-mqtt"
|
: configured;
|
File file = new File(dir);
|
if (!file.exists()) {
|
file.mkdirs();
|
}
|
return file.getAbsolutePath();
|
}
|
}
|