package com.zy.influxdb.service;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.stereotype.Service;
|
|
import java.time.Instant;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
|
@Service
|
public class InfluxDBService {
|
|
private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);
|
|
private final InfluxDBClient influxDBClient;
|
|
// 构造函数注入客户端
|
public InfluxDBService(InfluxDBClient influxDBClient ) {
|
this.influxDBClient = influxDBClient ;
|
}
|
|
|
/**
|
* 写入数据
|
* @param measurement 表名
|
* @param tags 标签
|
* @param fields 字段
|
*/
|
public void writeData(String measurement, Map<String,String> tags,Map<String,Object> fields) {
|
Point point = Point.measurement(measurement)
|
.setTags(tags)
|
.setFields(fields)
|
.setTimestamp(Instant.now().minusSeconds(10));
|
try {
|
influxDBClient.writePoint(point);
|
System.out.println("Data written to the database.");
|
}
|
catch (Exception e) {
|
System.err.println("Failed to write data to the database.");
|
e.printStackTrace();
|
}
|
}
|
/**
|
* 查询数据
|
* @param sql sql语句
|
* @return 查询结果列表
|
*/
|
public List<Map<String, Object>> queryData(String sql) {
|
|
try {
|
// 执行查询
|
Stream<Object[]> query = influxDBClient.query(sql);
|
// 转换为 Map 列表(便于后续处理)
|
List<Map<String, Object>> collect = query.map(record -> {
|
Map<String, Object> map = new LinkedHashMap<>();
|
for (int i = 0; i < record.length; i += 2) {
|
map.put((String) record[i], record[i + 1]);
|
}
|
return map;
|
})
|
.collect(Collectors.toList());
|
return collect;
|
} catch (Exception e) {
|
System.err.println("Failed to query data from the database.");
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
}
|