• rabbitmq 交换机相关实例代码


    1.扇形交换机

        定义扇形交换机和队列

    1. package com.macro.mall.portal.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.FanoutExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * 扇形交换机测试
    10. */
    11. @Configuration
    12. public class RabbitMqFanoutQueueConfig {
    13. //=================== fanout 模式下,所发的消息属于广播模式 ====================
    14. /**
    15. * 定义队列 fanout.a fanout.b fanout.c
    16. */
    17. @Bean
    18. public Queue fanoutA() {
    19. return new Queue("fanout.a");
    20. }
    21. @Bean
    22. public Queue fanoutB() {
    23. return new Queue("fanout.b");
    24. }
    25. @Bean
    26. public Queue fanoutC() {
    27. return new Queue("fanout.c");
    28. }
    29. /**
    30. * 定义个fanout交换器
    31. */
    32. @Bean
    33. FanoutExchange fanoutExchange() {
    34. // 定义一个名为fanoutExchange的fanout交换器
    35. return new FanoutExchange("fanoutExchange");
    36. }
    37. /**
    38. * 将队列fanout.a fanout.b fanout.c 分别 与fanout交换器绑定
    39. */
    40. @Bean
    41. public Binding bindingExchangeWithA() {
    42. return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
    43. }
    44. @Bean
    45. public Binding bindingExchangeWithB() {
    46. return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
    47. }
    48. @Bean
    49. public Binding bindingExchangeWithC() {
    50. return BindingBuilder.bind(fanoutC()).to(fanoutExchange());
    51. }
    52. }

     定义扇形交换机发送端,发送时,第二个参数是路由,不需要设置

    1. @Autowired
    2. private AmqpTemplate amqpTemplate;
    3. @PostMapping("/fanoutMsg")
    4. @Operation(summary = "发送扇形消息", description = "发送扇形消息")
    5. public String sendMsg() {
    6. amqpTemplate.convertAndSend("fanoutExchange","","扇形交换机消息");
    7. return "ok";
    8. }

    定义扇形交换机接收端

    1. package com.macro.mall.portal.component;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. * @Description 扇形消息接收
    7. * @Author clj
    8. * @Date 2023/11/3 16:57
    9. */
    10. @Component
    11. @Slf4j
    12. public class FanoutReceive {
    13. @RabbitListener(queues = "fanout.a")
    14. public void consumers(String msg) {
    15. log.info("[faount.a] recvice,{}",msg);
    16. }
    17. @RabbitListener(queues = "fanout.b")
    18. public void consumers2(String msg) {
    19. log.info("[faount.b] recvice,{}",msg);
    20. }
    21. @RabbitListener(queues = "fanout.c")
    22. public void consumers3(String msg) {
    23. log.info("[faount.c] recvice,{}",msg);
    24. }
    25. }

    当点击发送后,以上三个方法都会接受到消息,不需要路由。

    2主题交换机

       定义交换机和队列,其中路由可以根据规则匹配,*表示匹配一个任意字符,#表示一个或多个

    1. package com.macro.mall.portal.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.Queue;
    5. import org.springframework.amqp.core.TopicExchange;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * topic消息测试
    10. * @Author clj
    11. * @Date 2023/11/6 16:52
    12. */
    13. @Configuration
    14. public class RabbitmqTopicQueueConfig {
    15. @Bean
    16. public TopicExchange topicExchange () {
    17. return new TopicExchange("topicExchange");
    18. }
    19. @Bean
    20. public Queue topicA() {
    21. return new Queue("topic.a");
    22. }
    23. @Bean Queue topicB() {
    24. return new Queue("topic.b");
    25. }
    26. @Bean Queue topicC() {
    27. return new Queue("topic.c");
    28. }
    29. @Bean
    30. public Binding bindingTopicA() {
    31. return BindingBuilder.bind(topicA()).to(topicExchange()).with("topic.a");
    32. }
    33. @Bean
    34. public Binding bindingTopicB() {
    35. return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.*.msg");
    36. }
    37. @Bean
    38. public Binding bindingTopicC() {
    39. return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.msg.#");
    40. }
    41. }

     定义主题交换机发送端

    1. @PostMapping("/topicMsg")
    2. @Operation(summary = "发送主题交换机精确匹配a", description = "发送主题交换机精确匹配a")
    3. public String sendTopicMsg() {
    4. amqpTemplate.convertAndSend("topicExchange","topic.a","发送主题交换机a");
    5. return "ok";
    6. }
    7. @PostMapping("/topicMsg1")
    8. @Operation(summary = "发送主题交换机精确匹配b。匹配*号", description = "发送主题交换机精确匹配cb")
    9. public String sendTopicMsg1() {
    10. amqpTemplate.convertAndSend("topicExchange","topic.1.msg","发送主题交换机b");
    11. return "ok";
    12. }
    13. @PostMapping("/topicMsg9")
    14. @Operation(summary = "发送主题交换机精确匹配b。匹配*号", description = "发送主题交换机精确匹配cb")
    15. public String sendTopicMsg9() {
    16. amqpTemplate.convertAndSend("topicExchange","topic.2.msg","发送主题交换机b");
    17. return "ok";
    18. }
    19. @PostMapping("/topicMsg2")
    20. @Operation(summary = "发送主题交换机路由匹配c,匹配#号1", description = "发送主题交换机路由匹配")
    21. public String sendTopicMs2g() {
    22. amqpTemplate.convertAndSend("topicExchange","topic.msg.1","发送主题交换机c");
    23. return "ok";
    24. }
    25. @PostMapping("/topicMsg3")
    26. @Operation(summary = "发送主题交换机路由匹配c,匹配#号2", description = "发送主题交换机路由匹配")
    27. public String sendTopicMs3g() {
    28. amqpTemplate.convertAndSend("topicExchange","topic.msg.2","发送主题交换机c");
    29. return "ok";
    30. }
    31. @PostMapping("/topicMsg4")
    32. @Operation(summary = "发送主题交换机路由匹配c,匹配#号4", description = "发送主题交换机路由匹配")
    33. public String sendTopicMs4g() {
    34. amqpTemplate.convertAndSend("topicExchange","topic.msg.abcdefg","发送主题交换机c");
    35. return "ok";
    36. }

    定义主题交换机接收端

    1. package com.macro.mall.portal.component;
    2. import com.rabbitmq.client.Channel;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.amqp.support.AmqpHeaders;
    6. import org.springframework.context.annotation.Configuration;
    7. import org.springframework.messaging.handler.annotation.Header;
    8. import java.io.IOException;
    9. /**
    10. * @Description
    11. * @Author clj
    12. * @Date 2023/11/6 17:01
    13. */
    14. @Configuration
    15. @Slf4j
    16. public class topicReceive {
    17. @RabbitListener(queues = "topic.a")
    18. public void execute(String msg, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    19. log.info("topicA接收精确匹配消息,{}",msg);
    20. channel.basicAck(tag,false);
    21. }
    22. @RabbitListener(queues = "topic.b")
    23. public void execute1(String msg) {
    24. log.info("topicB接收*号匹配消息,{}",msg);
    25. }
    26. @RabbitListener(queues = "topic.c")
    27. public void execute3(String msg) {
    28. log.info("topicC接收#匹配消息,{}",msg);
    29. }
    30. }

  • 相关阅读:
    【笔记1-2】Qt系列:QkeyEvent 键盘事件 设定快捷键
    【python】anaconda使用指南
    如何评价GPT-4o?
    uint 与 int 相加,事与愿违?
    pytorch深度学习实战lesson8
    如何体验最新GPT-4o模型?
    【Linux】 ls命令使用
    封装unordered_map和unordered_set
    pycharm2022.2 远程连接服务器调试代码
    算法打卡day31|贪心算法篇05|Leetcode 435. 无重叠区间、763.划分字母区间、56. 合并区间
  • 原文地址:https://blog.csdn.net/b452608/article/details/134265801