package com.zy.component.influxdb.service;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.query.QueryType;
import com.influxdb.v3.client.write.WritePrecision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Thin service wrapper around {@link InfluxDBClient} for writing single points and running
 * SQL queries.
 *
 * <p>All three public methods are best-effort: failures raised while talking to the client are
 * logged (with their stack trace) and not rethrown. The query methods signal failure by
 * returning {@code null}, so callers must null-check the result.
 */
@Service
public class InfluxDBService {

    private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);

    @Autowired
    private InfluxDBClient influxDBClient;

    /**
     * Writes one point, timestamped with the current wall-clock time in milliseconds.
     *
     * <p>A failure of the write call itself is logged and swallowed; the caller is not notified.
     *
     * @param measurement measurement (table) name
     * @param tags        tag key/value pairs to attach to the point
     * @param fields      field key/value pairs to attach to the point
     */
    public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        Point point = Point.measurement(measurement)
                .setTags(tags)
                .setFields(fields)
                .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS);
        try {
            influxDBClient.writePoint(point);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause ends up in the application log
            // instead of on stderr.
            logger.error("Failed to write data to the database.", e);
        }
    }

    /**
     * Runs a SQL query and returns every row as a column-name to value map.
     *
     * @param sql SQL statement to execute
     * @return the rows produced by the query, or {@code null} if the query failed
     */
    public List<Map<String, Object>> queryData(String sql) {
        // The returned stream is closed explicitly so whatever it holds is released even when
        // collecting fails part-way through.
        try (Stream<Map<String, Object>> rows = influxDBClient.queryRows(sql)) {
            List<Map<String, Object>> result = rows.collect(Collectors.toList());
            logger.info("查询数据:{}", result);
            return result;
        } catch (Exception e) {
            logger.error("Failed to query data from the database.", e);
            return null;
        }
    }

    /**
     * Runs a SQL query and maps every returned point onto a new instance of {@code clazz}.
     *
     * <p>For each point, every tag and every field whose name equals the name of a field
     * declared directly on {@code clazz} is copied into that field via reflection. Names without
     * a matching declared field are ignored. Fields inherited from superclasses are not
     * considered.
     *
     * @param sql   SQL statement to execute
     * @param clazz target type; must have a no-argument constructor
     * @param <T>   target type
     * @return one populated instance per point, or {@code null} if the query or the mapping
     *         failed (including instantiation and type-mismatch errors)
     */
    public <T> List<T> queryPoints(String sql, Class<T> clazz) {
        try (Stream<PointValues> points = influxDBClient.queryPoints(sql)) {
            // Index the declared fields once per query instead of rescanning the array for
            // every tag/field of every row. Declared field names are unique within a class.
            Map<String, Field> fieldsByName = Arrays.stream(clazz.getDeclaredFields())
                    .collect(Collectors.toMap(Field::getName, field -> field));

            List<T> result = points
                    .map(point -> toInstance(point, clazz, fieldsByName))
                    .collect(Collectors.toList());
            logger.info("查询数据:{}", result);
            return result;
        } catch (Exception e) {
            logger.error("Failed to query data from the database.", e);
            return null;
        }
    }

    /** Creates a new {@code clazz} instance and copies the point's matching tags and fields into it. */
    private static <T> T toInstance(PointValues point, Class<T> clazz, Map<String, Field> fieldsByName) {
        T instance = instantiate(clazz);

        // Tags are applied first and fields second, so a field wins if both share a name.
        for (String tagName : point.getTagNames()) {
            Field target = fieldsByName.get(tagName);
            if (target != null) {
                assign(instance, target, point.getTag(tagName));
            }
        }
        for (String fieldName : point.getFieldNames()) {
            Field target = fieldsByName.get(fieldName);
            if (target != null) {
                assign(instance, target, point.getField(fieldName));
            }
        }
        return instance;
    }

    /** Instantiates {@code clazz} through its no-argument constructor, wrapping reflective failures. */
    private static <T> T instantiate(Class<T> clazz) {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException | InstantiationException e) {
            // No no-arg constructor, or the type is abstract/an interface: say so explicitly.
            throw new RuntimeException("无法创建对象实例,请确保类有公共无参构造函数: " + clazz.getName(), e);
        } catch (InvocationTargetException | IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }

    /** Writes {@code value} into {@code field} of {@code target}, bypassing access checks. */
    private static void assign(Object target, Field field, Object value) {
        field.setAccessible(true);
        try {
            field.set(target, value);
        } catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }
}