package com.zy.acs.charge.protocol; import com.ghgande.j2mod.modbus.ModbusException; import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction; import com.ghgande.j2mod.modbus.msg.*; import com.ghgande.j2mod.modbus.net.TCPMasterConnection; import com.ghgande.j2mod.modbus.procimg.SimpleRegister; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * 读取Modbus 协议工具类 */ @Slf4j @Component public class ModbusAdapter { @Value("${charger.modbus.host}") private String host; @Value("${charger.modbus.port}") private int port; @Value("${charger.modbus.unit-id}") private int unitId; @Value("${charger.modbus.connect-timeout:3000}") private int connectTimeout; @Value("${charger.modbus.transaction-timeout:5000}") private int transactionTimeout; @Value("${charger.modbus.heartbeat.enabled:false}") private boolean heartbeatEnabled; @Value("${charger.modbus.heartbeat.interval:30000}") private long heartbeatInterval; private TCPMasterConnection connection; private ModbusTCPTransaction transaction; private volatile boolean connected = false; private ScheduledExecutorService heartbeatScheduler; @PostConstruct public void init() { try { InetAddress address = InetAddress.getByName(host); connection = new TCPMasterConnection(address); connection.setPort(port); connection.setTimeout(connectTimeout); } catch (UnknownHostException e) { throw new RuntimeException("Invalid Modbus host: " + host, e); } } /** * 建立连接 */ public synchronized void connect() throws Exception { if (connected) { return; } if (connection == null) { throw new IllegalStateException("Modbus connection not initialized"); } connection.connect(); transaction = new ModbusTCPTransaction(connection); connected = true; log.info("Modbus TCP connected to {}:{}", host, port); if (heartbeatEnabled) { startHeartbeat(); } } /** * 断开连接 */ public synchronized void disconnect() { if (heartbeatScheduler != null && !heartbeatScheduler.isShutdown()) { heartbeatScheduler.shutdownNow(); } if (connection != null && connection.isConnected()) { connection.close(); } connected = false; log.info("Modbus TCP disconnected"); } /** * 确保连接可用(内部自动重连) */ private void ensureConnected() throws Exception { if (!connected) { connect(); } } /** * 读取线圈(功能码01) */ public boolean readCoil(int address) throws Exception { return executeWithRetry(() -> { ensureConnected(); ReadCoilsRequest request = new ReadCoilsRequest(address, 1); request.setUnitID(unitId); transaction.setRequest(request); transaction.execute(); ReadCoilsResponse response = (ReadCoilsResponse) transaction.getResponse(); return response.getCoilStatus(0); }); } /** * 批量读取线圈(功能码01) * * @param startAddress 起始地址 * @param quantity 数量(1~2000) * @return boolean数组,索引从0开始对应起始地址的线圈 */ public boolean[] readCoils(int startAddress, int quantity) throws Exception { return executeWithRetry(() -> { ensureConnected(); ReadCoilsRequest request = new ReadCoilsRequest(startAddress, quantity); request.setUnitID(unitId); transaction.setRequest(request); transaction.execute(); ReadCoilsResponse response = (ReadCoilsResponse) transaction.getResponse(); boolean[] result = new boolean[quantity]; for (int i = 0; i < quantity; i++) { result[i] = response.getCoilStatus(i); } return result; }); } /** * 写入单个线圈(功能码05) */ public void writeCoil(int address, boolean state) throws Exception { executeWithRetry(() -> { ensureConnected(); WriteCoilRequest request = new WriteCoilRequest(address, state); request.setUnitID(unitId); transaction.setRequest(request); transaction.execute(); return null; }); } /** * 读取保持寄存器(功能码03) */ public int readHoldingRegister(int address) throws Exception { return executeWithRetry(() -> { ensureConnected(); ReadMultipleRegistersRequest request = new ReadMultipleRegistersRequest(address, 1); request.setUnitID(unitId); transaction.setRequest(request); transaction.execute(); ReadMultipleRegistersResponse response = (ReadMultipleRegistersResponse) transaction.getResponse(); return response.getRegister(0).getValue(); }); } /** * 写入单个寄存器(功能码06) */ public void writeHoldingRegister(int address, int value) throws Exception { executeWithRetry(() -> { ensureConnected(); WriteSingleRegisterRequest request = new WriteSingleRegisterRequest(address, new SimpleRegister(value)); request.setUnitID(unitId); transaction.setRequest(request); transaction.execute(); return null; }); } /** * 批量读取多个寄存器 */ public int[] readHoldingRegisters(int address, int quantity) throws Exception { return executeWithRetry(() -> { ensureConnected(); ReadMultipleRegistersRequest request = new ReadMultipleRegistersRequest(address, quantity); request.setUnitID(unitId); transaction.setRequest(request); transaction.execute(); ReadMultipleRegistersResponse response = (ReadMultipleRegistersResponse) transaction.getResponse(); int[] values = new int[quantity]; for (int i = 0; i < quantity; i++) { values[i] = response.getRegister(i).getValue(); } return values; }); } /** * 带重试的执行器(自动处理网络异常并重连重试) */ private T executeWithRetry(Operation operation) throws Exception { Exception lastException = null; int maxRetries = 3; for (int i = 0; i < maxRetries; i++) { try { return operation.execute(); } catch (Exception e) { lastException = e; if (isNetworkException(e)) { log.warn("Modbus network error, retrying {}/{}", i + 1, maxRetries); tryReconnect(); } else { // 非网络异常不重试 throw e; } try { Thread.sleep(1000); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); throw new Exception("Retry interrupted", e); } } } throw new Exception("Modbus operation failed after " + maxRetries + " retries", lastException); } private boolean isNetworkException(Exception e) { return e instanceof java.net.SocketTimeoutException || e instanceof java.io.IOException || (e instanceof ModbusException && e.getMessage().toLowerCase().contains("timeout")); } private synchronized void tryReconnect() { try { disconnect(); connect(); } catch (Exception e) { log.error("Reconnect failed", e); } } /** * 心跳保活 */ private void startHeartbeat() { heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); heartbeatScheduler.scheduleAtFixedRate(() -> { try { // 读取一个无副作用的寄存器,如充电机编号(地址113) int chargerId = readHoldingRegister(113); log.debug("Heartbeat success, chargerId={}", chargerId); } catch (Exception e) { log.warn("Heartbeat failed", e); tryReconnect(); } }, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); } @PreDestroy public void destroy() { disconnect(); } @FunctionalInterface private interface Operation { T execute() throws Exception; } }