| zy-acs-hex/pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hex/src/main/java/com/zy/Main.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hex/src/main/java/com/zy/acs/hex/config/MessageConverterConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hex/src/main/java/com/zy/acs/hex/config/RabbitMQConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/MessageListener.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| zy-acs-hex/src/main/resources/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
zy-acs-hex/pom.xml
@@ -12,9 +12,60 @@ <artifactId>zy-acs-hex</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.5.3</spring-boot.version> <fastjson.version>1.2.58</fastjson.version> </properties> <dependencies> <dependency> <groupId>com.zy</groupId> <artifactId>acs-common</artifactId> <version>1.0.0</version> </dependency> <!-- SpringBoot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- SpringBoot RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies> <build> <finalName>rcs-hex</finalName> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project> zy-acs-hex/src/main/java/com/zy/Main.java
File was deleted zy-acs-hex/src/main/java/com/zy/acs/hex/HexApplication.java
New file @@ -0,0 +1,14 @@ package com.zy.acs.hex; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class HexApplication { public static void main(String[] args) { SpringApplication.run(HexApplication.class, args); } } zy-acs-hex/src/main/java/com/zy/acs/hex/config/MessageConverterConfig.java
New file @@ -0,0 +1,26 @@ package com.zy.acs.hex.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息转换器配置,用于消息的序列化和反序列化 * * @author ken */ @Configuration public class MessageConverterConfig { /** * 配置Fastjson2消息转换器 * * @return 消息转换器 */ @Bean public MessageConverter fastJsonMessageConverter() { return new Jackson2JsonMessageConverter(); } } zy-acs-hex/src/main/java/com/zy/acs/hex/config/RabbitMQConfig.java
New file @@ -0,0 +1,106 @@ package com.zy.acs.hex.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ核心配置类,定义交换机、队列和绑定关系 * * @author ken */ @Configuration public class RabbitMQConfig { // ========================== 主题模式 ========================== /** * 主题交换机名称 */ public static final String TOPIC_EXCHANGE = "rcs_topic_exchange"; /** * 主题队列1(上行) */ public static final String TOPIC_QUEUE_UP = "TOPIC_QUEUE_UP"; /** * 主题队列2(下行) */ public static final String TOPIC_QUEUE_DOWN = "TOPIC_QUEUE_DOWN"; // 属性 服务 事件 //Properties, services, and events /** * 上行路由键前缀,第一个代表设备编号,第二个代表类型 */ public static final String ROUTING_KEY_UP = "rcs.up.*.*"; /** * 下行路由键前缀,第一个代表设备编号,第二个代表类型 */ public static final String ROUTING_KEY_DOWN = "rcs.down.*.*"; /** * 创建主题交换机 * * @return 交换机实例 */ // @Bean // public TopicExchange topicExchange() { // return new TopicExchange(TOPIC_EXCHANGE, true, false); // } // /** // * 创建上行主题队列 // * // * @return 队列实例 // */ // @Bean // public Queue topicQueueUp() { // return QueueBuilder.durable(TOPIC_QUEUE_UP) // .autoDelete() // .exclusive() // .build(); // } // // /** // * 创建下行主题队列 // * // * @return 队列实例 // */ // @Bean // public Queue topicQueueDown() { // return QueueBuilder.durable(TOPIC_QUEUE_DOWN) // .exclusive() // .build(); // } // // // // /** // * 绑定上行队列到主题交换机 // * // * @return 绑定关系 // */ // @Bean // public Binding topicBindingOrder() { // return BindingBuilder.bind(topicQueueUp()) // .to(topicExchange()) // .with(ROUTING_KEY_UP); // } // // /** // * 绑定下行队列到主题交换机 // * // * @return 绑定关系 // */ // @Bean // public Binding topicBindingUser() { // return BindingBuilder.bind(topicQueueDown()) // .to(topicExchange()) // .with(ROUTING_KEY_DOWN); // } } zy-acs-hex/src/main/java/com/zy/acs/hex/consumer/MessageListener.java
New file @@ -0,0 +1,49 @@ package com.zy.acs.hex.consumer; import com.zy.acs.hex.config.RabbitMQConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; /** * 消费者 */ @Slf4j @Component public class MessageListener { public static final String durable = "true"; @RabbitListener(bindings = @QueueBinding( value = @Queue(name = RabbitMQConfig.TOPIC_QUEUE_UP, durable = durable), exchange = @Exchange(name = RabbitMQConfig.TOPIC_EXCHANGE), key = RabbitMQConfig.ROUTING_KEY_UP )) public void receiveFromUpData(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, RabbitProperties.Cache.Channel channel) throws IOException { log.info("receive message: {}" + "msgId:" + message.getMessageProperties().getMessageId()); // 进入消息消费业务逻辑。 System.out.println("收到消息:" + new String(message.getBody())); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = RabbitMQConfig.TOPIC_QUEUE_DOWN, durable = durable), exchange = @Exchange(name = RabbitMQConfig.TOPIC_EXCHANGE), key = RabbitMQConfig.ROUTING_KEY_DOWN )) public void receiveFromDownData(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, RabbitProperties.Cache.Channel channel) throws IOException { log.info("receive message: {}" + "msgId:" + message.getMessageProperties().getMessageId()); // 进入消息消费业务逻辑。 System.out.println("收到消息:" + new String(message.getBody())); } } zy-acs-hex/src/main/resources/application.yml
New file @@ -0,0 +1,27 @@ spring: application: name: rcs-hex # RabbitMQ配置 rabbitmq: host: localhost port: 5672 username: admin password: 123456 virtual-host: / # 生产者确认配置 publisher-confirm-type: correlated publisher-returns: true # 消费者配置 listener: simple: # 手动确认模式 acknowledge-mode: manual # 消费者线程数 concurrency: 1 # 最大消费者线程数 max-concurrency: 5 # 每次从队列中获取的消息数量 prefetch: 1 direct: acknowledge-mode: manual