• RabbitMQ【直连、主题、扇形交换机实战】


    目录

    1. 直连交换机(Direct实战)

         provider生产者(publisher)

         consumer消费者

    2. 主题交换机(Topic实战)

          provider生产者(publisher)

          consumer消费者

    3. 扇形交换机(Fanout实战)

          provider生产者(publisher)

          consumer消费者


    前言

       想学习RabbitMQ基础的请阅读下边博文链接

       RabbitMQ【基本使用】_JoneClassMate的博客-CSDN博客


    1.  直连交换机(Direct实战)

          provider生产者(publisher)

    • DirectConfig

    1. package com.jmh.provider.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * @author 蒋明辉
    10. * @data 2022/11/25 19:02
    11. */
    12. @Configuration
    13. @SuppressWarnings("all")
    14. public class DirectConfig {
    15. /**
    16. * 创建队列
    17. */
    18. @Bean
    19. public Queue directQueueA(){
    20. return new Queue("directQueueA",true);
    21. }
    22. @Bean
    23. public Queue directQueueB(){
    24. return new Queue("directQueueB",true);
    25. }
    26. @Bean
    27. public Queue directQueueC(){
    28. return new Queue("directQueueC",true);
    29. }
    30. /**
    31. * 创建交换机
    32. */
    33. @Bean
    34. public DirectExchange directExchange(){
    35. return new DirectExchange("directExchange");
    36. }
    37. /**
    38. * 设置队列和交换机的绑定
    39. */
    40. @Bean
    41. public Binding bindingA(){
    42. return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
    43. }
    44. @Bean
    45. public Binding bindingB(){
    46. return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
    47. }
    48. @Bean
    49. public Binding bindingC(){
    50. return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
    51. }
    52. }
    •  controller
    1. package com.jmh.provider.controller;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:08
    9. */
    10. @RestController
    11. @SuppressWarnings("all")
    12. public class ProviderController {
    13. @Autowired
    14. private RabbitTemplate rabbitTemplate;
    15. /**
    16. * 直连交换机
    17. * @param key
    18. * @return
    19. */
    20. @RequestMapping("/directSend")
    21. public String directSend(String key){
    22. rabbitTemplate.convertAndSend("directExchange",key,"Hello World");
    23. return "yes";
    24. }
    25. }

         consumer消费者

    • DirectReceiverA

    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "directQueueA")
    13. @Slf4j
    14. public class DirectReceiverA {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("A接收到了"+msg);
    18. }
    19. }
    •  DirectReceiverB
    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "directQueueB")
    13. @Slf4j
    14. public class DirectReceiverB {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("B接收到了"+msg);
    18. }
    19. }
    •  DirectReceiverC
    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "directQueueC")
    13. @Slf4j
    14. public class DirectReceiverC {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("C接收到了"+msg);
    18. }
    19. }

    2. 主题交换机(Topic实战)  

     

          provider生产者(publisher)

    • TopicConfig

    1. package com.jmh.provider.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. /**
    6. * @author 蒋明辉
    7. * @data 2022/11/25 19:02
    8. */
    9. @Configuration
    10. @SuppressWarnings("all")
    11. public class TopicConfig {
    12. private static final String KEY_A="*.a.*";
    13. private static final String KEY_B="*.*.a";
    14. private static final String KEY_C="a.#";
    15. /**
    16. * 创建队列
    17. */
    18. @Bean
    19. public Queue topicQueueA(){
    20. return new Queue("topicQueueA",true);
    21. }
    22. @Bean
    23. public Queue topicQueueB(){
    24. return new Queue("topicQueueB",true);
    25. }
    26. @Bean
    27. public Queue topicQueueC(){
    28. return new Queue("topicQueueC",true);
    29. }
    30. /**
    31. * 创建交换机
    32. */
    33. @Bean
    34. public TopicExchange topicExchange(){
    35. return new TopicExchange("topicExchange");
    36. }
    37. /**
    38. * 设置队列和交换机的绑定
    39. */
    40. @Bean
    41. public Binding topicBindingA(){
    42. return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    43. }
    44. @Bean
    45. public Binding topicBindingB(){
    46. return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    47. }
    48. @Bean
    49. public Binding topicBindingC(){
    50. return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    51. }
    52. }
    •  controller
    1. package com.jmh.provider.controller;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:08
    9. */
    10. @RestController
    11. @SuppressWarnings("all")
    12. public class ProviderController {
    13. @Autowired
    14. private RabbitTemplate rabbitTemplate;
    15. /**
    16. * 主题交换机
    17. * @param key
    18. * @return
    19. */
    20. @RequestMapping("/topicSend")
    21. public String topicSend(String key){
    22. rabbitTemplate.convertAndSend("topicExchange",key,"Hello World");
    23. return "yes";
    24. }
    25. }

          consumer消费者

    • TopicReceiverA

    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "topicQueueA")
    13. @Slf4j
    14. public class TopicReceiverA {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("A接收到了"+msg);
    18. }
    19. }
    •  TopicReceiverB
    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "topicQueueB")
    13. @Slf4j
    14. public class TopicReceiverB {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("B接收到了"+msg);
    18. }
    19. }
    •  TopicReceiverC
    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "topicQueueC")
    13. @Slf4j
    14. public class TopicReceiverC {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("C接收到了"+msg);
    18. }
    19. }

    3. 扇形交换机(Fanout实战)

          provider生产者(publisher)

    • FanoutConfig

    1. package com.jmh.provider.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. /**
    6. * @author 蒋明辉
    7. * @data 2022/11/25 19:02
    8. */
    9. @Configuration
    10. @SuppressWarnings("all")
    11. public class FanoutConfig {
    12. /**
    13. * 创建队列
    14. */
    15. @Bean
    16. public Queue fanoutQueueA(){
    17. return new Queue("fanoutQueueA",true);
    18. }
    19. @Bean
    20. public Queue fanoutQueueB(){
    21. return new Queue("fanoutQueueB",true);
    22. }
    23. @Bean
    24. public Queue fanoutQueueC(){
    25. return new Queue("fanoutQueueC",true);
    26. }
    27. /**
    28. * 创建交换机
    29. */
    30. @Bean
    31. public FanoutExchange fanoutExchange(){
    32. return new FanoutExchange("fanoutExchange");
    33. }
    34. /**
    35. * 设置队列和交换机的绑定
    36. */
    37. @Bean
    38. public Binding fanoutBindingA(){
    39. return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    40. }
    41. @Bean
    42. public Binding fanoutBindingB(){
    43. return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    44. }
    45. @Bean
    46. public Binding fanoutBindingC(){
    47. return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    48. }
    49. }
    •  controller
    1. package com.jmh.provider.controller;
    2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.web.bind.annotation.RequestMapping;
    5. import org.springframework.web.bind.annotation.RestController;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:08
    9. */
    10. @RestController
    11. @SuppressWarnings("all")
    12. public class ProviderController {
    13. @Autowired
    14. private RabbitTemplate rabbitTemplate;
    15. /**
    16. * 扇形交换机
    17. * @param key
    18. * @return
    19. */
    20. @RequestMapping("/fanoutSend")
    21. public String fanoutSend(){
    22. rabbitTemplate.convertAndSend("fanoutExchange",null,"Hello World");
    23. return "yes";
    24. }
    25. }

          consumer消费者

    • FanoutReceiverA

    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "fanoutQueueA")
    13. @Slf4j
    14. public class FanoutReceiverA {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("A接收到了"+msg);
    18. }
    19. }
    • FanoutReceiverB 
    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "fanoutQueueB")
    13. @Slf4j
    14. public class FanoutReceiverB {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("B接收到了"+msg);
    18. }
    19. }
    •  FanoutReceiverC
    1. package com.jmh.consumer.listener;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. /**
    7. * @author 蒋明辉
    8. * @data 2022/11/25 19:12
    9. */
    10. @Component
    11. @SuppressWarnings("all")
    12. @RabbitListener(queues = "fanoutQueueC")
    13. @Slf4j
    14. public class FanoutReceiverC {
    15. @RabbitHandler
    16. public void info(String msg){
    17. log.info("C接收到了"+msg);
    18. }
    19. }

     

  • 相关阅读:
    SEO搜索引擎优化-SEO搜索引擎优化软件
    torch.jit.trace与torch.jit.script的区别
    java毕业生设计医院门诊分诊系统计算机源码+系统+mysql+调试部署+lw
    Dubbo3注册为应用级时报错“No provider available for the service XXX”
    Django 基于ORM的CURD、外键关联,请求的生命周期
    AI人工智能进阶-BERT/Transformer/LSTM/RNN原理与代码
    论文精读《OFT: Orthographic Feature Transform for Monocular 3D Object Detection》
    学习笔记——指针!
    【JavaScript流程控制-分支】
    十、Mysql - 全备份 - 根据二进制日志还原数据
  • 原文地址:https://blog.csdn.net/m0_63300795/article/details/128043807