1
zhang
3 天以前 9671fa60b69a5b749bfbd989f0aa281aa284dde6
component/component-Influxdb/src/main/java/com/zy/component/influxdb/service/InfluxDBService.java
@@ -6,6 +6,7 @@
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.query.QueryType;
import com.influxdb.v3.client.write.WritePrecision;
import com.zy.component.influxdb.domain.BaseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -15,6 +16,7 @@
import java.lang.reflect.InvocationTargetException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -29,8 +31,6 @@
    @Autowired
    private InfluxDBClient influxDBClient;
    /**
     * 写入数据
     *
@@ -38,11 +38,11 @@
     * @param tags        标签
     * @param fields      字段
     */
    public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields) {
    public void writeData(String measurement, Map<String, String> tags, Map<String, Object> fields,Long timestamp) {
        Point point = Point.measurement(measurement)
                .setTags(tags)
                .setFields(fields)
                .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS);
                .setTimestamp(timestamp, WritePrecision.MS);
        try {
            influxDBClient.writePoint(point);
        } catch (Exception e) {
@@ -70,17 +70,25 @@
        }
        return null;
    }
    /**
     * Queries data with a SQL statement that takes no query parameters.
     *
     * <p>Delegates to the overload that accepts query parameters, passing an
     * empty {@link HashMap}.
     *
     * @param sql   SQL statement
     * @param clazz class handed to the parameterized overload; it is the element type of the returned list
     * @return list of query results
     */
    public <T> List<T> queryPoints(String sql, Class<T> clazz) {
    public <T  extends BaseMessage> List<T> queryPoints(String sql, Class<T> clazz) {
       return queryPoints(sql,new HashMap<>(),clazz);
    }
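    /**
     * Queries data with a SQL statement and a map of query parameters.
     *
     * @param sql         SQL statement
     * @param queryParams parameters passed to the InfluxDB client together with the SQL statement
     * @param clazz       element type of the returned list
     * @return list of query results
     */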
    public <T  extends BaseMessage> List<T> queryPoints(String sql,Map<String,Object> queryParams, Class<T> clazz) {
        try {
            // 执行查询
            Stream<PointValues> queryPoints = influxDBClient.queryPoints(sql);
            Stream<PointValues> queryPoints = influxDBClient.queryPoints(sql, queryParams);
            Field[] declaredFields = clazz.getDeclaredFields();
                
            // 创建一个列表用于存储结果
@@ -126,10 +134,9 @@
                        }
                    }
                }
                newInstance.setTimestamp(point.getTimestamp().longValue());
                return newInstance;
            }).collect(Collectors.toList());
            logger.info("查询数据:{}", result);
            return result;
        } catch (Exception e) {
            logger.error("Failed to query data from the database.");