1.扇形交换机
定义扇形交换机和队列
- package com.macro.mall.portal.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * 扇形交换机测试
- */
- @Configuration
- public class RabbitMqFanoutQueueConfig {
-
- //=================== fanout 模式下,所发的消息属于广播模式 ====================
-
- /**
- * 定义队列 fanout.a fanout.b fanout.c
- */
- @Bean
- public Queue fanoutA() {
- return new Queue("fanout.a");
- }
-
- @Bean
- public Queue fanoutB() {
- return new Queue("fanout.b");
- }
-
- @Bean
- public Queue fanoutC() {
- return new Queue("fanout.c");
- }
-
-
- /**
- * 定义个fanout交换器
- */
- @Bean
- FanoutExchange fanoutExchange() {
- // 定义一个名为fanoutExchange的fanout交换器
- return new FanoutExchange("fanoutExchange");
- }
-
- /**
- * 将队列fanout.a fanout.b fanout.c 分别 与fanout交换器绑定
- */
- @Bean
- public Binding bindingExchangeWithA() {
- return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
- }
-
- @Bean
- public Binding bindingExchangeWithB() {
- return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
- }
-
- @Bean
- public Binding bindingExchangeWithC() {
- return BindingBuilder.bind(fanoutC()).to(fanoutExchange());
- }
-
- }
定义扇形交换机发送端,发送时,第二个参数是路由,不需要设置
- @Autowired
- private AmqpTemplate amqpTemplate;
-
- @PostMapping("/fanoutMsg")
- @Operation(summary = "发送扇形消息", description = "发送扇形消息")
- public String sendMsg() {
- amqpTemplate.convertAndSend("fanoutExchange","","扇形交换机消息");
- return "ok";
- }
定义扇形交换机接收端
- package com.macro.mall.portal.component;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @Description 扇形消息接收
- * @Author clj
- * @Date 2023/11/3 16:57
- */
- @Component
- @Slf4j
- public class FanoutReceive {
-
- @RabbitListener(queues = "fanout.a")
- public void consumers(String msg) {
- log.info("[faount.a] recvice,{}",msg);
- }
-
- @RabbitListener(queues = "fanout.b")
- public void consumers2(String msg) {
- log.info("[faount.b] recvice,{}",msg);
- }
-
- @RabbitListener(queues = "fanout.c")
- public void consumers3(String msg) {
- log.info("[faount.c] recvice,{}",msg);
- }
- }
当点击发送后,以上三个方法都会接受到消息,不需要路由。
2主题交换机
定义交换机和队列,其中路由可以根据规则匹配,*表示匹配一个任意字符,#表示一个或多个
- package com.macro.mall.portal.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * topic消息测试
- * @Author clj
- * @Date 2023/11/6 16:52
- */
- @Configuration
- public class RabbitmqTopicQueueConfig {
-
- @Bean
- public TopicExchange topicExchange () {
- return new TopicExchange("topicExchange");
- }
-
-
- @Bean
- public Queue topicA() {
- return new Queue("topic.a");
- }
-
- @Bean Queue topicB() {
- return new Queue("topic.b");
- }
-
- @Bean Queue topicC() {
- return new Queue("topic.c");
- }
-
- @Bean
- public Binding bindingTopicA() {
- return BindingBuilder.bind(topicA()).to(topicExchange()).with("topic.a");
- }
-
- @Bean
- public Binding bindingTopicB() {
- return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.*.msg");
- }
-
-
- @Bean
- public Binding bindingTopicC() {
- return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.msg.#");
- }
-
- }
定义主题交换机发送端
- @PostMapping("/topicMsg")
- @Operation(summary = "发送主题交换机精确匹配a", description = "发送主题交换机精确匹配a")
- public String sendTopicMsg() {
- amqpTemplate.convertAndSend("topicExchange","topic.a","发送主题交换机a");
- return "ok";
- }
-
- @PostMapping("/topicMsg1")
- @Operation(summary = "发送主题交换机精确匹配b。匹配*号", description = "发送主题交换机精确匹配cb")
- public String sendTopicMsg1() {
- amqpTemplate.convertAndSend("topicExchange","topic.1.msg","发送主题交换机b");
- return "ok";
- }
-
- @PostMapping("/topicMsg9")
- @Operation(summary = "发送主题交换机精确匹配b。匹配*号", description = "发送主题交换机精确匹配cb")
- public String sendTopicMsg9() {
- amqpTemplate.convertAndSend("topicExchange","topic.2.msg","发送主题交换机b");
- return "ok";
- }
-
- @PostMapping("/topicMsg2")
- @Operation(summary = "发送主题交换机路由匹配c,匹配#号1", description = "发送主题交换机路由匹配")
- public String sendTopicMs2g() {
- amqpTemplate.convertAndSend("topicExchange","topic.msg.1","发送主题交换机c");
- return "ok";
- }
- @PostMapping("/topicMsg3")
- @Operation(summary = "发送主题交换机路由匹配c,匹配#号2", description = "发送主题交换机路由匹配")
- public String sendTopicMs3g() {
- amqpTemplate.convertAndSend("topicExchange","topic.msg.2","发送主题交换机c");
- return "ok";
- }
-
- @PostMapping("/topicMsg4")
- @Operation(summary = "发送主题交换机路由匹配c,匹配#号4", description = "发送主题交换机路由匹配")
- public String sendTopicMs4g() {
- amqpTemplate.convertAndSend("topicExchange","topic.msg.abcdefg","发送主题交换机c");
- return "ok";
- }
定义主题交换机接收端
- package com.macro.mall.portal.component;
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.messaging.handler.annotation.Header;
-
- import java.io.IOException;
-
- /**
- * @Description
- * @Author clj
- * @Date 2023/11/6 17:01
- */
- @Configuration
- @Slf4j
- public class topicReceive {
-
- @RabbitListener(queues = "topic.a")
- public void execute(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
- log.info("topicA接收精确匹配消息,{}",msg);
- channel.basicAck(tag,false);
- }
-
- @RabbitListener(queues = "topic.b")
- public void execute1(String msg) {
- log.info("topicB接收*号匹配消息,{}",msg);
- }
-
-
- @RabbitListener(queues = "topic.c")
- public void execute3(String msg) {
- log.info("topicC接收#匹配消息,{}",msg);
- }
- }