• RocketMQ的tag过滤和sql过滤


    目录

    tag过滤

    应用场景

    原理

    示例代码

    生产者代码

    消费者代码

    sql过滤 

    介绍

    配置

    实例代码

    生产者

    消费者


    tag过滤

    应用场景

    在RockerMQ中可以用topic将业务划分,例如将订单、商品、活动等业务划分在不同的topic。为了使业务逻辑更清晰还可以用tag再次划分,例如将订单划分为服装订单、家电订单、酒水订单等。

    原理

    在服务端采用的是tag的hashcode过滤,当消费者的tag与订阅的queue中消息的tag的hashcode一致时就会直接返回,但这样只能过滤大部分tag,因为存在hash碰撞,所以还要在客户端还要根据tag值进行过滤

    服务端使用hashcode过滤是因为减少了不必要的网络传输,并且hashcode过滤快,底层直接可以使用位运算

    示例代码

    生产者代码

    1. public static void main(String[] args) {
    2. try {
    3. DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    4. producer.setNamesrvAddr("localhost:9876");
    5. producer.start();
    6. // 建立了标签为tag_a的消息
    7. Message msg = new Message("topic_a", "tag_a", ("test").getBytes());
    8. producer.send(msg);
    9. producer.shutdown();
    10. } catch (Exception e) {
    11. e.printStackTrace();
    12. }
    13. }

    发送完之后在console中就能看到消息标签那列存在定义的值

    消费者代码

    1. public static void main(String[] args) throws Exception {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
    3. consumer.setNamesrvAddr("localhost:9876");
    4. // consumer.subscribe("topic_a", "*");
    5. consumer.subscribe("topic_a", "tag_a");
    6. consumer.setMessageModel(MessageModel.CLUSTERING);
    7. consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
    8. for (MessageExt msg : msgList) {
    9. System.out.println(new String(msg.getBody()));;
    10. }
    11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    12. });
    13. consumer.start();
    14. }

    为*时不过滤任何消息,需要订阅多个tag用||隔开,例如tag_a||tag_b 

    sql过滤 

    介绍

    在某些情况下,可能需要更复杂的过滤条件,这时候就可以使用sql过滤,sql过滤性能比tag低,只定义了一些基本的语法,如下:

    • 数值比较,比如:>>=<<=BETWEEN=
    • 字符比较,比如:=<>IN
    • IS NULL或者 IS NOT NULL;
    • 逻辑符号 ANDORNOT

    常量支持类型为:

    • 数值,比如:1233.1415
    • 字符,比如:'abc',必须用单引号包裹起来;
    • NULL,特殊的常量;
    • 布尔值,TRUE 或 FALSE

    配置

    需要在conf/broker.conf下添加以下配置,不然会报错

    enablePropertyFilter=true

    启动时使用指定的broker.conf文件

    ./mqbroker -n localhost:9876 -c ../conf/broker.conf 

    实例代码

    生产者

    1. public static void main(String[] args) {
    2. try {
    3. DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    4. producer.setNamesrvAddr("localhost:9876");
    5. producer.start();
    6. Message msg = new Message("topic_a", ("test").getBytes());
    7. msg.putUserProperty("age", "11");
    8. msg.putUserProperty("name", "张三");
    9. producer.send(msg);
    10. producer.shutdown();
    11. } catch (Exception e) {
    12. e.printStackTrace();
    13. }
    14. }

    消费者

    1. public static void main(String[] args) throws Exception {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
    3. consumer.setNamesrvAddr("localhost:9876");
    4. // 订阅age大于10并且name是张三的消息
    5. consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 and name = '张三'"));
    6. consumer.setMessageModel(MessageModel.CLUSTERING);
    7. consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
    8. for (MessageExt msg : msgList) {
    9. System.out.println(new String(msg.getBody()));;
    10. }
    11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    12. });
    13. consumer.start();
    14. }

  • 相关阅读:
    数据结构:循环队列
    十大经典排序算法——插入排序与希尔排序(超详解)
    ESP32网络开发实例-Web显示传感器实时数据
    如何调试前端应用程序?
    账号攻击的几种常见手法
    Linux环境下 安装部署mysql
    Rust通用编程概念(3)
    IP-Guard批量部署客户端的方法有哪些?
    MySql数据库的初步安装与管理
    Selective Search学习笔记
  • 原文地址:https://blog.csdn.net/qq_35597828/article/details/125524337