目录
在RockerMQ中可以用topic将业务划分,例如将订单、商品、活动等业务划分在不同的topic。为了使业务逻辑更清晰还可以用tag再次划分,例如将订单划分为服装订单、家电订单、酒水订单等。
在服务端采用的是tag的hashcode过滤,当消费者的tag与订阅的queue中消息的tag的hashcode一致时就会直接返回,但这样只能过滤大部分tag,因为存在hash碰撞,所以还要在客户端还要根据tag值进行过滤
服务端使用hashcode过滤是因为减少了不必要的网络传输,并且hashcode过滤快,底层直接可以使用位运算
- public static void main(String[] args) {
- try {
- DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
- producer.setNamesrvAddr("localhost:9876");
- producer.start();
- // 建立了标签为tag_a的消息
- Message msg = new Message("topic_a", "tag_a", ("test").getBytes());
- producer.send(msg);
- producer.shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
发送完之后在console中就能看到消息标签那列存在定义的值

- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
- consumer.setNamesrvAddr("localhost:9876");
- // consumer.subscribe("topic_a", "*");
- consumer.subscribe("topic_a", "tag_a");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
- for (MessageExt msg : msgList) {
- System.out.println(new String(msg.getBody()));;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }
为*时不过滤任何消息,需要订阅多个tag用||隔开,例如tag_a||tag_b
在某些情况下,可能需要更复杂的过滤条件,这时候就可以使用sql过滤,sql过滤性能比tag低,只定义了一些基本的语法,如下:
>,>=,<,<=,BETWEEN,=;=,<>,IN;IS NULL或者 IS NOT NULL;AND,OR,NOT;常量支持类型为:
123,3.1415;'abc',必须用单引号包裹起来;NULL,特殊的常量;TRUE 或 FALSE需要在conf/broker.conf下添加以下配置,不然会报错
enablePropertyFilter=true
启动时使用指定的broker.conf文件
./mqbroker -n localhost:9876 -c ../conf/broker.conf
- public static void main(String[] args) {
- try {
- DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
- producer.setNamesrvAddr("localhost:9876");
- producer.start();
- Message msg = new Message("topic_a", ("test").getBytes());
- msg.putUserProperty("age", "11");
- msg.putUserProperty("name", "张三");
- producer.send(msg);
- producer.shutdown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
- consumer.setNamesrvAddr("localhost:9876");
- // 订阅age大于10并且name是张三的消息
- consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 and name = '张三'"));
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
- for (MessageExt msg : msgList) {
- System.out.println(new String(msg.getBody()));;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }