• RabbitMQ_五种模式


    1.Simple("Hello World")

    构成:生产者、消费者、消息队列 

    配置类

    构造函数参数:name durable exclusive autoDelete

    仅创建队列,不创建交换机,也不进行队列和交换机的绑定

    注:配置类置于生产者端或消费者端无硬性要求,但置于消费者端更有利于测试代码,原因如下:

    若先启动生产者端发送消息,再启动消费者端接收消息,则最先连接到RabbitMQ服务器的消费者将接收所有的消息,所以通常而言需要先启动消费者端;而先启动消费者端时,若将配置类置于生产者端,则可能会出现消费者端中使用,但尚未被配置到RabbitMQ的配置(若配置类在生产者端的话),所以将配置类置于消费者端更有利于测试代码,另外,在项目启动时就进行配置显然是很好的选择

    1. @Configuration
    2. public class HelloWorldConfiguration {
    3. // 创建消息队列
    4. // 不需要交换机(使用默认交换机direct)
    5. @Bean
    6. public Queue helloWorldQueue(){
    7. return new Queue("hello_world_queue", true, false, false);
    8. }
    9. }

    生产者

    convertAndSend方法参数:exchange routingKey message

    默认交换机为direct交换机,将routingKey设置为队列名可将消息直接发送至该队列

    1. @Service
    2. public class SayHelloWorldService {
    3. @Autowired
    4. RabbitTemplate rabbitTemplate;
    5. public void say(){
    6. rabbitTemplate.convertAndSend("", "hello_world_queue", "hello, world");
    7. System.out.println("消息发送成功");
    8. }
    9. }

    消费者

    @RabbitListener 监听队列

    @RabbitHandler 消息处理

    1. @Service
    2. @RabbitListener(queues = "hello_world_queue")
    3. public class HelloWorldService {
    4. @RabbitHandler
    5. public void message(String message){
    6. System.out.println("消息: " + message);
    7. }
    8. }

    测试

    1. @Autowired
    2. SayHelloWorldService sayHelloWorldService;
    3. @Test
    4. void helloWorld(){
    5. sayHelloWorldService.say();
    6. }

     

    2.Work Queues(工作模式)

    构成:生产者、消费者、消息队列(较Simple模式而言,存在多个消费者)

    配置类

    1. @Configuration
    2. public class WorkQueuesConfiguration {
    3. // 创建消息队列
    4. // 不需要交换机(使用默认交换机direct)
    5. @Bean
    6. public Queue workQueue(){
    7. return new Queue("work_queue", true, false, false);
    8. }
    9. }

    生产者

    1. @Service
    2. public class WorkQueuesService {
    3. @Autowired
    4. RabbitTemplate rabbitTemplate;
    5. public void workQueues(String message){
    6. rabbitTemplate.convertAndSend("", "work_queue", message);
    7. System.out.println("消息发送完成");
    8. }
    9. }

    消费者 

    1. @Service
    2. @RabbitListener(queues = "work_queue")
    3. public class Worker01Service {
    4. @RabbitHandler
    5. public void worker01(String message){
    6. System.out.println("Worker01收到消息: " + message);
    7. }
    8. }
    1. @Service
    2. @RabbitListener(queues = "work_queue")
    3. public class Worker02Service {
    4. @RabbitHandler
    5. public void worker01(String message){
    6. System.out.println("Worker02收到消息: " + message);
    7. }
    8. }
    1. @Service
    2. @RabbitListener(queues = "work_queue")
    3. public class Worker03Service {
    4. @RabbitHandler
    5. public void worker03(String message){
    6. System.out.println("Worker03收到消息: " + message);
    7. }
    8. }

    测试 

    可以看到消息被平分给了三个消费者

     

    3.Publish/Subscribe(发布/订阅模式)

    构成:生产者、消费者、消息队列、fanout交换机

    相较于上述模式,区别在于具有自定义的交换机,类型为fanout

    配置类

    消息队列与交换机进行绑定,当有生产者向交换机投递消息时,交换机将会把消息转发至所有与其绑定的消息队列

    1. @Configuration
    2. public class FanoutConfiguration {
    3. //订单交换机
    4. @Bean
    5. public FanoutExchange fanoutExchange(){
    6. return new FanoutExchange("fanout_exchange_order", true, false);
    7. }
    8. //消息通知队列(短信与邮件)
    9. @Bean
    10. public Queue fanoutSMSQueue(){
    11. return new Queue("sms_queue", true, false, false);
    12. }
    13. @Bean
    14. public Queue fanoutEmailQueue(){
    15. return new Queue("email_queue", true, false, false);
    16. }
    17. //绑定交换机与队列
    18. @Bean
    19. public Binding fanoutSMSBinding(){
    20. return BindingBuilder.bind(fanoutSMSQueue()).to(fanoutExchange());
    21. }
    22. @Bean
    23. public Binding fanoutEmailBinding(){
    24. return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutExchange());
    25. }
    26. }

    生产者

    此处routingKey为空(fanout交换机无需routingKey)

    1. @Service
    2. public class FanoutOrderService {
    3. @Autowired
    4. RabbitTemplate rabbitTemplate;
    5. public void makeOrder(String userID, String producerID, int num){
    6. // 1.根据需求查询仓库 判断是否能满足需求
    7. // 2.若能满足则生成订单
    8. String orderID = UUID.randomUUID().toString();
    9. System.out.println("成功生成订单");
    10. // 3.通过RabbitMQ发送消息
    11. String exchangeName = "fanout_exchange_order";
    12. String routingKey = "";
    13. rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID);
    14. System.out.println("订单发送成功");
    15. }
    16. }

    消费者

    1. @Service
    2. // @RabbitListener 监听消息队列
    3. @RabbitListener(queues = "email_queue")
    4. public class EmailMessageService {
    5. // @RabbitHandler 消息处理(接收消息)
    6. @RabbitHandler
    7. public void receiveEmailMessage(String message){
    8. System.out.println("接收到Email消息: " + message);
    9. }
    10. }
    1. @Service
    2. // @RabbitListener 监听消息队列
    3. @RabbitListener(queues = "sms_queue")
    4. public class SMSMessageService {
    5. // @RabbitHandler 消息处理(接收消息)
    6. @RabbitHandler
    7. public void receiveSMSMessage(String message){
    8. System.out.println("接收到SMS消息: " + message);
    9. }
    10. }

    测试 

    1. @Autowired
    2. FanoutOrderService fanoutOrderService;
    3. @Test
    4. void fanoutOrder() {
    5. fanoutOrderService.makeOrder("1", "1", 1);
    6. }

    4.Routing(路由模式)

    构成:生产者、消费者、消息队列、direct交换机

    相较于Publish/Subscribe模式,区别在于交换机类型,direct交换机支持以routingKey标识并分类消息队列

    配置类

    绑定消息队列与交换机时,还需要绑定routingKey,交换机将根据routingKey标识并分类消息队列;发送消息时,需要指明routingKey,交换机会根据routingKey将消息转发至对应的消息队列处

    1. @Configuration
    2. public class DirectConfiguration {
    3. //订单交换机
    4. @Bean
    5. public DirectExchange directExchange(){
    6. return new DirectExchange("direct_exchange_order", true, false);
    7. }
    8. //消息通知队列(短信与邮件)
    9. @Bean
    10. public Queue directSMSQueue(){
    11. return new Queue("sms_queue", true, false, false);
    12. }
    13. @Bean
    14. public Queue directEmailQueue(){
    15. return new Queue("email_queue", true, false, false);
    16. }
    17. //绑定交换机与队列
    18. @Bean
    19. public Binding directSMSBinding(){
    20. return BindingBuilder.bind(directSMSQueue()).to(directExchange()).with("sms");
    21. }
    22. @Bean
    23. public Binding directEmailBinding(){
    24. return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
    25. }
    26. }

    生产者

    此处需要指明routingKey

    1. @Service
    2. public class DirectOrderService {
    3. @Autowired
    4. RabbitTemplate rabbitTemplate;
    5. public void makeOrder(String userID, String producerID, int num){
    6. // 1.根据需求查询仓库 判断是否能满足需求
    7. // 2.若能满足则生成订单
    8. String orderID = UUID.randomUUID().toString();
    9. System.out.println("成功生成订单");
    10. // 3.通过RabbitMQ发送消息
    11. String exchangeName = "direct_exchange_order";
    12. String routingKey01 = "sms";
    13. String routingKey02 = "email";
    14. rabbitTemplate.convertAndSend(exchangeName, routingKey01, orderID + " sms");
    15. rabbitTemplate.convertAndSend(exchangeName, routingKey02, orderID + " email");
    16. System.out.println("订单发送成功");
    17. }
    18. }

     消费者

    1. @Service
    2. // @RabbitListener 监听消息队列
    3. @RabbitListener(queues = "email_queue")
    4. public class EmailMessageService {
    5. // @RabbitHandler 消息处理(接收消息)
    6. @RabbitHandler
    7. public void receiveEmailMessage(String message){
    8. System.out.println("接收到Email消息: " + message);
    9. }
    10. }
    1. @Service
    2. // @RabbitListener 监听消息队列
    3. @RabbitListener(queues = "sms_queue")
    4. public class SMSMessageService {
    5. // @RabbitHandler 消息处理(接收消息)
    6. @RabbitHandler
    7. public void receiveSMSMessage(String message){
    8. System.out.println("接收到SMS消息: " + message);
    9. }
    10. }

    测试

    1. @Autowired
    2. DirectOrderService directOrderService;
    3. @Test
    4. void directOrder() {
    5. directOrderService.makeOrder("1", "1", 1);
    6. }

    5.Topic(主题模式)

    构成:生产者、消费者、消息队列、topic交换机

    相较于Routing模式,区别在于交换机类型,topic交换机可模糊匹配routingKey

    关于模糊匹配规则

    * 表示仅一级 #表示零级或多级(以.分级 级的内容可以为空)
    举例:队列1 2的routing key分别为 *.number.* 与 #.number.#
       x.number.y 将匹配队列1 2
       .number. 将匹配队列2
       x.number. 将匹配队列2
       .number.y 将匹配队列2
    

    配置类

    此处routingKey以模糊匹配规则定义

    1. @Configuration
    2. public class TopicConfiguration {
    3. //订单交换机
    4. @Bean
    5. public TopicExchange topicExchange(){
    6. return new TopicExchange("topic_exchange_order", true, false);
    7. }
    8. //消息通知队列(短信与邮件)
    9. @Bean
    10. public Queue topicSMSQueue(){
    11. return new Queue("sms_queue", true, false, false);
    12. }
    13. @Bean
    14. public Queue topicEmailQueue(){
    15. return new Queue("email_queue", true, false, false);
    16. }
    17. //绑定交换机与队列
    18. @Bean
    19. public Binding topicSMSBinding(){
    20. return BindingBuilder.bind(topicSMSQueue()).to(topicExchange()).with("*.sms.*");
    21. }
    22. @Bean
    23. public Binding topicEmailBinding(){
    24. return BindingBuilder.bind(topicEmailQueue()).to(topicExchange()).with("#.email.#");
    25. }
    26. }

    生产者

    1. @Service
    2. public class TopicOrderService {
    3. @Autowired
    4. RabbitTemplate rabbitTemplate;
    5. public void makeOrder(String userID, String producerID, int num){
    6. // 1.根据需求查询仓库 判断是否能满足需求
    7. // 2.若能满足则生成订单
    8. String orderID = UUID.randomUUID().toString();
    9. System.out.println("成功生成订单");
    10. // 3.通过RabbitMQ发送消息
    11. String exchangeName = "topic_exchange_order";
    12. String routingKey01 = "xxx.sms.yyy";
    13. String routingKey02 = ".email.";
    14. rabbitTemplate.convertAndSend(exchangeName, routingKey01, orderID + " sms");
    15. rabbitTemplate.convertAndSend(exchangeName, routingKey02, orderID + " email");
    16. System.out.println("订单发送成功");
    17. }
    18. }

     消费者

    1. @Service
    2. // @RabbitListener 监听消息队列
    3. @RabbitListener(queues = "email_queue")
    4. public class EmailMessageService {
    5. // @RabbitHandler 消息处理(接收消息)
    6. @RabbitHandler
    7. public void receiveEmailMessage(String message){
    8. System.out.println("接收到Email消息: " + message);
    9. }
    10. }
    1. @Service
    2. // @RabbitListener 监听消息队列
    3. @RabbitListener(queues = "sms_queue")
    4. public class SMSMessageService {
    5. // @RabbitHandler 消息处理(接收消息)
    6. @RabbitHandler
    7. public void receiveSMSMessage(String message){
    8. System.out.println("接收到SMS消息: " + message);
    9. }
    10. }

    测试

    可以看到,模糊匹配成功

  • 相关阅读:
    Java中同时POST文件和提交JSON数据的方法
    手把手写深度学习(0):专栏文章导航
    有什么自定义表单工具功能较好?
    实时天气API
    如何使用 Jmeter 进行抢购、秒杀等场景下,进行高并发?
    Resultf风格接口
    MySQL关于between and 和 大于等于>= 小于等于<= 区别
    【算法】一类支持向量机OC-SVM(2)
    在服务器上解压.7z文件
    手机流量卡经营商城小程序的作用是什么
  • 原文地址:https://blog.csdn.net/Mudrock__/article/details/128058457