• RabbitMQ的常用交换机在springboot中的使用


    如果没有安装rabbitmq》》》》linux安装rabbitmq

    一、导入依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>

    二、编写配置

    1. 下面配置都是spring:下的
    2. rabbitmq:
    3. host: 44.98.18
    4. username: guest
    5. password: guest
    6. # 虚拟主机
    7. virtual-host: /
    8. port: 5672
    9. listener:
    10. simple:
    11. # 消费者最小数量
    12. concurrency: 10
    13. # 消费者最大数量
    14. max-concurrency: 10
    15. # 限制消费者每次只能处理一条消息,处理完再继续处理下一条消息
    16. prefetch: 1
    17. # 启动时是否默认启动容器,默认true
    18. auto-startup: true
    19. # 拒绝策略:被拒绝时是否重新进入队列
    20. default-requeue-rejected: true
    21. template:
    22. retry:
    23. # 发布重试,默认false
    24. enabled: true
    25. # 重试时间默认1000ms
    26. initial-interval: 1000ms
    27. # 重试次数,默认3
    28. max-attempts: 3
    29. # 重试最大间隔时间,默认10000ms
    30. max-interval: 10000ms
    31. # 重试的间隔乘数,比如配置2.0,第一次等10s,第二次等20s,第三次等40秒
    32. multiplier: 1

    一个简单队列

    需要创建一个配置类,然后创建一个生产者,再创建一个消费者,之后在具体的业务中调用生产者就欧克了

    创建一个队列 RabbitMQConfig.java

    1. /**
    2. *

    3. * RabbitMQ配置类
    4. *

    5. *
    6. * @author wangmh
    7. * @since 2022/9/20 17:01
    8. */
    9. @Configuration
    10. public class RabbitMQConfig {
    11. @Bean
    12. public Queue queue(){
    13. // 名字,是否持久化
    14. return new Queue("queue",true);
    15. }
    16. }

    生产者:MQSender.java

    1. @Service
    2. @Slf4j
    3. public class MQSender {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. public void send(Object msg){
    7. log.info("发送消息:"+msg);
    8. rabbitTemplate.convertAndSend("queue",msg);
    9. }
    10. }

    消费者:MQReceiver.java

    1. @Service
    2. @Slf4jd
    3. public class MQReceiver {
    4. // 监听这个队列
    5. @RabbitListener(queues = "queue")
    6. public void receive(Object msg){
    7. log.info("接收消息"+msg);
    8. }
    9. }

    输出结果

    1. 结果:
    2. 2022-09-20 19:43:49.720 INFO 11180 --- [nio-8080-exec-1] com.example.rabbitmq.MQSender : 发送消息:hello
    3. 2022-09-20 19:43:49.853 INFO 11180 --- [ntContainer#0-9] com.example.rabbitmq.MQReceiver : 接收消息(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1, consumerTag=amq.ctag-rr1oDI2wtJb1Np0jucHeGw, consumerQueue=queue])

    交换机

    一、fanout

    广播模式,把队列和交换机进行绑定,转发消息是最快的,因为他不需要去处理路由键

    发送到交换机的消息,会被转发到和这个交换机绑定的所有队列上(发布订阅模式

    在配置文件中配置2个队列,1个交换机,把这个交换机和2个队列做绑定

    1. @Configuration
    2. public class RabbitMQConfig {
    3. private static final String QUEUE01 = "queue_fanout01";
    4. private static final String QUEUE02 = "queue_fanout02";
    5. private static final String EXCHANGE = "fanoutExchange";
    6. @Bean
    7. public Queue queue01(){
    8. return new Queue(QUEUE01);
    9. }
    10. @Bean
    11. public Queue queue02(){
    12. return new Queue(QUEUE02);
    13. }
    14. @Bean
    15. public FanoutExchange fanoutExchange(){
    16. return new FanoutExchange(EXCHANGE);
    17. }
    18. // 绑定交换机和队列
    19. @Bean
    20. public Binding binding01(){
    21. return BindingBuilder.bind(queue01()).to(fanoutExchange());
    22. }
    23. @Bean
    24. public Binding binding02(){
    25. return BindingBuilder.bind(queue02()).to(fanoutExchange());
    26. }
    27. }

    发送:

    1. @Autowired
    2. private RabbitTemplate rabbitTemplate;
    3. public void send01(Object msg){
    4. log.info("发送消息:"+msg);
    5. rabbitTemplate.convertAndSend("fanoutExchange","",msg);
    6. }

    接收 :

    1. @RabbitListener(queues = "queue_fanout01")
    2. public void receive01(Object msg){
    3. log.info("queue01:"+msg);
    4. }
    5. @RabbitListener(queues = "queue_fanout02")
    6. public void receive02(Object msg){
    7. log.info("queue02 :"+msg);
    8. }

    运行后就可以看到控制台的变化:

     

     结果:

    二、direct

    所有被发送到direct交换机上的消息,都会被转发到路由key中指定的一个queue

    和fanout相比,就是多了一个key,匹配的时候根据key进行匹配

    发送的时候要带有路由key,如果匹配不到默认上会丢失,但是我们在yaml中配置了,发送失败重新进入队列

    但是:

    当项目使用的时间越来越就,路由key就会越来越多,就会变得越来越难管

    RabbitMQConfig.java

    1. @Configuration
    2. public class RabbitMQConfig {
    3. private static final String QUEUE01 = "queue_direct01";
    4. private static final String QUEUE02 = "queue_direct02";
    5. private static final String EXCHANGE = "directExchange";
    6. private static final String ROUTINGKEY01 = "queue.red";
    7. private static final String ROUTINGKEY02 = "queue.green";
    8. @Bean
    9. public Queue queue01(){
    10. return new Queue(QUEUE01);
    11. }
    12. @Bean
    13. public Queue queue02(){
    14. return new Queue(QUEUE02);
    15. }
    16. @Bean
    17. public DirectExchange directExchange(){
    18. return new DirectExchange(EXCHANGE);
    19. }
    20. // 绑定队列和交换机,这种模式需要指定路由key
    21. @Bean
    22. public Binding binding01(){
    23. return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    24. }
    25. @Bean
    26. public Binding binding02(){
    27. return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    28. }
    29. }

    发送者:

    1. @Service
    2. @Slf4j
    3. public class MQSender {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. public void send02(Object msg){
    7. log.info("发送红色的消息::"+msg);
    8. rabbitTemplate.convertAndSend("directExchange","queue.red",msg);
    9. }
    10. public void send03(Object msg){
    11. log.info("发送绿色的消息::"+msg);
    12. rabbitTemplate.convertAndSend("directExchange","queue.green",msg);
    13. }
    14. }

    接收者:

    1. @Service
    2. @Slf4j
    3. public class MQReceiver {
    4. // 监听这个队列
    5. @RabbitListener(queues = "queue_direct01")
    6. public void receive02(Object msg){
    7. log.info("queue_direct01:"+msg);
    8. }
    9. @RabbitListener(queues = "queue_direct02")
    10. public void receive03(Object msg){
    11. log.info("queue_direct02 :"+msg);
    12. }
    13. }

    测试

    1. @RequestMapping("/direct")
    2. @ResponseBody
    3. public void mq01(){
    4. mqSender.send02("hello+red");
    5. mqSender.send03("hello+green");
    6. }

    运行结果:

     看到管控台可以看到,对应的交换机,队列,和队列对应的key

    三、topic

    *只能替代1个词

    #可以替代0个或多个词、

    可以说是 direct模式的延申,方便我们管理路由key

    所有发送到主机交换机的消息都会被转发到所有的路由ley中,所指定的topic的queue上去, 交换机会将路由key和topic模糊匹配,此时,队列需要绑定一个topic

    RabbitMQConfig.java

    1. @Configuration
    2. public class RabbitMQConfig {
    3. private static final String QUEUE01 = "queue_topic01";
    4. private static final String QUEUE02 = "queue_topic02";
    5. private static final String EXCHANGE = "topicExchange";
    6. private static final String ROUTINGKEY01 = "#.queue.#";
    7. private static final String ROUTINGKEY02 = "*.queue.#";
    8. @Bean
    9. public Queue queue01(){
    10. return new Queue(QUEUE01);
    11. }
    12. @Bean
    13. public Queue queue02(){
    14. return new Queue(QUEUE02);
    15. }
    16. @Bean
    17. public TopicExchange topicExchange(){
    18. return new TopicExchange(EXCHANGE);
    19. }
    20. // 绑定队列和交换机,这种模式需要指定路由key
    21. @Bean
    22. public Binding binding01(){
    23. return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
    24. }
    25. @Bean
    26. public Binding binding02(){
    27. return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
    28. }
    29. }

    发送消息:

    1. @Service
    2. @Slf4j
    3. public class MQSender {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. public void send01(Object msg){
    7. log.info("发送消息Queue1::"+msg);
    8. rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
    9. }
    10. public void send02(Object msg){
    11. log.info("发送消息,被queue1+queue2接收::"+msg);
    12. rabbitTemplate.convertAndSend("topicExchange","message.queue.green",msg);
    13. }
    14. }

    接收消息

    1. @Service
    2. @Slf4j
    3. public class MQReceiver {
    4. // 监听这个队列
    5. @RabbitListener(queues = "queue_topic01")
    6. public void receive02(Object msg){
    7. log.info("queue_topic01:"+msg);
    8. }
    9. @RabbitListener(queues = "queue_topic02")
    10. public void receive03(Object msg){
    11. log.info("queue_topic02 :"+msg);
    12. }
    13. }

    测试

    1. @RequestMapping("/topic")
    2. @ResponseBody
    3. public void mq01(){
    4. mqSender.send02("hello+red");
    5. mqSender.send02("hello+green");
    6. }

    结果

     和刚刚一样,可以去管控太去查看响应的交换机和队列的信息

    三、Headers模式

    并不依赖于路由key

    whereAll() 多个键值对都要满足

    whereAny() 任意一个键值对满足即

    RabbitMQConfig.java

    1. @Configuration
    2. public class RabbitMQConfig {
    3. private static final String QUEUE01 = "queue_headers01";
    4. private static final String QUEUE02 = "queue_headers02";
    5. private static final String EXCHANGE = "headerExchange";
    6. @Bean
    7. public Queue queue01(){
    8. return new Queue(QUEUE01);
    9. }
    10. @Bean
    11. public Queue queue02(){
    12. return new Queue(QUEUE02);
    13. }
    14. @Bean
    15. public HeadersExchange headersExchange(){
    16. return new HeadersExchange(EXCHANGE);
    17. }
    18. // 绑定队列和交换机,这种模式需要指定路由key
    19. @Bean
    20. public Binding binding01(){
    21. Map map = new HashMap<>();
    22. map.put("color","red");
    23. map.put("speed","low");
    24. // 两个只需要匹配一个
    25. return BindingBuilder.bind(queue01()).to(headersExchange()).whereAny(map).match();
    26. }
    27. @Bean
    28. public Binding binding02(){
    29. Map map = new HashMap<>();
    30. map.put("color","red");
    31. map.put("speed","fast");
    32. // 必须要两个同时匹配上才可以
    33. return BindingBuilder.bind(queue02()).to(headersExchange()).whereAll(map).match();
    34. }
    35. }

    发送消息:

    1. public void send01(String msg){
    2. log.info("发送两个都能接收的::"+msg);
    3. MessageProperties properties = new MessageProperties();
    4. properties.setHeader("color","red");
    5. properties.setHeader("speed","fast");
    6. Message message = new Message(msg.getBytes(),properties);
    7. rabbitTemplate.convertAndSend("headerExchange","",message);
    8. }
    9. public void send02(String msg){
    10. log.info("发送消息,queue1接收::"+msg);
    11. MessageProperties properties = new MessageProperties();
    12. properties.setHeader("color","red");
    13. properties.setHeader("speed","normal");
    14. Message message = new Message(msg.getBytes(),properties);
    15. rabbitTemplate.convertAndSend("headerExchange","",message);
    16. }

    接收消息:

    1. @RabbitListener(queues = "queue_headers01")
    2. public void receive01(Message message){
    3. log.info("queue01接收Message对象:"+message);
    4. log.info("queue01接收消息:"+ new String(message.getBody()));
    5. }
    6. @RabbitListener(queues = "queue_headers02")
    7. public void receive02(Message message){
    8. log.info("queue02接收Message对象:"+message);
    9. log.info("queue02接收消息:"+ new String(message.getBody()));
    10. }

    测试:

    1. @RequestMapping("/direct")
    2. @ResponseBody
    3. public void mq01(){
    4. mqSender.send01("王萌虎");
    5. mqSender.send02("wangmenghu");
    6. }

    结果:

  • 相关阅读:
    ArcGIS:如何利用站点数据(例如臭氧)进行克里金插值得到连续臭氧表面?
    Zabbix监控组件及流程
    Django(二)精美博客搭建(13)实现留言页面及留言功能
    论文精读GAN: Generative Adversarial Nets
    QML元素定位器:Row、Colum、Grid、Flow、定位器嵌套以及Repeater用法
    436. 寻找右区间--LeetCode_二分
    量化交易之日内回转策略:如何利用MACD指标实现盈利?
    【机器学习】Kmeans聚类算法
    全域营销怎么做?统一客户数据是起点
    神奇的MappedByteBuffer
  • 原文地址:https://blog.csdn.net/w13966597931/article/details/127105242