package com.zy.influxdb.service; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.Point; import com.influxdb.v3.client.write.WritePrecision; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.time.Instant; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @Service public class InfluxDBService { private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class); @Autowired private InfluxDBClient influxDBClient; /** * 写入数据 * * @param measurement 表名 * @param tags 标签 * @param fields 字段 */ public void writeData(String measurement, Map tags, Map fields) { Point point = Point.measurement(measurement) .setTags(tags) .setFields(fields) .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS); try { influxDBClient.writePoint(point); System.out.println("Data written to the database."); } catch (Exception e) { logger.error("Failed to write data to the database."); e.printStackTrace(); } } /** * 查询数据 * * @param sql sql语句 * @return 查询结果列表 */ public List> queryData(String sql) { try { // 执行查询 Stream query = influxDBClient.query(sql); // 转换为 Map 列表(便于后续处理) List> collect = query.map(record -> { Map map = new LinkedHashMap<>(); for (int i = 0; i < record.length; i += 2) { map.put((String) record[i], record[i + 1]); } return map; }) .collect(Collectors.toList()); return collect; } catch (Exception e) { logger.error("Failed to query data from the database."); e.printStackTrace(); } return null; } }