• RocketMQ


    RocketMQ是一个开源的分布式消息队列系统,主要用于解决高并发、高可靠性的消息通信场景。在Java后端中,RocketMQ的作用主要有以下几个方面:

    1.异步解耦:RocketMQ可以将消息的发送和接收进行异步解耦,提供了可靠的消息传递机制。在高并发场景下,通过使用RocketMQ,可以将消息发送方和接收方解耦,提高系统的稳定性和可伸缩性。

    2.流量削峰:在高并发的情况下,如果直接将请求发送到后端服务,可能会导致服务过载。而使用RocketMQ可以将请求转换为消息,并通过消息队列平滑地削峰填谷,保证后端服务的稳定性和响应速度。

    3.异步通信:RocketMQ支持异步消息发送和接收,可以有效提高系统的吞吐量和性能。通过异步发送消息,发送方可以立即返回,而不需要等待消息被处理完毕,从而提升系统的响应速度。

    4.顺序消息处理:RocketMQ提供了顺序消息的支持,可以确保消息按照发送的顺序被消费,适用于一些对消息顺序要求较高的场景,如订单处理、流程审批等。

    5.消息广播:RocketMQ支持消息的广播模式,可以将消息发送给所有订阅者,适用于一些需要广播通知的场景。

    总之,RocketMQ在Java后端中的作用是提供高性能、可靠的消息传递机制,解决分布式系统中的异步通信、流量削峰、顺序消息处理等问题,提升系统的可伸缩性和稳定性。
    资料源自:陈天狼1234

    在这里插入图片描述

    rockemq的运行流程:
    1.生产者和消费者在启动时会向Name Server注册自己的信息,包括IP地址、端口号等基本信息,以便于Name Server能够为它们提供服务路由信息,此外broker节点通过心跳机制将自己的信息定时上报到Name Server中
    2.当生产者要发送消息时,首先需要通过Name Server获取指定Topic目前可用的Broker(可以是多个),然后根据负载均衡算法选择其中一个Broker发送消息。消息会被刷盘机制持久化存储
    3.消息在经过同步刷盘或异步刷盘机制持久化存储之后,会被放入该Topic对应的队列中。
    4.消费者在订阅指定的Topic和Tag之后,会从Broker服务器中拉取消息进行消费。消费者首先会向Name Server获取指定Topic目前可用的Broker,然后根据自身消费能力获取消息。

    同步刷盘:消息从内存 立马同步存储到磁盘 然后发送 ack 给消息生产者,安全可靠
    异步刷盘:就是开一条线程去异步存储到磁盘,然后不管有没有存储成功返回ack,有消息丢失的风险,性能高,适合存储日志消息。

    1.生产者发送消息的三种方式

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import java.nio.charset.StandardCharsets;
    
    
    public class producer {
        public static void main(String[] args) throws Exception {
            //创造生产者
            DefaultMQProducer producer = new DefaultMQProducer("helloRocketMQ");
            //关联NameServer地址
            producer.setNamesrvAddr("192.168.200.130:9876");
            //producer和NameServer建立连接,获取broker地址和端口,并和broker建立连接
            producer.start();
            String topic="helloTopic";
    
            //封装消息
            Message message = new Message(topic,("消息").getBytes(StandardCharsets.UTF_8));
            //发送
            /**
             * 1.同步消息:执行send后需要等待,消息发送到中间件,持久化(内存和磁盘都保存)之后返回结果
             * 优点:消息可靠性相对较高
             * 缺点:发送消息需要等待,响应比较慢
             * 场景:简单的使用,追求可靠性,性能不需要太高
             * SendResult sendResult = producer.send(message);
             * System.out.println("发送结果:"+sendResult.getSendStatus());
             *
             */
    
            /**
             * 2.异步发送:主线程main执行send后马上往下执行,发送结果或异常会使用netty的线程接受
             * 优点:发送消息无需要等待,响应快
             * 缺点:可靠性不高。但可以在发送消息后将消息入库+定时任务扫描,增加消息的可靠性
             * 场景:高并发
             *         producer.send(message, new SendCallback() {
             *             @Override
             *             public void onSuccess(SendResult sendResult) {
             *                 System.out.println("发送结果:"+sendResult.getSendStatus()+",处理的线程"+Thread.currentThread());
             *             }
             *
             *             @Override
             *             public void onException(Throwable throwable) {
             *                 System.out.println("消息发送发生异常"+",处理的线程"+Thread.currentThread());
             *             }
             *         });
             *         //避免发送完成后连接shutdown
             *         TimeUnit.SECONDS.sleep(5);
             */
    
            /**
             * 3.一次性发送:只发生一次,只管发送,不关心消息是否存储到消息中间件
             * 优点:性能高
             * 缺点:有消息丢失的风险
             * 场景:日志
             * producer.sendOneway(message);
             */
    
    
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    2.刷盘机制(中间件===>磁盘)

    在rocketmq/conf/broker.conf文件中有刷盘方式的配置

    在这里插入图片描述

    3.消息的消费方式(点对点,广播)

    在这里插入图片描述

    RocketMQ是一个开源的消息中间件,它支持多种消费模式,以满足不同场景下的需求。以下是 RocketMQ 支持的几种消费模式:

    1.集群消费模式(Clustering Mode):多个消费者以集群的方式同时消费同一个主题下的消息。消息会均匀地分发给每个消费者进行处理,每条消息只会被消费者组中的一个消费者消费,从而提高消息的处理吞吐量和容错能力。

    2.广播消费模式(Broadcasting Mode):每个消费者都会独立地接收并消费同一主题下的所有消息,实现对消息的广播消费。适用于需要所有消费者都能独立处理消息的场景,比如日志采集等。

    3.顺序消费模式(Orderly Consumption):保证相同消息队列中的消息按照严格的顺序被消费。这种方式适用于对消息处理顺序有严格要求的场景,比如订单处理等。

    4.并行消费模式(Concurrent Consumption):在应用程序中创建多个线程来同时消费消息,提升消息的处理效率。适用于需要快速处理大量消息的场景,比如数据分析等。

    4.延迟队列和延迟消息

    延迟队列:消息已经发送到了队列中对消费者不可见
    延迟消息:延迟一定时间才发送消息到队列中
    本质都是一个定时任务(官网)

    在这里插入图片描述

    5.消息过滤

    根据tags标签可以过滤消息

    发送

                    Message msg1 = new Message("filterTest", "Tag", new String("过滤消息").getBytes(Charset.forName("utf-8")));
                    msg1.putUserProperty("i",String.valueOf(i));
    
    • 1
    • 2

    5.1.标签过滤

            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe("filterTest", "Tag || ....");
    
    • 1
    • 2

    5.1.sql92过滤

            //根据message的属性
            consumer.subscribe("filterTest", MessageSelector.bySql("i>3 and i<8"));
    
    • 1
    • 2

    6.顺序消费

    package com.study.test2;
    
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class producer {
        public static void main(String[] args) throws Exception {
            /**
             * rocketmq默认不是顺序消费消息的,如果需要实现顺序消费,发送消息到同一个队列即可
             */
            //构造订单的各个阶段
            ArrayList<order> orders = new ArrayList<>();
            orders.add(new order(19999L,"付款"));
            orders.add(new order(19999L,"真减库存"));
            orders.add(new order(19999L,"配送"));
            orders.add(new order(19999L,"完成"));
    
            orders.add(new order(19998L,"付款"));
            orders.add(new order(19998L,"真减库存"));
            orders.add(new order(19998L,"配送"));
            orders.add(new order(19998L,"完成"));
            //创造生产者
            DefaultMQProducer producer = new DefaultMQProducer("helloRocketMQ");
            //关联NameServer地址
            producer.setNamesrvAddr("192.168.200.130:9876");
            //producer和NameServer建立连接,获取broker地址和端口,并和broker建立连接
            producer.start();
            String topic="orderlyTopic";
            //消息队列选择器
            MessageQueueSelector selector=new MessageQueueSelector() {
                /**
                 *
                 * @param list mq的队列,默认是4个
                 * @param message 此次的消息
                 * @param o 传递参数
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    System.out.println("队列个数:"+list.size());
                    Long orderId= (Long) o;
                    int index=(int)(orderId%list.size());
                    return list.get(index);
                }
            };
            for (order order : orders) {
                //封装消息
                Message message = new Message(topic,order.toString().getBytes(StandardCharsets.UTF_8));
                //发送  message:消息,selector:队列选择器,order.getOrderId():队列选择器方法的参数
                producer.sendOneway(message,selector,order.getOrderId());
            }
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    package com.study.test2;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class consumer {
        public static void main(String[] args) throws MQClientException {
            //顺序消费,单线程监听器即可
            //1.定义消费者组
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
            //2.设置nameserver地址
            consumer.setNamesrvAddr("192.168.200.130:9876");
            String topic="orderlyTopic";
            //3.订阅topic
            consumer.subscribe(topic,"*");
            //4.设置消息监听器  MessageListenerOrderly 单线程监听器
            consumer.setMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    for (MessageExt messageExt : list) {
                        //处理业务
                        System.out.println("当前线程:"+Thread.currentThread()+",消息内容:"+new String(messageExt.getBody())+",存储的队列:"+consumeOrderlyContext.getMessageQueue());
                    }
                    return ConsumeOrderlyStatus.SUCCESS;//  ack成功
    
            }
        });
            //5.建立连接
            consumer.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    6.发送失败的重试
    消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
    发送方式是同步或异步时,抛出异常,才会触发重试机制,重试机制会尽可能保证消息会不丢失。但是可能会造成消息的重复消费
    rocketMQTemplate的ymal配置可以配置重试次数

    7.消费失败的重试 死信队列
    死信队列:当消息无法正常消费时,消息会触发重试机制,消息会被发送到重试队列。如果一直消费失败,重试了16次之后,消息就会发送到死信队列,这个消息也成为死信消息
    8.消息的重复消费问题
    消息的重复消费问题RocketMQ中很难避免,因为消息在发送和消费的过程中都有的重试机制,极容易造成消息会被多次执行。那么应该如何处理此次问题呢?
    常用的解决方案有两种
    1.创建一个消息的log表,消息要有一个唯一的主键,每次消费消息前都往该表插入数据。插入成功表示该消息没有被消费,反则被消费了。注意该方法需要被事务处理。
    2.对于一些操作有状态值的字段,是天然幂等的,只需要执行前判断状态即可。如,订单状态的修改待支付===》已支付,执行前判断状态是否为已支付即可。

  • 相关阅读:
    js的indexOf方法
    使用git上传代码至gitee入门(1)
    使用Wireshark软件抓包(分析报文)
    【前沿】数据目录是什么?您为何需要它?
    数字孪生园区场景中的坐标知识
    Aspose.OCR for Java Crack by Xacker
    AWS上迁移WordPress遭遇若干问题记处理办法
    数据库的基本操作(期末复习大全)
    如何能在项目具体编码实现之前能尽可能早的发现问题并解决问题
    go语言包管理和变量保护
  • 原文地址:https://blog.csdn.net/qq_56533553/article/details/132359564