component/component-Influxdb/src/main/java/com/zy/component/influxdb/service/InfluxDBService.java
@@ -40,7 +40,6 @@ .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS); try { influxDBClient.writePoint(point); System.out.println("Data written to the database."); } catch (Exception e) { logger.error("Failed to write data to the database."); e.printStackTrace(); component/component-Influxdb/target/classes/com/zy/component/influxdb/service/InfluxDBService.classBinary files differ
component/component-Influxdb/target/component-Influxdb-1.0.0.jarBinary files differ
component/component-Influxdb/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
@@ -1,3 +1,3 @@ com\zy\component\influxdb\service\InfluxDBService.class com\zy\component\influxdb\properties\InfluxDBProperties.class com\zy\component\influxdb\config\InfluxDBAutoConfiguration.class com\zy\component\influxdb\properties\InfluxDBProperties.class zy-acs-hex/pom.xml
@@ -22,11 +22,11 @@ <dependencies> <dependency> <groupId>com.zy</groupId> <artifactId>acs-common</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>com.zy</groupId> <artifactId>acs-common</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>com.zy</groupId> <artifactId>component-Influxdb</artifactId> @@ -46,10 +46,10 @@ </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Lombok --> <dependency> zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java
@@ -5,7 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; @ComponentScan(basePackages = {"com.zy.component","com.zy.acs"}) @ComponentScan(basePackages = {"com.zy.component", "com.zy.acs"}) @SpringBootApplication public class HexApplication { zy-acs-hex/src/main/java/com/zy/acs/hex/constant/InfluxDBConstant.java
New file @@ -0,0 +1,28 @@ package com.zy.acs.hex.constant; /** * 时序数据库常量类 * * @author ken */ public class InfluxDBConstant { public static final String DEVICE_MEASUREMENT = "device"; public static final String DEVICE_MEASUREMENT_TAG_DEVICEID = "deviceId"; public static final String DEVICE_MEASUREMENT_TAG_EVENT = "event"; public static final String DEVICE_MEASUREMENT_TAG_TYPE = "type"; public static class DEVICE_MEASUREMENT_TAG_TYPE_FLAG { public static final String DEVICE_MEASUREMENT_TAG_TYPE_UP = "up"; public static final String DEVICE_MEASUREMENT_TAG_TYPE_DOWN = "down"; } } zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/DownMessageListener.java
@@ -1,12 +1,15 @@ package com.zy.acs.hex.consumer; import com.rabbitmq.client.Channel; import com.zy.acs.hex.constant.InfluxDBConstant; import com.zy.acs.hex.constant.RabbitConstant; import com.zy.acs.hex.consumer.listener.AbstractListener; import com.zy.acs.hex.domain.Device; import com.zy.acs.hex.utils.ReflectionUtils; import com.zy.acs.hex.utils.StrUtils; import com.zy.component.influxdb.service.InfluxDBService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; @@ -19,15 +22,10 @@ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(name = RabbitConstant.TOPIC_QUEUE_DOWN, durable = RabbitConstant.DURABLE), exchange = @Exchange(name = RabbitConstant.TOPIC_EXCHANGE,type = RabbitConstant.TOPIC_EXCHANGE_TYPE), exchange = @Exchange(name = RabbitConstant.TOPIC_EXCHANGE, type = RabbitConstant.TOPIC_EXCHANGE_TYPE), key = RabbitConstant.ROUTING_KEY_DOWN )) public class DownMessageListener implements AbstractListener { @Autowired private RabbitTemplate rabbitTemplate; public class DownMessageListener implements AbstractListener { @Autowired @@ -35,9 +33,9 @@ @RabbitHandler public void handle(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("receive down message: {}" , msg); public void handle(Device msg, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey, Channel channel) { log.info("routingKey:{},receive down message:{}", routingKey, msg); influxDBService.writeData(InfluxDBConstant.DEVICE_MEASUREMENT, StrUtils.getTagsByRoutingKey(routingKey), ReflectionUtils.convertBean2Map(msg)); } zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/UpMessageListener.java
@@ -1,18 +1,19 @@ package com.zy.acs.hex.consumer; import com.rabbitmq.client.Channel; import com.zy.acs.hex.constant.InfluxDBConstant; import com.zy.acs.hex.constant.RabbitConstant; import com.zy.acs.hex.consumer.listener.AbstractListener; import com.zy.acs.hex.domain.Device; import com.zy.acs.hex.utils.ReflectionUtils; import com.zy.acs.hex.utils.StrUtils; import com.zy.component.influxdb.service.InfluxDBService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; /** * 消费者 @@ -27,30 +28,15 @@ public class UpMessageListener implements AbstractListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private InfluxDBService influxDBService; @RabbitHandler public void handle(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("receive up message: {}" , msg); try { channel.basicAck(tag,true); } catch (IOException e) { throw new RuntimeException(e); } // 进入消息消费业务逻辑。 //String data = new String(message.getBody()); //log.info("收到消息:{}" ,data ); //influxDBService.writeData(); public void handle(Device msg, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey, Channel channel) { log.info("routingKey:{},receive up message:{}", routingKey, msg); influxDBService.writeData(InfluxDBConstant.DEVICE_MEASUREMENT, StrUtils.getTagsByRoutingKey(routingKey), ReflectionUtils.convertBean2Map(msg)); } } zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/listener/AbstractListener.java
@@ -1,9 +1,10 @@ package com.zy.acs.hex.consumer.listener; import com.rabbitmq.client.Channel; import com.zy.acs.hex.domain.Device; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; public interface AbstractListener { void handle(String event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag); void handle(Device event, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey, Channel channel); } zy-acs-hex/src/main/java/com/zy/acs/hex/controller/TestController.java
@@ -1,10 +1,13 @@ package com.zy.acs.hex.controller; import com.zy.acs.hex.constant.RabbitConstant; import com.zy.acs.hex.domain.Device; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j @@ -16,21 +19,27 @@ /** * 发送消息test1 * * @return */ @GetMapping(value = "/test1") public void sendTest1() { String router = RabbitConstant.ROUTING_KEY_UP.replaceFirst("\\*", "1").replaceFirst("\\*", "2"); rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,router,"1212321323"); Device device = new Device(); //device.setEvent("online"); //device.setDeviceId("123"); device.setProtocol("212121212121212"); String router = RabbitConstant.ROUTING_KEY_UP.replaceFirst("\\*", "123").replaceFirst("\\*", "online"); rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, router, device); } /** * 发送消息test2 * * @return */ @GetMapping(value = "/test2") public void sendTest2() { rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, RabbitConstant.ROUTING_KEY_DOWN, "qswaqsaasas"); rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, RabbitConstant.ROUTING_KEY_DOWN, "qswaqsaasas"); } } zy-acs-hex/src/main/java/com/zy/acs/hex/domain/Device.java
New file @@ -0,0 +1,10 @@ package com.zy.acs.hex.domain; import lombok.Data; import java.io.Serializable; @Data public class Device implements Serializable { private String protocol; } zy-acs-hex/src/main/java/com/zy/acs/hex/utils/ReflectionUtils.java
New file @@ -0,0 +1,171 @@ package com.zy.acs.hex.utils; import com.zy.acs.framework.common.Cools; import java.beans.PropertyDescriptor; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class ReflectionUtils { /** * 获取一个类和其父类的所有属性 * * @param clazz * @return */ public static List<Field> findAllFieldsOfSelfAndSuperClass(Class clazz) { Field[] fields = null; List fieldList = new ArrayList(); while (true) { if (clazz == null) { break; } else { fields = clazz.getDeclaredFields(); for (int i = 0; i < fields.length; i++) { fieldList.add(fields[i]); } clazz = clazz.getSuperclass(); } } return fieldList; } /** * 把一个Bean对象转换成Map对象</br> * * @param obj * @param ignores * @return * @throws IllegalAccessException */ public static Map convertBean2Map(Object obj, String[] ignores) { Map map = new HashMap(); Class clazz = obj.getClass(); List<Field> fieldList = findAllFieldsOfSelfAndSuperClass(clazz); Field field = null; try { for (int i = 0; i < fieldList.size(); i++) { field = fieldList.get(i); // 定义fieldName是否在拷贝忽略的范畴内 boolean flag = false; if (ignores != null && ignores.length != 0) { flag = isExistOfIgnores(field.getName(), ignores); } if (!flag) { Object value = getProperty(obj, field.getName()); if (null != value && !Cools.isEmpty(value.toString())) { map.put(field.getName(), getProperty(obj, field.getName())); } } } } catch (SecurityException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } return map; } /** * 把一个Bean对象转换成Map对象</br> * * @param obj * @return */ public static Map convertBean2Map(Object obj) { return convertBean2Map(obj, null); } public static Map convertBean2MapForIngoreserialVersionUID(Object obj) { return convertBean2Map(obj, new String[]{"serialVersionUID"}); } /** * 判断fieldName是否是ignores中排除的 * * @param fieldName * @param ignores * @return */ private static boolean isExistOfIgnores(String fieldName, String[] ignores) { boolean flag = false; for (String str : ignores) { if (str.equals(fieldName)) { flag = true; break; } } return flag; } public static PropertyDescriptor getPropertyDescriptor(Class clazz, String propertyName) { StringBuffer sb = new StringBuffer();// 构建一个可变字符串用来构建方法名称 Method setMethod = null; Method getMethod = null; PropertyDescriptor pd = null; try { Field f = clazz.getDeclaredField(propertyName);// 根据字段名来获取字段 if (f != null) { // 构建方法的后缀 String methodEnd = propertyName.substring(0, 1).toUpperCase() + propertyName.substring(1); sb.append("set" + methodEnd);// 构建set方法 setMethod = clazz.getDeclaredMethod(sb.toString(), new Class[]{f.getType()}); sb.delete(0, sb.length());// 清空整个可变字符串 sb.append("get" + methodEnd);// 构建get方法 // 构建get 方法 getMethod = clazz.getDeclaredMethod(sb.toString(), new Class[]{}); // 构建一个属性描述器 把对应属性 propertyName 的 get 和 set 方法保存到属性描述器中 pd = new PropertyDescriptor(propertyName, getMethod, setMethod); } } catch (Exception ex) { ex.printStackTrace(); } return pd; } @SuppressWarnings("unchecked") public static void setProperty(Object obj, String propertyName, Object value) { Class clazz = obj.getClass();// 获取对象的类型 PropertyDescriptor pd = getPropertyDescriptor(clazz, propertyName);// 获取 clazz // 类型中的 // propertyName // 的属性描述器 Method setMethod = pd.getWriteMethod();// 从属性描述器中获取 set 方法 try { setMethod.invoke(obj, new Object[]{value});// 调用 set 方法将传入的value值保存属性中去 } catch (Exception e) { e.printStackTrace(); } } @SuppressWarnings({"unchecked", "rawtypes"}) public static Object getProperty(Object obj, String propertyName) { Class clazz = obj.getClass();// 获取对象的类型 PropertyDescriptor pd = getPropertyDescriptor(clazz, propertyName);// 获取 clazz // 类型中的 // propertyName // 的属性描述器 Method getMethod = pd.getReadMethod();// 从属性描述器中获取 get 方法 Object value = null; try { value = getMethod.invoke(obj, new Object[]{});// 调用方法获取方法的返回值 } catch (Exception e) { e.printStackTrace(); } return value;// 返回值 } } zy-acs-hex/src/main/java/com/zy/acs/hex/utils/StrUtils.java
New file @@ -0,0 +1,62 @@ package com.zy.acs.hex.utils; import com.zy.acs.hex.constant.InfluxDBConstant; import java.util.HashMap; import java.util.Map; public class StrUtils { public static Map<String, String> getTagsByRoutingKey(String routingKey) { // 正则表达式匹配 rcs.up. 开头,后面跟着两个版本号部分 String regex = "^rcs\\.up\\.(\\*|[a-zA-Z0-9]+)\\.(\\*|[a-zA-Z0-9]+)$"; if (routingKey.matches(regex)) { // 分割字符串并返回版本号部分 String[] parts = routingKey.split("\\."); if (parts.length == 4) { Map<String, String> data = new HashMap<>(); data.put(InfluxDBConstant.DEVICE_MEASUREMENT_TAG_TYPE, parts[1]); data.put(InfluxDBConstant.DEVICE_MEASUREMENT_TAG_DEVICEID, parts[2]); data.put(InfluxDBConstant.DEVICE_MEASUREMENT_TAG_EVENT, parts[3]); return data; } } // 如果格式不符合,返回空数组 return null; } public static String getDeviceIdByRoutingKey(String routingKey) { // 正则表达式匹配 rcs.up. 开头,后面跟着两个版本号部分 String regex = "^rcs\\.up\\.(\\*|[a-zA-Z0-9]+)\\.(\\*|[a-zA-Z0-9]+)$"; if (routingKey.matches(regex)) { // 分割字符串并返回版本号部分 String[] parts = routingKey.split("\\."); if (parts.length == 4) { return parts[2]; } } // 如果格式不符合,返回空数组 return null; } public static String getEventByRoutingKey(String routingKey) { // 正则表达式匹配 rcs.up. 开头,后面跟着两个版本号部分 String regex = "^rcs\\.up\\.(\\*|[a-zA-Z0-9]+)\\.(\\*|[a-zA-Z0-9]+)$"; if (routingKey.matches(regex)) { // 分割字符串并返回版本号部分 String[] parts = routingKey.split("\\."); if (parts.length == 4) { return parts[2]; } } // 如果格式不符合,返回空数组 return null; } public static void main(String[] args) { System.out.println(StrUtils.getDeviceIdByRoutingKey("rcs.up.ds1233.2aads")); } } zy-acs-hex/src/main/resources/application.yml
@@ -1,4 +1,3 @@ spring: application: name: rcs-hex @@ -14,9 +13,10 @@ publisher-confirm-type: correlated publisher-returns: true # 消费者配置 listener: direct: acknowledge-mode: manual # listener: # direct: # 确认机制 # acknowledge-mode: manual influxdb3: enabled: true