package com.zy.component.influxdb.service;
|
|
import com.influxdb.v3.client.InfluxDBClient;
|
import com.influxdb.v3.client.Point;
|
import com.influxdb.v3.client.write.WritePrecision;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.time.Instant;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
|
@Service
|
public class InfluxDBService {
|
|
private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);
|
|
@Autowired
|
private InfluxDBClient influxDBClient;
|
|
|
|
/**
|
* 写入数据
|
*
|
* @param measurement 表名
|
* @param tags 标签
|
* @param fields 字段
|
*/
|
public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
|
Point point = Point.measurement(measurement)
|
.setTags(tags)
|
.setFields(fields)
|
.setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS);
|
try {
|
influxDBClient.writePoint(point);
|
} catch (Exception e) {
|
logger.error("Failed to write data to the database.");
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 查询数据
|
*
|
* @param sql sql语句
|
* @return 查询结果列表
|
*/
|
public List<Map<String, Object>> queryData(String sql) {
|
try {
|
// 执行查询
|
Stream<Object[]> query = influxDBClient.query(sql);
|
logger.info("查询数据:{}", query);
|
// 转换为 Map 列表(便于后续处理)
|
List<Map<String, Object>> collect = query.map(record -> {
|
Map<String, Object> map = new LinkedHashMap<>();
|
for (int i = 0; i < record.length; i += 2) {
|
map.put((String) record[i], record[i + 1]);
|
}
|
return map;
|
})
|
.collect(Collectors.toList());
|
return collect;
|
} catch (Exception e) {
|
logger.error("Failed to query data from the database.");
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
|
}
|