• springboot集成rabbitmq:fanout、topic


    编写Fanout模式的消息接收

    其他模块和上文保持一致https://blog.csdn.net/weixin_59334478/article/details/127740411?spm=1001.2014.3001.5501

    ReceiveServiceImpl实现类

    1. package com.it.rabbitmq.impl;
    2. import com.it.rabbitmq.ReceiveService;
    3. import org.springframework.amqp.core.AmqpTemplate;
    4. import org.springframework.amqp.rabbit.annotation.Exchange;
    5. import org.springframework.amqp.rabbit.annotation.Queue;
    6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
    7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    8. import org.springframework.stereotype.Service;
    9. import javax.annotation.Resource;
    10. @Service("receiveService")
    11. public class ReceiveServiceImpl implements ReceiveService {
    12. //注入amqp的模板类,利用这个对象来发送和接收消息
    13. @Resource
    14. private AmqpTemplate amqpTemplate;
    15. @Override
    16. public void receiveMessage() {
    17. /**
    18. * 发送消息
    19. * 参数1为交换机名称
    20. * 参数2位RoutingKey
    21. * 参数3为具体发送的消息数据
    22. */
    23. String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
    24. System.out.println(message);
    25. }
    26. /**
    27. * @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
    28. * 这个方法不需要手动调用,spring会自动监听
    29. * 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
    30. * @param message 参数就是接收到的具体消息数据
    31. */
    32. @RabbitListener(queues = {"bootDirectQueue"})
    33. public void directReceive(String message) {
    34. System.out.println("监听器接收的消息:"+message);
    35. }
    36. @RabbitListener(bindings = {
    37. @QueueBinding(value = @Queue(),
    38. exchange = @Exchange(name="fanoutExchange",type = "fanout")
    39. )})
    40. public void fanoutReceive1(String message) {
    41. System.out.println("fanoutReceive1监听器接收的消息:"+message);
    42. }
    43. @RabbitListener(bindings = {
    44. @QueueBinding(value = @Queue(),
    45. exchange = @Exchange(name="fanoutExchange",type = "fanout")
    46. )})
    47. public void fanoutReceive2(String message) {
    48. System.out.println("fanoutReceive2监听器接收的消息:"+message);
    49. }
    50. }

     

     

    编写Fanout模式的消息接收

    1.SendService接口

    1. package com.it.rabbitmq;
    2. public interface SendService {
    3. void sendMessage(String message);
    4. void sendFanoutMessage(String message);
    5. }

    SendServiceImpl实现类

    1. package com.it.rabbitmq.impl;
    2. import com.it.rabbitmq.SendService;
    3. import org.springframework.amqp.core.AmqpTemplate;
    4. import org.springframework.stereotype.Service;
    5. import javax.annotation.Resource;
    6. @Service("sendService")
    7. public class SendServiceImpl implements SendService {
    8. //注入amqp的模板类,利用这个对象来发送和接收消息
    9. @Resource
    10. private AmqpTemplate amqpTemplate;
    11. @Override
    12. public void sendMessage(String message) {
    13. /**
    14. * 发送消息
    15. * 参数1为交换机名称
    16. * 参数2位RoutingKey
    17. * 参数3为具体发送的消息数据
    18. */
    19. amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
    20. }
    21. @Override
    22. public void sendFanoutMessage(String message) {
    23. amqpTemplate.convertAndSend("fanoutExchange","",message);
    24. }
    25. }

    2.RabbitMQConfig类

    1. package com.it.rabbitmq.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class RabbitMQConfig {
    7. //配置一个direct类型的交换机
    8. @Bean
    9. public DirectExchange directExchange() {
    10. return new DirectExchange("bootDirectExchange");
    11. }
    12. //配置一个队列
    13. @Bean
    14. public Queue directQueue() {
    15. return new Queue("bootDirectQueue");
    16. }
    17. /**
    18. * 配置一个交换机和队列的绑定
    19. *
    20. * @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
    21. * @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
    22. * @return
    23. */
    24. @Bean
    25. public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
    26. //完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
    27. return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
    28. }
    29. //配置一个fanout类型的交换机
    30. @Bean
    31. public FanoutExchange fanoutExchange() {
    32. return new FanoutExchange("fanoutExchange");
    33. }
    34. }

    3.运行主函数

    1. package com.it;
    2. import com.it.rabbitmq.SendService;
    3. import org.springframework.boot.SpringApplication;
    4. import org.springframework.boot.autoconfigure.SpringBootApplication;
    5. import org.springframework.context.ApplicationContext;
    6. @SpringBootApplication
    7. public class RabbitmqSpringbootApplication {
    8. public static void main(String[] args) {
    9. ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
    10. SendService sendService = (SendService) ac.getBean("sendService");
    11. // sendService.sendMessage("boot的测试数据");
    12. sendService.sendFanoutMessage("boot的fanout测试数据!");
    13. }
    14. }

     

     

     编写Topic模式消息接收

    ReceiveServiceImpl实现类

    1. package com.it.rabbitmq.impl;
    2. import com.it.rabbitmq.ReceiveService;
    3. import org.springframework.amqp.core.AmqpTemplate;
    4. import org.springframework.amqp.rabbit.annotation.Exchange;
    5. import org.springframework.amqp.rabbit.annotation.Queue;
    6. import org.springframework.amqp.rabbit.annotation.QueueBinding;
    7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    8. import org.springframework.stereotype.Service;
    9. import javax.annotation.Resource;
    10. @Service("receiveService")
    11. public class ReceiveServiceImpl implements ReceiveService {
    12. //注入amqp的模板类,利用这个对象来发送和接收消息
    13. @Resource
    14. private AmqpTemplate amqpTemplate;
    15. @Override
    16. public void receiveMessage() {
    17. /**
    18. * 发送消息
    19. * 参数1为交换机名称
    20. * 参数2位RoutingKey
    21. * 参数3为具体发送的消息数据
    22. */
    23. String message= (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
    24. System.out.println(message);
    25. }
    26. /**
    27. * @RabbitListener:用于标记当前方法是一个rabbitmq的消息监听方法,作用是持续性的接收消息
    28. * 这个方法不需要手动调用,spring会自动监听
    29. * 属性queues:用于指定一个已经存在的队列名称,用于队列的监听
    30. * @param message 参数就是接收到的具体消息数据
    31. */
    32. @RabbitListener(queues = {"bootDirectQueue"})
    33. public void directReceive(String message) {
    34. System.out.println("监听器接收的消息:"+message);
    35. }
    36. @RabbitListener(bindings = {
    37. @QueueBinding(value = @Queue(),
    38. exchange = @Exchange(name="fanoutExchange",type = "fanout")
    39. )})
    40. public void fanoutReceive1(String message) {
    41. System.out.println("fanoutReceive1监听器接收的消息:"+message);
    42. }
    43. @RabbitListener(bindings = {
    44. @QueueBinding(value = @Queue(),
    45. exchange = @Exchange(name="fanoutExchange",type = "fanout")
    46. )})
    47. public void fanoutReceive2(String message) {
    48. System.out.println("fanoutReceive2监听器接收的消息:"+message);
    49. }
    50. @RabbitListener(bindings = {
    51. @QueueBinding(value = @Queue("topic1"),
    52. key = {"aa"},
    53. exchange = @Exchange(name="topicExchange",type = "topic")
    54. )})
    55. public void topicReceive1(String message){
    56. System.out.println("topic1消费者---aa---"+message);
    57. }
    58. @RabbitListener(bindings = {
    59. @QueueBinding(value = @Queue("topic2"),
    60. key = {"aa.*"},
    61. exchange = @Exchange(name="topicExchange",type = "topic")
    62. )})
    63. public void topicReceive2(String message){
    64. System.out.println("topic2消费者---aa---"+message);
    65. }
    66. @RabbitListener(bindings = {
    67. @QueueBinding(value = @Queue("topic3"),
    68. key = {"aa.#"},
    69. exchange = @Exchange(name="topicExchange",type = "topic")
    70. )})
    71. public void topicReceive3(String message){
    72. System.out.println("topic3消费者---aa---"+message);
    73. }
    74. }

     

     

     编写Topic模式消息发送

    1.SendService接口

    1. package com.it.rabbitmq;
    2. public interface SendService {
    3. void sendMessage(String message);
    4. void sendFanoutMessage(String message);
    5. void sendTopicMessage(String message);
    6. }

    SendServiceImpl类

    1. package com.it.rabbitmq.impl;
    2. import com.it.rabbitmq.SendService;
    3. import org.springframework.amqp.core.AmqpTemplate;
    4. import org.springframework.stereotype.Service;
    5. import javax.annotation.Resource;
    6. @Service("sendService")
    7. public class SendServiceImpl implements SendService {
    8. //注入amqp的模板类,利用这个对象来发送和接收消息
    9. @Resource
    10. private AmqpTemplate amqpTemplate;
    11. @Override
    12. public void sendMessage(String message) {
    13. /**
    14. * 发送消息
    15. * 参数1为交换机名称
    16. * 参数2位RoutingKey
    17. * 参数3为具体发送的消息数据
    18. */
    19. amqpTemplate.convertAndSend("bootDirectExchange", "bootDirectRouting", message);
    20. }
    21. @Override
    22. public void sendFanoutMessage(String message) {
    23. amqpTemplate.convertAndSend("fanoutExchange","",message);
    24. }
    25. @Override
    26. public void sendTopicMessage(String message) {
    27. amqpTemplate.convertAndSend("topicExchange","",message);
    28. }
    29. }

    2.RabbitMQConfig类,提前声明一个topic的交换机

    1. package com.it.rabbitmq.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class RabbitMQConfig {
    7. //配置一个direct类型的交换机
    8. @Bean
    9. public DirectExchange directExchange() {
    10. return new DirectExchange("bootDirectExchange");
    11. }
    12. //配置一个队列
    13. @Bean
    14. public Queue directQueue() {
    15. return new Queue("bootDirectQueue");
    16. }
    17. /**
    18. * 配置一个交换机和队列的绑定
    19. *
    20. * @param directQueue 需要绑定的队列对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
    21. * @param directQueue 需要绑定的交换机对象,参数名必须要与某个@Bean的方法名完全相同这样就会自动注入
    22. * @return
    23. */
    24. @Bean
    25. public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
    26. //完成绑定:参数1为需要绑定的队列,参数2为需要绑定的交换机,参数3为需要绑定的routingkey
    27. return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
    28. }
    29. //配置一个fanout类型的交换机
    30. @Bean
    31. public FanoutExchange fanoutExchange() {
    32. return new FanoutExchange("fanoutExchange");
    33. }
    34. //配置一个topic类型的交换机
    35. @Bean
    36. public TopicExchange topicExchange() {
    37. return new TopicExchange("topicExchange");
    38. }
    39. }

    3.运行主函数

    1. package com.it;
    2. import com.it.rabbitmq.SendService;
    3. import org.springframework.boot.SpringApplication;
    4. import org.springframework.boot.autoconfigure.SpringBootApplication;
    5. import org.springframework.context.ApplicationContext;
    6. @SpringBootApplication
    7. public class RabbitmqSpringbootApplication {
    8. public static void main(String[] args) {
    9. ApplicationContext ac = SpringApplication.run(RabbitmqSpringbootApplication.class, args);
    10. SendService sendService = (SendService) ac.getBean("sendService");
    11. // sendService.sendMessage("boot的测试数据");
    12. // sendService.sendFanoutMessage("boot的fanout测试数据!");
    13. sendService.sendTopicMessage("boot的topic测试数据,key:aa");
    14. }
    15. }

    功能测试:

    查看接收类

    修改发送信息的routingkey

     

     修改发送信息的routingkey

     

     

  • 相关阅读:
    优盘无法格式化?分享简单解决方法!
    初始网络知识
    WPF程序打包
    倍思、南卡、漫步者开放式耳机好不好用? 硬核测评年度最强产品
    el-menu动态加载路由,菜单的解决方案
    05 | 基础篇:某个应用的CPU使用率居然达到100%,我该怎么办?笔记
    小谈设计模式(27)—享元模式
    gitlab环境准备
    差分约束原理及其应用
    SpringCloud对服务内某个client进行单独配置
  • 原文地址:https://blog.csdn.net/weixin_59334478/article/details/127752022