1
zhang
1 天以前 23182f2c951df5fa55e70e30ff70ddaf91199a2e
1
4个文件已添加
11个文件已修改
370 ■■■■ 已修改文件
component/component-Influxdb/src/main/java/com/zy/component/influxdb/service/InfluxDBService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
component/component-Influxdb/target/classes/com/zy/component/influxdb/service/InfluxDBService.class 补丁 | 查看 | 原始文档 | blame | 历史
component/component-Influxdb/target/component-Influxdb-1.0.0.jar 补丁 | 查看 | 原始文档 | blame | 历史
component/component-Influxdb/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/pom.xml 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/constant/InfluxDBConstant.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/DownMessageListener.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/UpMessageListener.java 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/listener/AbstractListener.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/controller/TestController.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/domain/Device.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/utils/ReflectionUtils.java 171 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/utils/StrUtils.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/resources/application.yml 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
component/component-Influxdb/src/main/java/com/zy/component/influxdb/service/InfluxDBService.java
@@ -40,7 +40,6 @@
                .setTimestamp(Instant.now().toEpochMilli(), WritePrecision.MS);
        try {
            influxDBClient.writePoint(point);
            System.out.println("Data written to the database.");
        } catch (Exception e) {
            logger.error("Failed to write data to the database.");
            e.printStackTrace();
component/component-Influxdb/target/classes/com/zy/component/influxdb/service/InfluxDBService.class
Binary files differ
component/component-Influxdb/target/component-Influxdb-1.0.0.jar
Binary files differ
component/component-Influxdb/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
@@ -1,3 +1,3 @@
com\zy\component\influxdb\service\InfluxDBService.class
com\zy\component\influxdb\properties\InfluxDBProperties.class
com\zy\component\influxdb\config\InfluxDBAutoConfiguration.class
com\zy\component\influxdb\properties\InfluxDBProperties.class
zy-acs-hex/pom.xml
@@ -22,11 +22,11 @@
    <dependencies>
    <dependency>
        <groupId>com.zy</groupId>
        <artifactId>acs-common</artifactId>
        <version>1.0.0</version>
    </dependency>
        <dependency>
            <groupId>com.zy</groupId>
            <artifactId>acs-common</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.zy</groupId>
            <artifactId>component-Influxdb</artifactId>
@@ -46,10 +46,10 @@
        </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java
@@ -5,7 +5,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan(basePackages = {"com.zy.component","com.zy.acs"})
@ComponentScan(basePackages = {"com.zy.component", "com.zy.acs"})
@SpringBootApplication
public class HexApplication {
zy-acs-hex/src/main/java/com/zy/acs/hex/constant/InfluxDBConstant.java
New file
@@ -0,0 +1,28 @@
package com.zy.acs.hex.constant;
/**
 * 时序数据库常量类
 *
 * @author ken
 */
public class InfluxDBConstant {
    public static final String DEVICE_MEASUREMENT = "device";
    public static final String DEVICE_MEASUREMENT_TAG_DEVICEID = "deviceId";
    public static final String DEVICE_MEASUREMENT_TAG_EVENT = "event";
    public static final String DEVICE_MEASUREMENT_TAG_TYPE = "type";
    public static class DEVICE_MEASUREMENT_TAG_TYPE_FLAG {
        public static final String DEVICE_MEASUREMENT_TAG_TYPE_UP = "up";
        public static final String DEVICE_MEASUREMENT_TAG_TYPE_DOWN = "down";
    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/DownMessageListener.java
@@ -1,12 +1,15 @@
package com.zy.acs.hex.consumer;
import com.rabbitmq.client.Channel;
import com.zy.acs.hex.constant.InfluxDBConstant;
import com.zy.acs.hex.constant.RabbitConstant;
import com.zy.acs.hex.consumer.listener.AbstractListener;
import com.zy.acs.hex.domain.Device;
import com.zy.acs.hex.utils.ReflectionUtils;
import com.zy.acs.hex.utils.StrUtils;
import com.zy.component.influxdb.service.InfluxDBService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
@@ -19,15 +22,10 @@
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = RabbitConstant.TOPIC_QUEUE_DOWN, durable = RabbitConstant.DURABLE),
        exchange = @Exchange(name = RabbitConstant.TOPIC_EXCHANGE,type = RabbitConstant.TOPIC_EXCHANGE_TYPE),
        exchange = @Exchange(name = RabbitConstant.TOPIC_EXCHANGE, type = RabbitConstant.TOPIC_EXCHANGE_TYPE),
        key = RabbitConstant.ROUTING_KEY_DOWN
))
public class DownMessageListener  implements AbstractListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
public class DownMessageListener implements AbstractListener {
    @Autowired
@@ -35,9 +33,9 @@
    @RabbitHandler
    public void handle(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("receive down message: {}" ,  msg);
    public void handle(Device msg, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey, Channel channel) {
        log.info("routingKey:{},receive down message:{}", routingKey, msg);
        influxDBService.writeData(InfluxDBConstant.DEVICE_MEASUREMENT, StrUtils.getTagsByRoutingKey(routingKey), ReflectionUtils.convertBean2Map(msg));
    }
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/UpMessageListener.java
@@ -1,18 +1,19 @@
package com.zy.acs.hex.consumer;
import com.rabbitmq.client.Channel;
import com.zy.acs.hex.constant.InfluxDBConstant;
import com.zy.acs.hex.constant.RabbitConstant;
import com.zy.acs.hex.consumer.listener.AbstractListener;
import com.zy.acs.hex.domain.Device;
import com.zy.acs.hex.utils.ReflectionUtils;
import com.zy.acs.hex.utils.StrUtils;
import com.zy.component.influxdb.service.InfluxDBService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * 消费者
@@ -27,30 +28,15 @@
public class UpMessageListener implements AbstractListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private InfluxDBService influxDBService;
    @RabbitHandler
    public void handle(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("receive up message: {}" ,  msg);
        try {
            channel.basicAck(tag,true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // 进入消息消费业务逻辑。
        //String data = new String(message.getBody());
        //log.info("收到消息:{}" ,data );
        //influxDBService.writeData();
    public void handle(Device msg, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey, Channel channel) {
        log.info("routingKey:{},receive up message:{}", routingKey, msg);
        influxDBService.writeData(InfluxDBConstant.DEVICE_MEASUREMENT, StrUtils.getTagsByRoutingKey(routingKey), ReflectionUtils.convertBean2Map(msg));
    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/listener/AbstractListener.java
@@ -1,9 +1,10 @@
package com.zy.acs.hex.consumer.listener;
import com.rabbitmq.client.Channel;
import com.zy.acs.hex.domain.Device;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
public interface AbstractListener {
    void handle(String event, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag);
    void handle(Device event, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey, Channel channel);
}
zy-acs-hex/src/main/java/com/zy/acs/hex/controller/TestController.java
@@ -1,10 +1,13 @@
package com.zy.acs.hex.controller;
import com.zy.acs.hex.constant.RabbitConstant;
import com.zy.acs.hex.domain.Device;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
@@ -16,21 +19,27 @@
    /**
     * 发送消息test1
     *
     * @return
     */
    @GetMapping(value = "/test1")
    public void sendTest1() {
        String router = RabbitConstant.ROUTING_KEY_UP.replaceFirst("\\*", "1").replaceFirst("\\*", "2");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE,router,"1212321323");
        Device device = new Device();
        //device.setEvent("online");
        //device.setDeviceId("123");
        device.setProtocol("212121212121212");
        String router = RabbitConstant.ROUTING_KEY_UP.replaceFirst("\\*", "123").replaceFirst("\\*", "online");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, router, device);
    }
    /**
     * 发送消息test2
     *
     * @return
     */
    @GetMapping(value = "/test2")
    public void sendTest2() {
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, RabbitConstant.ROUTING_KEY_DOWN,  "qswaqsaasas");
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE, RabbitConstant.ROUTING_KEY_DOWN, "qswaqsaasas");
    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/domain/Device.java
New file
@@ -0,0 +1,10 @@
package com.zy.acs.hex.domain;
import lombok.Data;
import java.io.Serializable;
@Data
public class Device implements Serializable {
    private String protocol;
}
zy-acs-hex/src/main/java/com/zy/acs/hex/utils/ReflectionUtils.java
New file
@@ -0,0 +1,171 @@
package com.zy.acs.hex.utils;
import com.zy.acs.framework.common.Cools;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReflectionUtils {
    /**
     * 获取一个类和其父类的所有属性
     *
     * @param clazz
     * @return
     */
    public static List<Field> findAllFieldsOfSelfAndSuperClass(Class clazz) {
        Field[] fields = null;
        List fieldList = new ArrayList();
        while (true) {
            if (clazz == null) {
                break;
            } else {
                fields = clazz.getDeclaredFields();
                for (int i = 0; i < fields.length; i++) {
                    fieldList.add(fields[i]);
                }
                clazz = clazz.getSuperclass();
            }
        }
        return fieldList;
    }
    /**
     * 把一个Bean对象转换成Map对象</br>
     *
     * @param obj
     * @param ignores
     * @return
     * @throws IllegalAccessException
     */
    public static Map convertBean2Map(Object obj, String[] ignores) {
        Map map = new HashMap();
        Class clazz = obj.getClass();
        List<Field> fieldList = findAllFieldsOfSelfAndSuperClass(clazz);
        Field field = null;
        try {
            for (int i = 0; i < fieldList.size(); i++) {
                field = fieldList.get(i);
                // 定义fieldName是否在拷贝忽略的范畴内
                boolean flag = false;
                if (ignores != null && ignores.length != 0) {
                    flag = isExistOfIgnores(field.getName(), ignores);
                }
                if (!flag) {
                    Object value = getProperty(obj, field.getName());
                    if (null != value
                            && !Cools.isEmpty(value.toString())) {
                        map.put(field.getName(),
                                getProperty(obj, field.getName()));
                    }
                }
            }
        } catch (SecurityException e) {
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        }
        return map;
    }
    /**
     * 把一个Bean对象转换成Map对象</br>
     *
     * @param obj
     * @return
     */
    public static Map convertBean2Map(Object obj) {
        return convertBean2Map(obj, null);
    }
    public static Map convertBean2MapForIngoreserialVersionUID(Object obj) {
        return convertBean2Map(obj, new String[]{"serialVersionUID"});
    }
    /**
     * 判断fieldName是否是ignores中排除的
     *
     * @param fieldName
     * @param ignores
     * @return
     */
    private static boolean isExistOfIgnores(String fieldName,
                                            String[] ignores) {
        boolean flag = false;
        for (String str : ignores) {
            if (str.equals(fieldName)) {
                flag = true;
                break;
            }
        }
        return flag;
    }
    public static PropertyDescriptor getPropertyDescriptor(Class clazz,
                                                           String propertyName) {
        StringBuffer sb = new StringBuffer();// 构建一个可变字符串用来构建方法名称
        Method setMethod = null;
        Method getMethod = null;
        PropertyDescriptor pd = null;
        try {
            Field f = clazz.getDeclaredField(propertyName);// 根据字段名来获取字段
            if (f != null) {
                // 构建方法的后缀
                String methodEnd = propertyName.substring(0, 1).toUpperCase()
                        + propertyName.substring(1);
                sb.append("set" + methodEnd);// 构建set方法
                setMethod = clazz.getDeclaredMethod(sb.toString(),
                        new Class[]{f.getType()});
                sb.delete(0, sb.length());// 清空整个可变字符串
                sb.append("get" + methodEnd);// 构建get方法
                // 构建get 方法
                getMethod =
                        clazz.getDeclaredMethod(sb.toString(), new Class[]{});
                // 构建一个属性描述器 把对应属性 propertyName 的 get 和 set 方法保存到属性描述器中
                pd = new PropertyDescriptor(propertyName, getMethod, setMethod);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return pd;
    }
    @SuppressWarnings("unchecked")
    public static void setProperty(Object obj, String propertyName,
                                   Object value) {
        Class clazz = obj.getClass();// 获取对象的类型
        PropertyDescriptor pd = getPropertyDescriptor(clazz, propertyName);// 获取 clazz
        // 类型中的
        // propertyName
        // 的属性描述器
        Method setMethod = pd.getWriteMethod();// 从属性描述器中获取 set 方法
        try {
            setMethod.invoke(obj, new Object[]{value});// 调用 set 方法将传入的value值保存属性中去
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Object getProperty(Object obj, String propertyName) {
        Class clazz = obj.getClass();// 获取对象的类型
        PropertyDescriptor pd = getPropertyDescriptor(clazz, propertyName);// 获取 clazz
        // 类型中的
        // propertyName
        // 的属性描述器
        Method getMethod = pd.getReadMethod();// 从属性描述器中获取 get 方法
        Object value = null;
        try {
            value = getMethod.invoke(obj, new Object[]{});// 调用方法获取方法的返回值
        } catch (Exception e) {
            e.printStackTrace();
        }
        return value;// 返回值
    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/utils/StrUtils.java
New file
@@ -0,0 +1,62 @@
package com.zy.acs.hex.utils;
import com.zy.acs.hex.constant.InfluxDBConstant;
import java.util.HashMap;
import java.util.Map;
public class StrUtils {
    public static Map<String, String> getTagsByRoutingKey(String routingKey) {
        // 正则表达式匹配 rcs.up. 开头,后面跟着两个版本号部分
        String regex = "^rcs\\.up\\.(\\*|[a-zA-Z0-9]+)\\.(\\*|[a-zA-Z0-9]+)$";
        if (routingKey.matches(regex)) {
            // 分割字符串并返回版本号部分
            String[] parts = routingKey.split("\\.");
            if (parts.length == 4) {
                Map<String, String> data = new HashMap<>();
                data.put(InfluxDBConstant.DEVICE_MEASUREMENT_TAG_TYPE, parts[1]);
                data.put(InfluxDBConstant.DEVICE_MEASUREMENT_TAG_DEVICEID, parts[2]);
                data.put(InfluxDBConstant.DEVICE_MEASUREMENT_TAG_EVENT, parts[3]);
                return data;
            }
        }
        // 如果格式不符合,返回空数组
        return null;
    }
    public static String getDeviceIdByRoutingKey(String routingKey) {
        // 正则表达式匹配 rcs.up. 开头,后面跟着两个版本号部分
        String regex = "^rcs\\.up\\.(\\*|[a-zA-Z0-9]+)\\.(\\*|[a-zA-Z0-9]+)$";
        if (routingKey.matches(regex)) {
            // 分割字符串并返回版本号部分
            String[] parts = routingKey.split("\\.");
            if (parts.length == 4) {
                return parts[2];
            }
        }
        // 如果格式不符合,返回空数组
        return null;
    }
    public static String getEventByRoutingKey(String routingKey) {
        // 正则表达式匹配 rcs.up. 开头,后面跟着两个版本号部分
        String regex = "^rcs\\.up\\.(\\*|[a-zA-Z0-9]+)\\.(\\*|[a-zA-Z0-9]+)$";
        if (routingKey.matches(regex)) {
            // 分割字符串并返回版本号部分
            String[] parts = routingKey.split("\\.");
            if (parts.length == 4) {
                return parts[2];
            }
        }
        // 如果格式不符合,返回空数组
        return null;
    }
    public static void main(String[] args) {
        System.out.println(StrUtils.getDeviceIdByRoutingKey("rcs.up.ds1233.2aads"));
    }
}
zy-acs-hex/src/main/resources/application.yml
@@ -1,4 +1,3 @@
spring:
  application:
    name: rcs-hex
@@ -14,9 +13,10 @@
    publisher-confirm-type: correlated
    publisher-returns: true
    # 消费者配置
    listener:
      direct:
        acknowledge-mode: manual
#    listener:
#      direct:
#    确认机制
#        acknowledge-mode: manual
influxdb3:
  enabled: true