• 在Broker端进行消息过滤


            在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络带宽从而提高吞吐量。Broker端有三种方式进行消息过滤。

    1.消息的Tag和Key

            对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突。Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。

    2.通过Tag进行过滤

            用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效,Broker端可以在ConsumeQueue中做这种过滤,只从CommitLog里读取过滤后被命中的消息。看一下ConsumerQueue的存储格式,如图7-1所示。

    图7-1 ConsumerQueue的存储格式

    Consume Queue的第三部分存储的是Tag对应的hashcode,是一个定长的字符串,通过Tag过滤的过程就是对比定长的hashcode。经过hashcode对比,符合要求的消息被从CommitLog读取出来,不用担心Hash冲突问题,消息在被消费前,会对比完整的Message Tag字符串,消除Hash冲突造成的误读。

    3.用SQL表达式的方式进行过滤

            使用Tag方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑,如代码清单7-1所示。

    代码清单7-1 在消息中增加自定义属性

    Message msg = new Message("TopicTest",
        tag,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // Set some properties.
    msg.putUserProperty("a", String.valueOf(i));
    msg.putUserProperty("b",  “hello”);

     

    代码中这个消息就有了两个特殊的属性值a和b,我们用类似SQL表达式的方式对消息进行过滤,用法如下(目前只支持在PushConsumer中实现这种过滤):

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  // only subsribe messages have property a, also a >=0 and a <= 3

    consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
    consumer.registerMessageListener(new MessageListenerConcurrently()
    {    

    @Override    

    public ConsumeConcurrentlyStatus consumeMessage
        (List msgs, ConsumeConcurrentlyContext context)
    {        

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    

    }

    });

    consumer.start();
     

    类似SQL的过滤表达式,支持如下语法:

    ·数字对比,比如>、>=、<、<=、BETWEEN、=;

    ·字符串对比,比如=、<>、IN;

    ·IS NULL or IS NOT NULL;

    ·逻辑符号AND、OR、NOT。

    支持的数据类型:

    ·数字型,比如123、3.1415;

    ·字符型,比如'abc'、注意必须用单引号;

    ·NULL,这个特殊字符;

    ·布尔型,TRUEorFALSE。

    SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效。

    4.Filter Server方式过滤

            Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Nums=3这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。实现过滤逻辑的示例如代码清单7-2所示。

    代码清单7-2 实现过滤逻辑的代码示例

    public class MessageFilterImpl implements MessageFilter {
        @Override
        public boolean match(MessageExt msg) {
            String property = msg.getUserProperty("SequenceId");
            if (property != null) {
                int id = Integer.parseInt(property);
                if ((id % 3) == 0 && (id > 10)) {
                    return true;
                }
            }
            return false;
        }
    }

     

    上面代码实现了过滤逻辑,它是根据消息的“SequenceId”这个属性来过滤的,其实不一定要根据消息属性来过滤,也可以根据消息体的内容或其他特征过滤,如代码清单7-3所示。

    代码清单7-3 使用FilterServer的Consumer示例

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-GroupNamecc4");
        // 使用Java代码,在服务器做消息过滤
        String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
            consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }

     

    在使用Filter Server的Consumer例子中,主要是把实现过滤逻辑的类作为参数传到Broker端,Broker端的Filter Server会解析这个类,然后根据match函数里的逻辑进行过滤。

  • 相关阅读:
    各种LLM数据集包括SFT数据集
    分布式节能聚类算法(Matlab代码实现)
    promise实现koa2洋葱中间件模型
    萌新小白必做题(1):找两数间的最大公约数与最小公倍数
    外包干了2个月,技术退步明显...
    【测试工具】UnixBench 测试
    【Paper Note】利用Boundary-aware Attention边界感知注意力机制增强部分伪造音频定位
    网络安全——DNS域传送漏洞
    Kafka MQ 主题和分区
    【andriod】APP登录设备云FlexManager平台API接口
  • 原文地址:https://blog.csdn.net/zhao_god/article/details/134433760