package com.zy.component.influxdb.service;
|
|
import com.influxdb.v3.client.InfluxDBClient;
|
import com.influxdb.v3.client.Point;
|
import com.influxdb.v3.client.PointValues;
|
import com.influxdb.v3.client.query.QueryOptions;
|
import com.influxdb.v3.client.query.QueryType;
|
import com.influxdb.v3.client.write.WritePrecision;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.lang.reflect.Field;
|
import java.lang.reflect.InvocationTargetException;
|
import java.time.Instant;
|
import java.util.Arrays;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
|
@Service
|
public class InfluxDBService {
|
|
private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);
|
|
@Autowired
|
private InfluxDBClient influxDBClient;
|
|
|
|
/**
|
* 写入数据
|
*
|
* @param measurement 表名
|
* @param tags 标签
|
* @param fields 字段
|
*/
|
public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
|
Point point = Point.measurement(measurement)
|
.setTags(tags)
|
.setFields(fields)
|
.setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS);
|
try {
|
influxDBClient.writePoint(point);
|
} catch (Exception e) {
|
logger.error("Failed to write data to the database.");
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 查询数据
|
*
|
* @param sql sql语句
|
* @return 查询结果列表
|
*/
|
public List<Map<String, Object>> queryData(String sql) {
|
try {
|
// 执行查询
|
Stream<Map<String, Object>> mapStream = influxDBClient.queryRows(sql);
|
List<Map<String, Object>> result = mapStream.collect(Collectors.toList());
|
logger.info("查询数据:{}", result);
|
return result;
|
} catch (Exception e) {
|
logger.error("Failed to query data from the database.");
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
/**
|
* 查询数据
|
*
|
* @param sql sql语句
|
* @return 查询结果列表
|
*/
|
public <T> List<T> queryPoints(String sql, Class<T> clazz) {
|
try {
|
// 执行查询
|
Stream<PointValues> queryPoints = influxDBClient.queryPoints(sql);
|
Field[] declaredFields = clazz.getDeclaredFields();
|
|
// 创建一个列表用于存储结果
|
List<T> result = queryPoints.map(point -> {
|
T newInstance = null;
|
try {
|
// 使用无参构造函数创建新实例
|
newInstance = clazz.getDeclaredConstructor().newInstance();
|
} catch (NoSuchMethodException | InstantiationException e) {
|
// 如果没有无参构造函数或无法实例化,则抛出有意义的异常
|
throw new RuntimeException("无法创建对象实例,请确保类有公共无参构造函数: " + clazz.getName(), e);
|
} catch (InvocationTargetException e) {
|
throw new RuntimeException(e);
|
} catch (IllegalAccessException e) {
|
throw new RuntimeException(e);
|
}
|
|
String[] tagNames = point.getTagNames();
|
for (String tagName : tagNames) {
|
for (Field declaredField : declaredFields) {
|
if (declaredField.getName().equals(tagName)) {
|
declaredField.setAccessible(true);
|
try {
|
declaredField.set(newInstance, point.getTag(tagName));
|
} catch (IllegalAccessException e) {
|
throw new RuntimeException(e);
|
}
|
break;
|
}
|
}
|
}
|
String[] fieldNames = point.getFieldNames();
|
for (String fieldName : fieldNames) {
|
for (Field declaredField : declaredFields) {
|
if (declaredField.getName().equals(fieldName)) {
|
declaredField.setAccessible(true);
|
try {
|
declaredField.set(newInstance, point.getField(fieldName));
|
} catch (IllegalAccessException e) {
|
throw new RuntimeException(e);
|
}
|
break;
|
}
|
}
|
}
|
return newInstance;
|
}).collect(Collectors.toList());
|
|
logger.info("查询数据:{}", result);
|
return result;
|
} catch (Exception e) {
|
logger.error("Failed to query data from the database.");
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
|
|
}
|