• 【RocketMQ 二十三】RocketMQ应用之消息过滤


    消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。
    对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。

    1 Tag过滤

    通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。

    DefaultMQPushConsumer consumer = new
    DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
    
    • 1
    • 2
    • 3

    2 SQL过滤

    SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。

    SQL过滤表达式中支持多种常量类型与运算符。

    支持的常量类型:

    • 数值:比如:123,3.1415
    • 字符:必须用单引号包裹起来,比如:‘abc’
    • 布尔:TRUE 或 FALSE
    • NULL:特殊的常量,表示空

    支持的运算符有:

    • 数值比较:>,>=,<,<=,BETWEEN,=
    • 字符比较:=,<>,IN
    • 逻辑运算 :AND,OR,NOT
    • NULL判断:IS NULL 或者 IS NOT NULL

    默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

    enablePropertyFilter 1 = true

    在启动Broker时需要指定这个修改过的配置文件。例如对于单机Broker的启动,其修改的配置文件是conf/broker.conf,启动时使用如下命令:

    sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

    3 代码举例

    定义Tag过滤Producer

    public class FilterByTagProducer {
    public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("pg");
    producer.setNamesrvAddr("rocketmqOS:9876");
    producer.start();
    String[] tags = {"myTagA","myTagB","myTagC"};
    for (int i = 0; i < 10; i++) {
    byte[] body = ("Hi," + i).getBytes();
    String tag = tags[i%tags.length];
    Message msg = new Message("myTopic",tag,body);
    SendResult sendResult = producer.send(msg);
    System.out.println(sendResult);
    }
    producer.shutdown();
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    定义Tag过滤Consumer

    public class FilterByTagConsumer {
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new
    DefaultMQPushConsumer("pg");
    consumer.setNamesrvAddr("rocketmqOS:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
    );
    consumer.subscribe("myTopic", "myTagA || myTagB");
    consumer.registerMessageListener(new
    MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus
    consumeMessage(List<MessageExt> msgs,
    ConsumeConcurrentlyContext context) {
    for (MessageExt me:msgs){
    System.out.println(me);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    System.out.println("Consumer Started");
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    定义SQL过滤Producer

    public class FilterBySQLProducer {
    public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("pg");
    producer.setNamesrvAddr("rocketmqOS:9876");
    producer.start();
    for (int i = 0; i < 10; i++) {
    try {
    byte[] body = ("Hi," + i).getBytes();
    Message msg = new Message("myTopic", "myTag", body);
    msg.putUserProperty("age", i + "");
    SendResult sendResult = producer.send(msg);
    System.out.println(sendResult);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    producer.shutdown();
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    定义SQL过滤Consumer

    public class FilterBySQLConsumer {
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new
    DefaultMQPushConsumer("pg");
    consumer.setNamesrvAddr("rocketmqOS:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
    );
    consumer.subscribe("myTopic", MessageSelector.bySql("age between
    0 and 6"));
    consumer.registerMessageListener(new
    MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus
    consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
    context) {
    for (MessageExt me:msgs){
    System.out.println(me);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    System.out.println("Consumer Started");
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
  • 相关阅读:
    Linux(基于Centos7)(三)
    安装Java (JDK16)
    AI智慧安防智能监控平台如何做到健身房智能视频监控?
    了解操作系统,什么是操作系统Operation System?
    SpringMVC源码解析
    js逆向播放量增加,增加视频热度,uuid,sid,buvid3,aid,b_lsid, b_nut 还原实现过程
    AI时代设计工具Motiff亮相世界互联网大会 带来AI在SaaS领域落地应用案例
    【Java JVM】垃圾回收
    在vs中创建linux项目
    《React扩展知识一》setState更新状态2种写法/ lazyLoad / Fragment / Context / Hooks
  • 原文地址:https://blog.csdn.net/qq_33333654/article/details/126520624