package com.zy.integration.iot.client.impl; import com.core.common.Cools; import com.zy.integration.iot.client.IotMqttClient; import com.zy.integration.iot.handler.IotInboundMessageHandler; import com.zy.integration.iot.util.IotMqttSslUtils; import com.zy.iot.config.IotProperties; import com.zy.iot.entity.IotTopicConfig; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.File; import java.nio.charset.StandardCharsets; /** * IoT MQTT 客户端。 * 兼容 AWS IoT 的 mTLS 模式,也兼容局域网模拟 broker 的明文 TCP 模式。 */ @Slf4j @Service public class PahoIotMqttClient implements IotMqttClient { @Autowired private IotProperties iotProperties; @Autowired private IotInboundMessageHandler inboundMessageHandler; private volatile MqttClient mqttClient; @PostConstruct public void init() { if (!iotProperties.isEnabled()) { return; } try { ensureConnected(); } catch (Exception e) { log.error("initial connect to iot mqtt failed", e); } } @PreDestroy public void destroy() { if (mqttClient == null) { return; } try { if (mqttClient.isConnected()) { mqttClient.disconnect(); } mqttClient.close(); } catch (Exception e) { log.warn("close iot mqtt client failed", e); } } /** * 懒加载建连,首次发送或启动时自动连接并补订阅主题。 */ @Override public synchronized void ensureConnected() throws Exception { if (!iotProperties.isEnabled()) { return; } if (!hasRequiredConfig()) { log.warn("iot config incomplete, mqtt client will not connect"); return; } if (mqttClient == null) { mqttClient = createClient(); } if (mqttClient.isConnected()) { return; } mqttClient.connect(buildOptions()); subscribeTopics(); } /** * 统一按 QoS1 发布,兼顾可靠性和实现复杂度。 */ @Override public void publish(String topic, String payload) throws Exception { if (!iotProperties.isEnabled() || Cools.isEmpty(topic) || payload == null) { return; } ensureConnected(); if (mqttClient == null || !mqttClient.isConnected()) { throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); } MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); mqttMessage.setQos(1); mqttMessage.setRetained(false); mqttClient.publish(topic, mqttMessage); } @Override public boolean isConnected() { return mqttClient != null && mqttClient.isConnected(); } private boolean hasRequiredConfig() { if (Cools.isEmpty(iotProperties.getServerUri()) || Cools.isEmpty(iotProperties.getResolvedClientId())) { return false; } if (!iotProperties.isTlsEnabled()) { return true; } return !Cools.isEmpty(iotProperties.getCaCertPath()) && !Cools.isEmpty(iotProperties.getClientCertPath()) && !Cools.isEmpty(iotProperties.getPrivateKeyPath()); } private MqttClient createClient() throws Exception { MqttClient client = new MqttClient( iotProperties.getServerUri(), iotProperties.getResolvedClientId(), new MqttDefaultFilePersistence(resolvePersistenceDir()) ); client.setCallback(new MqttCallbackExtended() { @Override public void connectionLost(Throwable cause) { log.warn("iot mqtt connection lost", cause); } @Override public void messageArrived(String topic, MqttMessage message) { inboundMessageHandler.handleMessage(topic, new String(message.getPayload(), StandardCharsets.UTF_8)); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectComplete(boolean reconnect, String serverURI) { try { subscribeTopics(); log.info("iot mqtt connected, reconnect={}, serverURI={}", reconnect, serverURI); } catch (Exception e) { log.error("subscribe iot topics failed", e); } } }); return client; } private MqttConnectOptions buildOptions() throws Exception { MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(iotProperties.isAutomaticReconnect()); options.setCleanSession(iotProperties.isCleanSession()); options.setKeepAliveInterval(iotProperties.getKeepAliveSeconds()); options.setConnectionTimeout(iotProperties.getConnectionTimeoutSeconds()); if (iotProperties.isTlsEnabled()) { options.setSocketFactory(IotMqttSslUtils.buildSocketFactory( iotProperties.getCaCertPath(), iotProperties.getClientCertPath(), iotProperties.getPrivateKeyPath() )); } return options; } /** * 只订阅 XBPS 下发的 stow/pick 指令和外部 feedback 回执。 */ private synchronized void subscribeTopics() throws Exception { if (mqttClient == null || !mqttClient.isConnected()) { return; } IotTopicConfig topics = iotProperties.getTopics(); subscribe(topics.getEgressStow()); subscribe(topics.getEgressPick()); subscribe(topics.getEgressFeedback()); } private void subscribe(String topic) throws Exception { if (Cools.isEmpty(topic)) { return; } mqttClient.subscribe(topic, 1); } private String resolvePersistenceDir() { String configured = iotProperties.getPersistenceDir(); String dir = Cools.isEmpty(configured) ? System.getProperty("java.io.tmpdir") + File.separator + "asrs-iot-mqtt" : configured; File file = new File(dir); if (!file.exists()) { file.mkdirs(); } return file.getAbsolutePath(); } }