• RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息


    RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

    发送同步消息

    同步消息

    生产者发送消息,mq进行确认,然后返回给生产者状态。这就是同步消息。

    前文demo程序就是发送的同步消息。

    发送异步消息

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

    代码如下:

    /**
         * 异步消息测试
         */
    @Test
    public void simpleAsyncProducer() throws Exception {
        //创建一个生产者,并指定一个组名
        DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        //启动
        producer.start();
    
    
        //指定topic,创建一个消息
        Message message = new Message("asyncTopic1", "这是一条异步消息".getBytes());
    
        //发送异步消息,并设置回调内容
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("回调内容,发送成功");
            }
    
            @Override
            public void onException(Throwable throwable) {
                log.info("回调内容,发送失败");
            }
        });
    
    
        log.info("主线程执行中=========");
    
        System.in.read();
    }
    

    运行结果

    从运行结果可以看到是不同的线程输出的内容。

    发送单向消息

    这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

    代码如下:

    @Test
    public void oneWayMessageTest() throws Exception {
        //创建一个生产者,并指定一个组名
        DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        //启动
        producer.start();
    
    
        //指定topic,创建一个消息
        Message message = new Message("onewayTopic1", "这是一条单向消息".getBytes());
    
        //发送单向消息
        producer.sendOneway(message);
    
        producer.shutdown();
    }
    

    发送延迟消息

    消息放入mq后,过一段时间,才会被监听到,然后消费.

    比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

    代码如下

    @Test
    public void msMessageTest() throws Exception{
        //创建一个生产者,并指定一个组名
        DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        //启动
        producer.start();
    
        //指定topic,创建一个消息
        Message message = new Message("msTopic1", "这是一条单向消息".getBytes());
        //给消息设置一个延迟时间
        message.setDelayTimeLevel(3);
    
        //发送延时消息
        producer.sendOneway(message);
    
        producer.shutdown();
    }
    

    延时等级如下:

    消息延时等级

    发送批量消息

    代码如下:

    @Test
    public void testBatchProducer() throws Exception {
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("test-batch-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        // 启动实例
        producer.start();
        List msgs = Arrays.asList(
            new Message("batchTopicTest", "我是一组消息的A消息".getBytes()),
            new Message("batchTopicTest", "我是一组消息的B消息".getBytes()),
            new Message("batchTopicTest", "我是一组消息的C消息".getBytes())
    
        );
        SendResult send = producer.send(msgs);
        System.out.println(send);
        // 关闭实例
        producer.shutdown();
    }
    

    这些消息会被放到同一个队列中。

    发送顺序消息

    可以想象一个场景,我们在网上购物时,需要先完成下订单操作,然后再去发短信,再进行发货,需要保证顺序的。

    前文我们讲的都是并发消息,这种消息并不能完成上述的场景逻辑。比如一个topic里有10个消息,分别在4个队列中;

    • 如果消费者,同时有20个线程在消费,可能A线程拿到消息1了,B线程拿到消息2了,但是B线程可能完成的比A线程早,这就没办法上述场景的顺序了。
    • 如果消费者只有一个线程,轮询消费四个队列中的消息时,也不能保证是网购场景中的顺序的。

    这就要引出顺序消息:把消费者变为单线程,把下订单消息、发短信消息、发货消息放到同一个队列就可以了。

    代码

    消息封装成实体类如下:

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class MessageModel {
        //订单id
        private String orderId;
        //用户id
        private String userId;
        //消息描述
        private String description;
    }
    

    发送顺序消息的生产者代码如下:

    /**
         * 顺序消息
         */
    
    @Test
    public void testOrderlyProducer() throws Exception {
    
        List messageModelList = Arrays.asList(
            //用户1的订单
            new MessageModel("order-111","user-1","下单"),
            new MessageModel("order-111","user-1","发短信"),
            new MessageModel("order-111","user-1","发货"),
    
            //用户2的订单
            new MessageModel("order-222","user-2","下单"),
            new MessageModel("order-222","user-2","发短信"),
            new MessageModel("order-222","user-2","发货")
        );
    
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("test-orderly-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        // 启动实例
        producer.start();
    
        //发送顺序消息时 发送时相同用户的消息要保证有序,并且发到同一个队列里
        messageModelList.forEach(
            messageModel->{
                Message message = new Message("orderlyTopic", messageModel.toString().getBytes());
                try {
                    //发送消息,相同订单号去相同队列
                    producer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List mqs, Message message, Object arg) {
                            //producer.send(message,selector,arg),第三个参数订单号会传给selector要实现的方法的arg
                            //在这里选择队列
                            int hashCode = Math.abs(arg.toString().hashCode());
                            int index = hashCode % mqs.size();
                            return mqs.get(index);
                        }
                    }, messageModel.getOrderId());
                } catch (Exception e) {
                    log.error("有错误发生",e);
                }
            }
        );
        // 关闭实例
        producer.shutdown();
        log.info("发送完成");
    }
    

    消费顺序消息的消费者代码如下:

    //消费者
    @Test
    public void orderlyConsumer() throws Exception {
        //创建一个消费者,并指定一个组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-orderly-consumer-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        //订阅一个主题 *号表示订阅这个主题中所有的消息
        consumer.subscribe("orderlyTopic","*");
        //设置一个监听器(一直监听,异步回调方式)
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                log.info("线程id"+Thread.currentThread().getId());
                log.info("消息内容:"+new String(msgs.get(0).getBody()));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
    
        //启动消费者
        consumer.start();
    
        //挂起当前jvm,防止主线程结束,让监听器一直监听
        System.in.read();
    
    }
    
    

    运行结果如下:

    运行结果

    可以看到同一个订单是顺序消费的。

    其他问题

    如果我们的消息消费失败了怎么办?

    如果是并发模式,消费失败会进行重试,重试16次后还会没消费成功,会被放到死信队列里。

    如果是顺序模式,如果重试失败,会无限重试,是int的最大值。

    发送带标签的消息,消息过滤

    如果我们有衣服订单的消息、手机订单的消息,如果我们只使用topic进行区分,就要使用两个topic;但是它们都是订单,所以在同一个topic中会好一些,Rocketmq就提供了消息过滤功能,通过tag或者key进行区分。

    生产者代码如下:

    @Test
    public void testTagProducer() throws Exception {
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("test-tag-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        // 启动实例
        producer.start();
        Message messageTopic1 = new Message("tagTopic", "tag1", "这是topic1的消息".getBytes());
        Message messageTopic2 = new Message("tagTopic", "tag2", "这是topic2的消息".getBytes());
        producer.send(messageTopic1);
        producer.send(messageTopic2);
        // 关闭实例
        producer.shutdown();
    }
    

    消费tag1的消费者

    //消费tag1的消费者
    @Test
    public void tagConsumer1() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
        consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        consumer.subscribe("tagTopic", "tag1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.println("我是tag1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.in.read();
    }
    

    消费tag1和tag2的消费者

    //消费tag1和tag2的消费者
    @Test
    public void tagConsumer2() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
        consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        consumer.subscribe("tagTopic", "tag1 || tag2");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.println("我是tag1和tag2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.in.read();
    }
    

    带key的消息

    消息都会有自己的MessageId的,如下图:

    messageid

    那我们能否指定id呢?

    在发送消息时可以指定key:

    @Test
    public void testKeyProducer() throws Exception {
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("test-key-group");
        //连接namesrv,参数是namesrv的ip地址:端口号
        producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    
        String key = UUID.randomUUID().toString();
    
        // 启动实例
        producer.start();
        Message messageTopic1 = new Message("keyTopic", "tag1",key, "这是topic1的消息".getBytes());
        producer.send(messageTopic1);
        // 关闭实例
        producer.shutdown();
    }
    

    消费者获取key:

    @Test
    public void testKeyConsumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
        consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        consumer.subscribe("keyTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.println("我们设置的key:" + msgs.get(0).getKeys());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.in.read();
    }
    

    输出如下:

    输出

  • 相关阅读:
    Flask笔记一之项目搭建、配置项导入
    [附源码]Python计算机毕业设计SSM教师教学质量评价系统(程序+LW)
    Xshell连接不上创建的虚拟机
    CSS层级小技巧:在滚动时自动添加头部阴影
    AI在玩一种很新的艺术,700万网友在线围观,ControlNet又立功了
    电磁场知识回顾——基本概念
    正厚软件|UIAutomator2的使用教程(上)
    一文搞懂 ARM 64 系列: 一文搞懂 ARM 64 系列: 函数调用传参与返回值
    总结:linux笔记-004
    奇迹1.03H服务端开服架设文件Data文件详解
  • 原文地址:https://www.cnblogs.com/nicaicai/p/18005528