| | |
| | | |
| | | import com.influxdb.v3.client.InfluxDBClient; |
| | | import com.influxdb.v3.client.Point; |
| | | import com.influxdb.v3.client.PointValues; |
| | | import com.influxdb.v3.client.query.QueryOptions; |
| | | import com.influxdb.v3.client.query.QueryType; |
| | | import com.influxdb.v3.client.write.WritePrecision; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.lang.reflect.Field; |
| | | import java.lang.reflect.InvocationTargetException; |
| | | import java.time.Instant; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.stream.Collectors; |
| | |
| | | .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS); |
| | | try { |
| | | influxDBClient.writePoint(point); |
| | | System.out.println("Data written to the database."); |
| | | } catch (Exception e) { |
| | | logger.error("Failed to write data to the database."); |
| | | e.printStackTrace(); |
| | |
| | | public List<Map<String, Object>> queryData(String sql) { |
| | | try { |
| | | // 执行查询 |
| | | Stream<Object[]> query = influxDBClient.query(sql); |
| | | logger.info("查询数据:{}", query); |
| | | // 转换为 Map 列表(便于后续处理) |
| | | List<Map<String, Object>> collect = query.map(record -> { |
| | | Map<String, Object> map = new LinkedHashMap<>(); |
| | | for (int i = 0; i < record.length; i += 2) { |
| | | map.put((String) record[i], record[i + 1]); |
| | | } |
| | | return map; |
| | | }) |
| | | .collect(Collectors.toList()); |
| | | return collect; |
| | | Stream<Map<String, Object>> mapStream = influxDBClient.queryRows(sql); |
| | | List<Map<String, Object>> result = mapStream.collect(Collectors.toList()); |
| | | logger.info("查询数据:{}", result); |
| | | return result; |
| | | } catch (Exception e) { |
| | | logger.error("Failed to query data from the database."); |
| | | e.printStackTrace(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * 查询数据 |
| | | * |
| | | * @param sql sql语句 |
| | | * @return 查询结果列表 |
| | | */ |
| | | public <T> List<T> queryPoints(String sql, Class<T> clazz) { |
| | | try { |
| | | // 执行查询 |
| | | Stream<PointValues> queryPoints = influxDBClient.queryPoints(sql); |
| | | Field[] declaredFields = clazz.getDeclaredFields(); |
| | | |
| | | // 创建一个列表用于存储结果 |
| | | List<T> result = queryPoints.map(point -> { |
| | | T newInstance = null; |
| | | try { |
| | | // 使用无参构造函数创建新实例 |
| | | newInstance = clazz.getDeclaredConstructor().newInstance(); |
| | | } catch (NoSuchMethodException | InstantiationException e) { |
| | | // 如果没有无参构造函数或无法实例化,则抛出有意义的异常 |
| | | throw new RuntimeException("无法创建对象实例,请确保类有公共无参构造函数: " + clazz.getName(), e); |
| | | } catch (InvocationTargetException e) { |
| | | throw new RuntimeException(e); |
| | | } catch (IllegalAccessException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | |
| | | String[] tagNames = point.getTagNames(); |
| | | for (String tagName : tagNames) { |
| | | for (Field declaredField : declaredFields) { |
| | | if (declaredField.getName().equals(tagName)) { |
| | | declaredField.setAccessible(true); |
| | | try { |
| | | declaredField.set(newInstance, point.getTag(tagName)); |
| | | } catch (IllegalAccessException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | String[] fieldNames = point.getFieldNames(); |
| | | for (String fieldName : fieldNames) { |
| | | for (Field declaredField : declaredFields) { |
| | | if (declaredField.getName().equals(fieldName)) { |
| | | declaredField.setAccessible(true); |
| | | try { |
| | | declaredField.set(newInstance, point.getField(fieldName)); |
| | | } catch (IllegalAccessException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | return newInstance; |
| | | }).collect(Collectors.toList()); |
| | | |
| | | logger.info("查询数据:{}", result); |
| | | return result; |
| | | } catch (Exception e) { |
| | | logger.error("Failed to query data from the database."); |
| | | e.printStackTrace(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | | |
| | | } |