• RocketMQ学习笔记(一)


    RocketMQ学习笔记

    消息中间件应用场景

    • 应用解耦
    • 削峰填谷
    • 数据分发

    常见的消息中间件

    • ActiveMQ:Apache出品,比较老的一个开源的消息中间件,以前在中小企业应用广泛
    • Kafka:Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。
    • RabbitMQ:基于erlang语言开发的消息中间件,RabbitMQ最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。适用于对数据的一致性、稳定性和可靠性要求比较高的场景
    • RocketMQ:高性能、低延时和高可靠性等特性

    消息中间件对比

    KafkaRocketMQRabbitMQ
    定位日志消息、监控数据非日志的可靠消息传输非日志的可靠消息传输
    可用性非常高、分布式、主从非常高、分布式、主从高、主从、采用镜像模式实现,数据量大时可能有性能问题
    消息可靠性异步刷盘,容易丢数据同步刷盘、异步刷盘同步刷盘
    单机吞吐量百万级十万级万级
    堆积能力非常好非常好一般
    顺序消费支持,一台broker宕机后,消息会乱序支持,顺序消费场景下,消费失败时消费队列将会暂停支持,如果一个消费失败,此消息的顺序会被打乱
    定时消息不支持支持支持
    事务消息不支持支持不支持
    消息重试不支持支持支持
    死信队列不支持支持支持
    访问权限类似数据库,配置用户名和密码

    核心概念

    • 消息生产者Producer:往RocketMQ发送消息的应用程序
    • 消息消费者Consumer:从RocketMQ拉取消息,并根据消息执行业务的应用程序
    • 代理服务器Broker:实际和消费生产者、消息消费者进行交互的程序,主要进行消息的存储、消息的推送,一般从性能考虑,会对消息进行集群
    • 命名服务NameServer:代理服务器Broker在启动的时候注册信息到NameServer中,消息生产者和消息消费者启动的时候从NameServer拉取Broker的IP和端口,消息生产者应用程序和消息消费者的应用程序会和Broker建立长链接(基于Netty),发送消息和消费消息都是基于长链接的通道
    • 主题Topic:划分不同类型的消息,比如订单消息发送到OrderTopic,会员消息发送到MemberTopic
    • 消息队列MessageQueue:在Topic内部,专门进行消息的存储的地方,最底层存储的数据结构(默认一个Topic中有4个MessageQueue)
    • 消息Message:在消息生产者给RocketMQ发送消息的时候,需要将传递的参数封装到Message对象中,通过网络传输到RocketMQ,消息会存储在MessageQueue中
    • 标签Tag:可以在发送的时候给消息添加标签Tag,消费者可以通过标签Tag进行过滤

    简单使用

    引入jar包

    
        org.apache.rocketmq
        rocketmq-client
        4.9.5
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    生产者

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class SimpleProducer {
        public static void main(String[] args) throws Exception{
            //定义消费的生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("helloProducerGroup");
            //定义nameServer地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            //自动连接(从nameServer拉取broker地址,并且建立连接)
            producer.start();
            //定义消息发送的目的地Topic
            String topic = "helloTopic";
            for(int i=0;i<10;i++){
                //定义消息
                Message message = new Message(topic,("helloTopic的消息=="+i).getBytes());
                //发送消息
                SendResult result = producer.send(message);
                //输出消息储存的结果
                System.out.println("消息存储的状态:"+result.getSendStatus());
                System.out.println("消息存储的消息ID:"+result.getMsgId());
            }
            //关闭连接
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class SimpleConsumer {
        public static void main(String[] args) throws Exception {
            //定义消息的消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
            //定义nameServer地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
            //定义消费的主题
            String topic = "helloTopic";
            //监听该主题消息
            consumer.subscribe(topic,"*");
            //设置消息监听器,服务器把消息推送给我们,消费消息
            consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
                for(MessageExt messageExt:list){
                    System.out.println("处理的线程:"+Thread.currentThread()+",消息内容:"+new String(messageExt.getBody()));
                }
                //告诉消息中间件,消息处理的情况
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
  • 相关阅读:
    【LeetCode-中等题】117. 填充每个节点的下一个右侧节点指针 II
    数据库系统概论习题3
    简记 Vue3(一)—— setup、ref、reactive、toRefs、toRef
    二维码智慧门牌管理系统:构建未来社区管理新典范
    C++ LibCurl实现Web指纹识别
    学生Dreamweaver静态网页设计 基于HTML+CSS+JavaScript制作简食餐厅美食网站制作
    [C#基础训练]FoodRobot食品管理部分代码-2
    低代码发展趋势解读|低代码成为企业数字化转型“加速器”
    java 视频断点播放,实现无卡顿
    图像处理软件Photoshop 2023 mac新增功能 ps 2023中文版
  • 原文地址:https://blog.csdn.net/flash_love/article/details/134059184