在开始之前,请确保已完成以下准备工作:
假设我们已经在 pom.xml
文件中添加了 RocketMQ 的依赖:
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.9.3version>
dependency>
我们首先来看一下如何实现消息的生产者。生产者的主要任务是将消息发送到指定的主题(Topic)中,并为消息指定标签(Tag)。
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
/**
* 生产者示例
*/
public class GTagTest {
@Test
public void tagProducer() throws Exception {
// 创建生产者实例并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("tag_producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动生产者
producer.start();
// 创建并发送消息
Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
producer.send(message);
producer.send(message2);
System.out.println("消息发送成功");
// 关闭生产者
producer.shutdown();
}
}
在上面的代码中,我们创建了一个名为 tagProducer
的测试方法,用于发送两条消息到 tagTopic
主题中。这两条消息分别携带了不同的标签(tag),vip1
和 vip2
。
接下来,我们来看一下如何实现消息的消费者。消费者的任务是从指定的主题中订阅并消费消息,并根据标签进行过滤。
@Test
public void tagConsumer1() throws Exception {
// 创建消费者实例并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
// 设置 NameServer 地址
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅指定主题和标签
consumer.subscribe("tagTopic", "vip1");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
System.out.println("我是vip1的消费者,我正在消费消息" + new String(list.get(0)
.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
// 阻塞当前线程,保持消费者运行
System.in.read();
}
@Test
public void tagConsumer2() throws Exception {
// 创建消费者实例并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
// 设置 NameServer 地址
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅指定主题和多个标签
consumer.subscribe("tagTopic", "vip1||vip2");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
System.out.println("我是vip2的消费者,我正在消费消息" + new String(list.get(0)
.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
// 阻塞当前线程,保持消费者运行
System.in.read();
}
在上述代码中,我们创建了两个消费者,tagConsumer1
和 tagConsumer2
。第一个消费者只订阅 vip1
标签的消息,第二个消费者订阅 vip1
和 vip2
标签的消息。
tagProducer
方法,发送两条消息。tagConsumer1
和 tagConsumer2
方法,观察控制台输出。通过上述步骤,我们可以看到 tagConsumer1
只消费了 vip1
标签的消息,而 tagConsumer2
消费了 vip1
和 vip2
标签的消息。
本文介绍了如何使用 RocketMQ 实现基于标签过滤的消息生产和消费,通过示例代码展示了生产者和消费者的具体实现方式。希望本文能够帮助你更好地理解和使用 RocketMQ。