package com.zy.component.influxdb.service;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.write.WritePrecision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Thin service wrapper around {@link InfluxDBClient} for writing single points
 * and running SQL queries.
 *
 * <p>Both operations are best-effort: failures are logged (with the full
 * exception) and are not propagated to the caller.
 */
@Service
public class InfluxDBService {

    private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);

    @Autowired
    private InfluxDBClient influxDBClient;

    /**
     * Writes a single point, timestamped with the current time at millisecond
     * precision.
     *
     * <p>Any exception raised while writing is logged and swallowed.
     *
     * @param measurement measurement (table) name
     * @param tags        tag key/value pairs attached to the point
     * @param fields      field key/value pairs attached to the point
     */
    public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        try {
            // Point construction is inside the try so that invalid arguments
            // are logged like any other write failure instead of escaping.
            Point point = Point.measurement(measurement)
                    .setTags(tags)
                    .setFields(fields)
                    .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS);
            influxDBClient.writePoint(point);
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace lands in the
            // application log rather than on stderr.
            logger.error("Failed to write data to the database.", e);
        }
    }

    /**
     * Runs a SQL query and returns every result row.
     *
     * @param sql SQL statement to execute
     * @return the result rows, each as a column-name to value map; an empty list
     *         when the query fails (the failure is logged)
     */
    public List<Map<String, Object>> queryData(String sql) {
        // The client documentation asks callers to close the returned stream,
        // hence try-with-resources.
        try (Stream<Map<String, Object>> rowStream = influxDBClient.queryRows(sql)) {
            // A stream can be consumed only once: collect first, then log and
            // return the same list.
            List<Map<String, Object>> rows = rowStream.collect(Collectors.toList());
            logger.info("查询数据:{}", rows);
            return rows;
        } catch (Exception e) {
            logger.error("Failed to query data from the database.", e);
            return Collections.emptyList();
        }
    }
}