1
zhang
5 天以前 e7e980a6276f2986564dfc0afaaf592c592b913b
1
1个文件已删除
5个文件已添加
1个文件已修改
294 ■■■■■ 已修改文件
zy-acs-hex/pom.xml 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/Main.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/config/MessageConverterConfig.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/config/RabbitMQConfig.java 106 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/MessageListener.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/src/main/resources/application.yml 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hex/pom.xml
@@ -12,9 +12,60 @@
    <artifactId>zy-acs-hex</artifactId>
    <!-- Build-wide settings: Java 17 source/target level, UTF-8 encodings, and dependency version pins. -->
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- NOTE(review): this property is not referenced in the visible part of the POM; the Spring Boot
             starters carry no version element, so they are presumably managed by a parent or BOM. Confirm. -->
        <spring-boot.version>2.5.3</spring-boot.version>
        <!-- NOTE(review): fastjson 1.2.x releases older than 1.2.83 have published deserialization
             advisories; consider upgrading after checking compatibility. -->
        <fastjson.version>1.2.58</fastjson.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>com.zy</groupId>
        <artifactId>acs-common</artifactId>
        <version>1.0.0</version>
    </dependency>
        <!-- SpringBoot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- SpringBoot RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>
    <!-- Packaging: the built artifact is named rcs-hex and the Spring Boot Maven plugin is applied. -->
    <build>
        <finalName>rcs-hex</finalName>
        <plugins>
            <!-- NOTE(review): no plugin version is declared here; presumably inherited from a parent POM. Confirm. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
zy-acs-hex/src/main/java/com/zy/Main.java
File was deleted
zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java
New file
@@ -0,0 +1,14 @@
package com.zy.acs.hex;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class HexApplication {
    public static void main(String[] args) {
        SpringApplication.run(HexApplication.class, args);
    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/config/MessageConverterConfig.java
New file
@@ -0,0 +1,26 @@
package com.zy.acs.hex.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 消息转换器配置,用于消息的序列化和反序列化
 *
 * @author ken
 */
@Configuration
public class MessageConverterConfig {
    /**
     * 配置Fastjson2消息转换器
     *
     * @return 消息转换器
     */
    @Bean
    public MessageConverter fastJsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/config/RabbitMQConfig.java
New file
@@ -0,0 +1,106 @@
package com.zy.acs.hex.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ核心配置类,定义交换机、队列和绑定关系
 *
 * @author ken
 */
@Configuration
public class RabbitMQConfig {
    // ========================== 主题模式 ==========================
    /**
     * 主题交换机名称
     */
    public static final String TOPIC_EXCHANGE = "rcs_topic_exchange";
    /**
     * 主题队列1(上行)
     */
    public static final String TOPIC_QUEUE_UP = "TOPIC_QUEUE_UP";
    /**
     * 主题队列2(下行)
     */
    public static final String TOPIC_QUEUE_DOWN = "TOPIC_QUEUE_DOWN";
    // 属性  服务  事件
    //Properties, services, and events
    /**
     * 上行路由键前缀,第一个代表设备编号,第二个代表类型
     */
    public static final String ROUTING_KEY_UP = "rcs.up.*.*";
    /**
     * 下行路由键前缀,第一个代表设备编号,第二个代表类型
     */
    public static final String ROUTING_KEY_DOWN = "rcs.down.*.*";
    /**
     * 创建主题交换机
     *
     * @return 交换机实例
     */
//    @Bean
//    public TopicExchange topicExchange() {
//        return new TopicExchange(TOPIC_EXCHANGE, true, false);
//    }
//    /**
//     * 创建上行主题队列
//     *
//     * @return 队列实例
//     */
//    @Bean
//    public Queue topicQueueUp() {
//        return QueueBuilder.durable(TOPIC_QUEUE_UP)
//                .autoDelete()
//                .exclusive()
//                .build();
//    }
//
//    /**
//     * 创建下行主题队列
//     *
//     * @return 队列实例
//     */
//    @Bean
//    public Queue topicQueueDown() {
//        return QueueBuilder.durable(TOPIC_QUEUE_DOWN)
//                .exclusive()
//                .build();
//    }
//
//
//
//    /**
//     * 绑定上行队列到主题交换机
//     *
//     * @return 绑定关系
//     */
//    @Bean
//    public Binding topicBindingOrder() {
//        return BindingBuilder.bind(topicQueueUp())
//                .to(topicExchange())
//                .with(ROUTING_KEY_UP);
//    }
//
//    /**
//     * 绑定下行队列到主题交换机
//     *
//     * @return 绑定关系
//     */
//    @Bean
//    public Binding topicBindingUser() {
//        return BindingBuilder.bind(topicQueueDown())
//                .to(topicExchange())
//                .with(ROUTING_KEY_DOWN);
//    }
}
zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/MessageListener.java
New file
@@ -0,0 +1,49 @@
package com.zy.acs.hex.consumer;
import com.rabbitmq.client.Channel;
import com.zy.acs.hex.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * 消费者
 */
@Slf4j
@Component
public class MessageListener {
    public static final String durable = "true";
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = RabbitMQConfig.TOPIC_QUEUE_UP, durable = durable),
            exchange = @Exchange(name = RabbitMQConfig.TOPIC_EXCHANGE),
            key = RabbitMQConfig.ROUTING_KEY_UP
    ))
    public void receiveFromUpData(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, RabbitProperties.Cache.Channel channel) throws IOException {
        log.info("receive message: {}" + "msgId:" + message.getMessageProperties().getMessageId());
        // 进入消息消费业务逻辑。
        System.out.println("收到消息:" + new String(message.getBody()));
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = RabbitMQConfig.TOPIC_QUEUE_DOWN, durable = durable),
            exchange = @Exchange(name = RabbitMQConfig.TOPIC_EXCHANGE),
            key = RabbitMQConfig.ROUTING_KEY_DOWN
    ))
    public void receiveFromDownData(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, RabbitProperties.Cache.Channel channel) throws IOException {
        log.info("receive message: {}" + "msgId:" + message.getMessageProperties().getMessageId());
        // 进入消息消费业务逻辑。
        System.out.println("收到消息:" + new String(message.getBody()));
    }
}
zy-acs-hex/src/main/resources/application.yml
New file
@@ -0,0 +1,27 @@
# Spring Boot configuration of the rcs-hex application.
spring:
  application:
    name: rcs-hex
  # RabbitMQ configuration
  rabbitmq:
    host: localhost
    port: 5672
    # NOTE(review): credentials are committed in plain text; presumably local-development values - confirm.
    username: admin
    password: 123456
    virtual-host: /
    # Publisher confirm configuration
    publisher-confirm-type: correlated
    publisher-returns: true
    # Consumer configuration
    listener:
      simple:
        # Manual acknowledgement mode
        acknowledge-mode: manual
        # Number of consumer threads
        concurrency: 1
        # Maximum number of consumer threads
        max-concurrency: 5
        # Number of messages fetched from the queue at a time
        prefetch: 1
      direct:
        acknowledge-mode: manual