• RocketMQ的tag过滤和sql过滤


    目录

    tag过滤

    应用场景

    原理

    示例代码

    生产者代码

    消费者代码

    sql过滤 

    介绍

    配置

    实例代码

    生产者

    消费者


    tag过滤

    应用场景

    在RockerMQ中可以用topic将业务划分,例如将订单、商品、活动等业务划分在不同的topic。为了使业务逻辑更清晰还可以用tag再次划分,例如将订单划分为服装订单、家电订单、酒水订单等。

    原理

    在服务端采用的是tag的hashcode过滤,当消费者的tag与订阅的queue中消息的tag的hashcode一致时就会直接返回,但这样只能过滤大部分tag,因为存在hash碰撞,所以还要在客户端还要根据tag值进行过滤

    服务端使用hashcode过滤是因为减少了不必要的网络传输,并且hashcode过滤快,底层直接可以使用位运算

    示例代码

    生产者代码

    1. public static void main(String[] args) {
    2. try {
    3. DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    4. producer.setNamesrvAddr("localhost:9876");
    5. producer.start();
    6. // 建立了标签为tag_a的消息
    7. Message msg = new Message("topic_a", "tag_a", ("test").getBytes());
    8. producer.send(msg);
    9. producer.shutdown();
    10. } catch (Exception e) {
    11. e.printStackTrace();
    12. }
    13. }

    发送完之后在console中就能看到消息标签那列存在定义的值

    消费者代码

    1. public static void main(String[] args) throws Exception {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
    3. consumer.setNamesrvAddr("localhost:9876");
    4. // consumer.subscribe("topic_a", "*");
    5. consumer.subscribe("topic_a", "tag_a");
    6. consumer.setMessageModel(MessageModel.CLUSTERING);
    7. consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
    8. for (MessageExt msg : msgList) {
    9. System.out.println(new String(msg.getBody()));;
    10. }
    11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    12. });
    13. consumer.start();
    14. }

    为*时不过滤任何消息,需要订阅多个tag用||隔开,例如tag_a||tag_b 

    sql过滤 

    介绍

    在某些情况下,可能需要更复杂的过滤条件,这时候就可以使用sql过滤,sql过滤性能比tag低,只定义了一些基本的语法,如下:

    • 数值比较,比如:>>=<<=BETWEEN=
    • 字符比较,比如:=<>IN
    • IS NULL或者 IS NOT NULL;
    • 逻辑符号 ANDORNOT

    常量支持类型为:

    • 数值,比如:1233.1415
    • 字符,比如:'abc',必须用单引号包裹起来;
    • NULL,特殊的常量;
    • 布尔值,TRUE 或 FALSE

    配置

    需要在conf/broker.conf下添加以下配置,不然会报错

    enablePropertyFilter=true

    启动时使用指定的broker.conf文件

    ./mqbroker -n localhost:9876 -c ../conf/broker.conf 

    实例代码

    生产者

    1. public static void main(String[] args) {
    2. try {
    3. DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    4. producer.setNamesrvAddr("localhost:9876");
    5. producer.start();
    6. Message msg = new Message("topic_a", ("test").getBytes());
    7. msg.putUserProperty("age", "11");
    8. msg.putUserProperty("name", "张三");
    9. producer.send(msg);
    10. producer.shutdown();
    11. } catch (Exception e) {
    12. e.printStackTrace();
    13. }
    14. }

    消费者

    1. public static void main(String[] args) throws Exception {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
    3. consumer.setNamesrvAddr("localhost:9876");
    4. // 订阅age大于10并且name是张三的消息
    5. consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 and name = '张三'"));
    6. consumer.setMessageModel(MessageModel.CLUSTERING);
    7. consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
    8. for (MessageExt msg : msgList) {
    9. System.out.println(new String(msg.getBody()));;
    10. }
    11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    12. });
    13. consumer.start();
    14. }

  • 相关阅读:
    [iOS]分析Mach-O文件
    使用逆滤波算法deconvwnr恢复图像回复图像时,产生了很多横竖条纹。解决办法
    使用REPLACE将数据库某一列字段进行字符串操作
    C++-RTTI-运行时类型识别-typeid类型名-dynamic_cast-多继承类型转换-详细分析-Com基础
    跨境电商网站建设 三大跨境品牌厂商综合评测
    二维数组详解
    音频信号分析与实践
    视频融合云平台EasyCVR增加多级分组,可灵活管理接入设备
    安装运行dubbo-admin
    ECharts实现数据可视化入门教程(超详细)
  • 原文地址:https://blog.csdn.net/qq_35597828/article/details/125524337