| | |
| | | package com.zy.influxdb.service; |
| | | |
| | | import com.influxdb.annotations.Column; |
| | | import com.influxdb.annotations.Measurement; |
| | | import com.influxdb.client.InfluxDBClient; |
| | | import com.influxdb.client.WriteApiBlocking; |
| | | import com.influxdb.client.domain.WritePrecision; |
| | | import com.influxdb.client.write.Point; |
| | | import com.influxdb.query.FluxTable; |
| | | import com.influxdb.v3.client.InfluxDBClient; |
| | | import com.influxdb.v3.client.Point; |
| | | import com.influxdb.v3.client.write.WritePrecision; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | |
| | | @Autowired |
| | | private InfluxDBClient influxDBClient; |
| | | |
| | | // 构造函数注入客户端 |
| | | |
| | | |
| | | |
| | | /** |
| | | * 写入数据 |
| | | * |
| | | * @param measurement 表名 |
| | | * @param tags 标签 |
| | | * @param fields 字段 |
| | | */ |
| | | public void writeData(String measurement, Map<String,String> tags,Map<String,Object> fields) { |
| | | WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking(); |
| | | Point point = Point.measurement(measurement) |
| | | .addTags(tags) |
| | | .addFields(fields) |
| | | .time(Instant.now().toEpochMilli(), WritePrecision.MS); |
| | | .setTags(tags) |
| | | .setFields(fields) |
| | | .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS); |
| | | try { |
| | | writeApiBlocking.writePoint(point); |
| | | influxDBClient.writePoint(point); |
| | | System.out.println("Data written to the database."); |
| | | } |
| | | catch (Exception e) { |
| | | System.err.println("Failed to write data to the database."); |
| | | } catch (Exception e) { |
| | | logger.error("Failed to write data to the database."); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 查询数据 |
| | | * |
| | | * @param sql sql语句 |
| | | * @return 查询结果列表 |
| | | */ |
| | | public List<FluxTable> queryData(String sql) { |
| | | public List<Map<String, Object>> queryData(String sql) { |
| | | try { |
| | | // 执行查询 |
| | | return influxDBClient.getQueryApi().query(sql); |
| | | Stream<Object[]> query = influxDBClient.query(sql); |
| | | // 转换为 Map 列表(便于后续处理) |
| | | List<Map<String, Object>> collect = query.map(record -> { |
| | | Map<String, Object> map = new LinkedHashMap<>(); |
| | | for (int i = 0; i < record.length; i += 2) { |
| | | map.put((String) record[i], record[i + 1]); |
| | | } |
| | | return map; |
| | | }) |
| | | .collect(Collectors.toList()); |
| | | return collect; |
| | | } catch (Exception e) { |
| | | System.err.println("Failed to query data from the database."); |
| | | logger.error("Failed to query data from the database."); |
| | | e.printStackTrace(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 查询数据 |
| | | * @param sql sql语句 |
| | | * @return 查询结果列表 |
| | | */ |
| | | public <T> List<T> queryData(String sql,T t) { |
| | | try { |
| | | // 执行查询 |
| | | List<T> temperatures = influxDBClient.getQueryApi().q(sql, t.getClass()); |
| | | |
| | | |
| | | } catch (Exception e) { |
| | | System.err.println("Failed to query data from the database."); |
| | | e.printStackTrace(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | //POJO类定义如下: |
| | | @Measurement(name = "temperature")//表名 |
| | | public static class Temperature { |
| | | |
| | | @Column(tag = true)//tag |
| | | String location; |
| | | |
| | | @Column//field |
| | | Double value; |
| | | |
| | | @Column(timestamp = true)//timestamp |
| | | Instant time; |
| | | |
| | | } |
| | | } |