package com.zy.influxdb.service;
|
|
import com.influxdb.annotations.Column;
|
import com.influxdb.annotations.Measurement;
|
import com.influxdb.client.InfluxDBClient;
|
import com.influxdb.client.WriteApiBlocking;
|
import com.influxdb.client.domain.WritePrecision;
|
import com.influxdb.client.write.Point;
|
import com.influxdb.query.FluxTable;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.time.Instant;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
|
@Service
|
public class InfluxDBService {
|
|
private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);
|
|
@Autowired
|
private InfluxDBClient influxDBClient;
|
|
// 构造函数注入客户端
|
|
|
|
/**
|
* 写入数据
|
* @param measurement 表名
|
* @param tags 标签
|
* @param fields 字段
|
*/
|
public void writeData(String measurement, Map<String,String> tags,Map<String,Object> fields) {
|
WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();
|
Point point = Point.measurement(measurement)
|
.addTags(tags)
|
.addFields(fields)
|
.time(Instant.now().toEpochMilli(), WritePrecision.MS);
|
try {
|
writeApiBlocking.writePoint(point);
|
System.out.println("Data written to the database.");
|
}
|
catch (Exception e) {
|
System.err.println("Failed to write data to the database.");
|
e.printStackTrace();
|
}
|
}
|
/**
|
* 查询数据
|
* @param sql sql语句
|
* @return 查询结果列表
|
*/
|
public List<FluxTable> queryData(String sql) {
|
try {
|
// 执行查询
|
return influxDBClient.getQueryApi().query(sql);
|
} catch (Exception e) {
|
System.err.println("Failed to query data from the database.");
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
|
/**
|
* 查询数据
|
* @param sql sql语句
|
* @return 查询结果列表
|
*/
|
public <T> List<T> queryData(String sql,T t) {
|
try {
|
// 执行查询
|
List<T> temperatures = influxDBClient.getQueryApi().q(sql, t.getClass());
|
|
|
} catch (Exception e) {
|
System.err.println("Failed to query data from the database.");
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
//POJO类定义如下:
|
@Measurement(name = "temperature")//表名
|
public static class Temperature {
|
|
@Column(tag = true)//tag
|
String location;
|
|
@Column//field
|
Double value;
|
|
@Column(timestamp = true)//timestamp
|
Instant time;
|
|
}
|
}
|