• 04-RabbitMQ之编程模型


    一、编程模型
    RabbitMQ包含7中模型,官网地址:https://www.rabbitmq.com/getstarted.html
    1、Hello World

    P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费
    2、Work Queues

    类似于kafka同一groupid的消息分发模式,Producer消息发送给queue,服务器根据负载方案决定把消息发给一个指定的Consumer处理。这个模式是最常用的模式
    3、Publish/Subscribe

    type为fanout的exchange。这个机制是对上面的一种补充,把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配
    4、Routing

    type为direct的exchange。在exchange往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上
    5、Topics

    type为topic的exchange。这个模式也在上一个模式的基础上,对routingKey进行了模糊匹配单词之间用,隔开,*代表一个具体的单词。#代表0个或多个单词
    6、RPC

    远程调用是同步阻塞的调用远程服务并获取结果。RPC远程调用机制其实并不是消息中间件的处理强项,并且RPC远程调用的场景,也有太多可替代的技术会比用消息中间件处理得好,因此不推荐这种模型

    7、Publisher Confirms
    RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的。一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。类似于RocketMQ的事务。
    在官网的示例中,重点解释了三种策略
    (1)发布一条消息就确认一条消息

    //发送消息
    channel.basicPublish("", queue, null, body.getBytes());
    //等待
    channel.waitForConfirmsOrDie(5_000);
    
    • 1
    • 2
    • 3
    • 4

    waitForConfirmsOrDie方法会在channel端等待RabbitMQ给出一个响应用来表明这个消息已经正确发送到了RabbitMQ服务端。这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环
    (2)发送批量消息
    之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后再一起确认.

            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", queue, null, body.getBytes());
                if (i == 5) {
                    channel.waitForConfirmsOrDie(5_000);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这种方式稍微可以缓解一下发送者确认模式对吞吐量的影响,但是当异常出现时,不能定位到具体哪一条消息有问题,所以接下来就需要增加一个机制能够具体对每一条发送出错的消息进行处理
    (3)异步确认消息
    实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。

    channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
    
    • 1

    发送者在发送完消息后,会执行第一个监听器callback1,然后等服务端发过来的反馈后,再执行第二个监听器callback2。
    ConfirmCallback是个监听器接口里面只有一个方法void handle(long sequenceNumber, boolean multiple),其中包含两个参数:

    • sequenceNumer:这是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,它的消息体只是一个二进制数组,并不像RocketMQ一样有一个封装的对象,所以默认消息是没有序列号的。而RabbitMQ提供了一个方法channel.getNextPublishSeqNo()生成一个全局递增的序列号。但是需要应用程序自己来将这个序列号与消息对应起来,比较坑
    • multiple:这个是一个Boolean型的参数。如果是true,就表示这一次只确认了当前一条消息。如果是false,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了
  • 相关阅读:
    ZooKeeper 5:集群模式
    基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的零售柜商品检测软件(Python+PySide6界面+训练代码)
    【图解大数据技术】流式计算:Spark Streaming、Flink
    pr视频剪辑素材,免费下载
    markdown语法简述
    WPF--在后台执行Command指令
    Java笔记(五)
    C++ day4
    3.快速上手
    软件测试/测试开发丨结对编程助手 GitHubCopilot
  • 原文地址:https://blog.csdn.net/qq_39234967/article/details/126198434