• 【RocketMQ】RocketMQ快速入门


    RocketMQ的介绍

    RocketMQ版本发展

    Metaq1.x是RocketMQ前身的第一个版本,本质上把Kafka做了一次java版本的重写(Kafka是scala语言开发)。

    Meta2.x,主要是对存储部分进行了优化,因为kafka的数据存储,它的partition是一个全量的复制,在阿里、在淘宝的这种海量交易。Kafka这种机制的横向拓展是非常不好的。

    2012年阿里同时把Meta2.0从阿里内部开源出来,取名RocketMQ,同时为了命名上的规范(版本上延续),所以这个就是RocketMQ3.0。

    现在RocketMQ主要维护的是4.x的版本,也是大家使用得最多的版本,2017年从Apache顶级项目毕业。

    阿里内部项目的使用

    那么在阿里公司内部,原则上遵守开源共建原则。RocketMQ项目只维护核心功能,且去除了所有其他运行时依赖,核心功能最简化。每个BU(Business Unit业务单元)的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是Jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这jar包即可,可通过API进行交互, 如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。

    在RocketMQ项目基础上几个常用的项目如下:

    • com.taobao.metaq v3.0 = RocketMQ + 淘宝个性化需求:为淘宝应用提供消息服务
    • com.alipay.zpullmsg v1.0 = RocketMQ + 支付宝个性化需求:为支付宝应用提供消息服务
    • com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B个性化需求:为B2B应用提供消息服务

    展望未来

    从阿里负责RocketMQ的架构核心人员的信息来看,阿里内部一直全力拓展RocketMQ。

    2017年10月份,OpenMessaging项目由阿里巴巴发起,与雅虎、滴滴出行、Streamlio公司共同参与创立, 项目意在创立厂商无关、平台无关的分布式消息及流处理领域的应用开发标准。同时OpenMessaging入驻Linux基金会。

    OpenMessaging项目已经开始在Apache RocketMQ中率先落地,并推广至整个阿里云平台.

    另外RocketMQ5的版本也在内部推进,主要的方向是Cloud Native(云原生)。

    另外还要讲一下Apache RocketMQ的商业版本,Aliware MQ在微服务、流计算、IoT、异步解耦、数据同步等场景有非常广泛的运用。

    1656577575183.png

    RocketMQ的物理架构

    消息队列RocketMQ是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双11使用的核心产品。

    RocketMQ的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。

    1656577658719.png

    NameServer

    NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Broker。

    Broker在启动时向所有NameServer注册(主要是服务器地址等),生产者在发送消息之前先从NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

    NameServer与每台Broker服务保持长连接,并间隔30S检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。

    生产者(Producer)

    生产者:也称为消息发布者,负责生产并发送消息至RocketMQ。

    消费者(Consumer)

    消费者:也称为消息订阅者,负责从RocketMQ接收并消费消息。

    消息(Message)

    消息:生产或消费的数据,对于RocketMQ来说,消息就是字节数组。

    主机(Broker)

    RocketMQ的核心,用于暂存和传输消息。

    物理架构中的整体运转

    1. NameServer先启动
    2. Broker启动时向NameServer注册
    3. 生产者在发送某个主题的消息之前先从NamerServer获取Broker服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker进行消息发送。
    4. NameServer与每台Broker服务器保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除。
    5. 消费者在订阅某个主题的消息之前从NamerServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。

    RocketMQ的概念模型

    分组(Group)

    生产者:标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息,事务消息中如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该commit还是rollback。

    消费者:标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。

    消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。

    主题(Topic)

    标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。

    区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息。

    标签(Tag)

    RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic的时候,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。

    消息队列(Message Queue)

    简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若一个Topic创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。

    无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。

    每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。

    偏移量(Offset)

    RocketMQ中,有很多offset的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指Message Queue下面的offset。

    Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset。

    Consumer offset可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。

    普通消息的发送与消费

    消息发送

    发送同步消息

    同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

    1656578837414.png

    这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

    package com.morris.rocketmq.simple;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    import static com.morris.rocketmq.util.Contant.PRODUCER_GROUP;
    
    /**
     * 同步发送消息
     */
    public class SynProducer {
    
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new
                    DefaultMQProducer(PRODUCER_GROUP);
            // Specify name server addresses.
            producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
            producer.setSendMsgTimeout(6000);
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    运行结果如下:

    SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
    SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
    SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
    SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
    ... ...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。

    SendStatus:发送的标识。成功,失败等

    Queue:相当于是Topic的分区;用于并行发送和接收消息

    发送异步消息

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    1656579044564.png

    消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

    package com.morris.rocketmq.simple;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    import static com.morris.rocketmq.util.Contant.PRODUCER_GROUP;
    
    /**
     * 异步发送消息
     */
    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
            // Specify name server addresses.
            producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
            //Launch the instance.
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
            
            int messageCount = 100;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            countDownLatch.await(5, TimeUnit.SECONDS);
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    单向发送消息

    这种方式主要用在不特别关心发送结果的场景,例如日志发送。

    1656579146987.png

    单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

    package com.morris.rocketmq.simple;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    import static com.morris.rocketmq.util.Contant.PRODUCER_GROUP;
    
    /**
     * 单向发送消息
     */
    public class OnewayProducer {
        public static void main(String[] args) throws Exception{
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
            // Specify name server addresses.
            producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                producer.sendOneway(msg);
            }
            //Wait for sending to complete
            Thread.sleep(5000);        
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    消息发送时的权衡

    发送方式发送TPS发送结果反馈可靠性使用场景
    同步发送可靠邮件、短信、推送
    异步发送可靠视频转码
    单向发送最快可能丢失日志收集

    消息消费

    集群消费

    1656579661627.png

    消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

    实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

    而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

    这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。

    package com.morris.rocketmq.simple;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    
    /**
     * 集群消费消息(默认)
     */
    public class ClusterConsumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
             
            // Specify name server addresses.
            consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
            consumer.setMessageModel(MessageModel.CLUSTERING); // default
            
            // Subscribe one more more topics to consume.
            consumer.subscribe("TopicTest", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
    
            //Launch the consumer instance.
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    广播消费

    1656579689332.png

    消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

    实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

    这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。

    package com.morris.rocketmq.simple;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
    import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
    
    /**
     * 广播消费消息(默认)
     */
    public class BroadcastingConsumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
             
            // Specify name server addresses.
            consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
            consumer.setMessageModel(MessageModel.BROADCASTING); // default
            
            // Subscribe one more more topics to consume.
            consumer.subscribe("TopicTest", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
    
            //Launch the consumer instance.
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    这种模式下消费者只能收到启动后发送MQ中的消息。

    消息消费时的权衡

    集群模式:

    • 消费端集群化部署,每条消息只需要被处理一次。
    • 由于消费进度在服务端维护,可靠性更高。
    • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
    • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

    广播模式:

    • 广播消费模式下不支持顺序消息。
    • 广播消费模式下不支持重置消费位点。
    • 每条消息都需要被相同逻辑的多台机器处理。
    • 消费进度在客户端维护,出现重复的概率稍大于集群模式。

    广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

    广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

    广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

    目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

  • 相关阅读:
    C++程序dll崩溃如何定位分析?
    素数判断[牛客]
    给定n个结点m条边的简单无向图,判断该图是否存在鱼形状的子图:有一个环,其中有一个结点有另外两条边,连向不在环内的两个结点。若有,输出子图的连边
    rocketmq-spring-boot-starter 2.1.0 事务消息移除参数txProducerGroup
    学C/C++想提升功底 可以花点时间看看这篇博客---C语言程序环境和预处理
    计算机网络 - 数据链路层 选择填空复习题
    C++入门,详解类和对象(1)
    人工智能相关书籍介绍
    金仓数据库 KingbaseGIS 使用手册(8.9. 栅格波段统计和分析函数、8.10. 栅格输出)
    【编程题】【Scratch四级】2022.06 判断闰年
  • 原文地址:https://blog.csdn.net/u022812849/article/details/125600649