• RocketMQ


    概念

    MQ:MessageQueue,消息队列,是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,然后按原来的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ。

    优点

    异步

    异步能提高系统的响应速度、吞吐量。

    例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需要把快递放到菜鸟驿站,就可以继续发其他快递去了。客户再按自己的时间安排去菜鸟驿站取快递。

    解耦

    1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。

    2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消

    费,并且消费者的增加或者减少对生产者没有影响。

    例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。

    削峰

    以稳定的系统资源应对突发的流量冲击。

    例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。引入三峡大坝后,可以把水储存起来,下游慢慢排水。

    缺点

    系统可用性降低

    系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。

    系统复杂度提高

    引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。

    消息一致性问题

    A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。

    MQ之间区别

    常用的MQ产品包括Kafka、RabbitMQ和RocketMQ

    MQ类别

    优点

    缺点

    应用场景

    Kafka

    吞吐量非常大、性能非常好、集群高可用

    会丢失数据,功能单一

    日志分析、大数据采集

    RabbitMQ

    消息可靠性高

    吞吐量比较低,会累积消息

    小规模场景

    RocketMQ

    高吞吐、高性能、高可用

    开源版本不如云上版本、客户端只支持java

    几乎适用所有场景

    下载

    RocketMQ的官网地址: http://rocketmq.apache.org

    github地址:https://github.com/apache/rocketmq ,当前最新的版本是4.7.1。我们就用这个4.7.1版本来进行学习

    RocketMQ运行版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

    RocketMQ源码版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip

    RocketMQ组成

    NameServer : 提供轻量级的Broker路由服务。

    Broker:实际处理消息存储、转发等服务的核心组件。

    Producer:消息生产者集群。通常是业务系统中的一个功能模块。

    Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。

    所以我们要启动RocketMQ服务,需要先启动NameServer。

    Rocket编程模型

    消息发送者

    • 创建消息生产者producer,并制定生产者组名
    • 指定Nameserver地址
    • 启动producer
    • 创建消息对象,指定主题Topic、Tag和消息体
    • 发送消息
    • 关闭生产者producer

    消息消费者

    • 创建消费者Consumer,制定消费者组名
    • 指定Nameserver地址
    • 订阅主题Topic和Tag
    • 设置回调函数,处理消息
    • 启动消费者consumer

    使用java运行RocketMQ

    引入依赖

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.7.1</version>
            </dependency>

    启动消费者

    1. //初始化一个消费者实例 并指定consumerGroup
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    3. //设置nameServer,以便指向Rocket集群
    4. consumer.setNamesrvAddr("192.0.168.107:9876");
    5. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    6. //指定topic和
    7. consumer.subscribe("TopicTest", "*");
    8. consumer.registerMessageListener(new MessageListenerConcurrently() {
    9. @Override
    10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    11. ConsumeConcurrentlyContext context) {
    12. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    13. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    14. }
    15. });
    16. consumer.start();
    17. System.out.printf("Consumer Started.%n");
    18. }

    RocketMQ提供的生产者和消费者指定NameServer的方式:

    1、在代码中指定namesrvAddr属性。例如:consumer.setNamesrvAddr("127.0.0.1:9876");

    2、通过NAMESRV_ADDR环境变量来指定。多个NameServer之间用分号连接。

    启动生产者

    1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    2. producer.setNamesrvAddr("192.0.168.107:9876");
    3. producer.start();
    4. for (int i = 0; i < 2; i++) {
    5. try {
    6. Message msg = new Message("TopicTest" /* Topic */,
    7. "TagA" /* Tag */,
    8. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    9. );
    10. msg.setDelayTimeLevel(3);
    11. SendResult sendResult = producer.send(msg);
    12. System.out.printf("%s%n", sendResult);
    13. } catch (Exception e) {
    14. e.printStackTrace();
    15. Thread.sleep(1000);
    16. }
    17. }
    18. producer.shutdown();

    生产者生产消息的方式

    单向发送

    是默认消息发送的一种方式,是其他消息发送的基础,指的是直接向mq发送消息,不管发送状态,也没有回调处理,就直接继续往下执行

    1. public static void main(String[] args) throws MQClientException, InterruptedException {
    2. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    3. producer.setNamesrvAddr("192.0.168.107:9876");
    4. producer.start();
    5. for (int i = 0; i < 2; i++) {
    6. try {
    7. Message msg = new Message("TopicTest" /* Topic */,
    8. "TagA" /* Tag */,
    9. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    10. );
    11. msg.setDelayTimeLevel(3);
    12. producer.sendOneway(msg);
    13. } catch (Exception e) {
    14. e.printStackTrace();
    15. Thread.sleep(1000);
    16. }
    17. }
    18. producer.shutdown();
    19. }

    同步发送

    指的是生产者向mq发送消息后,需要等待响应,根据返回的结果判断消息发送的状态,来决定失败后是否需要重试或执行其他处理。

    1. public static void main(String[] args) throws MQClientException, InterruptedException {
    2. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    3. producer.setNamesrvAddr("192.0.168.107:9876");
    4. producer.start();
    5. for (int i = 0; i < 2; i++) {
    6. try {
    7. Message msg = new Message("TopicTest" /* Topic */,
    8. "TagA" /* Tag */,
    9. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    10. );
    11. msg.setDelayTimeLevel(3);
    12. SendResult sendResult = producer.send(msg);
    13. System.out.printf("%s%n", sendResult);
    14. } catch (Exception e) {
    15. e.printStackTrace();
    16. Thread.sleep(1000);
    17. }
    18. }
    19. producer.shutdown();
    20. }
    1. public class SendResult {
    2. //记录消息发送状态
    3. private SendStatus sendStatus;
    4. private String msgId;
    5. private MessageQueue messageQueue;
    6. private long queueOffset;
    7. private String transactionId;
    8. private String offsetMsgId;
    9. private String regionId;
    10. private boolean traceOn = true;
    11. public SendResult() {
    12. }

    异步发送

    指的是生产者向mq发送消息后不等待响应直接就继续往下执行,但会提供一个回调方法给mq让其在收到消息后调用。

    1. public static void main(
    2. String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
    3. DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
    4. producer.setNamesrvAddr("192.0.168.107:9876");
    5. producer.start();
    6. //重试次数
    7. producer.setRetryTimesWhenSendAsyncFailed(0);
    8. int messageCount = 100;
    9. //保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
    10. final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    11. for (int i = 0; i < messageCount; i++) {
    12. try {
    13. final int index = i;
    14. Message msg = new Message("TopicTest",
    15. "TagA",
    16. "OrderID188",
    17. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    18. //回调方法
    19. producer.send(msg, new SendCallback() {
    20. //发送成功的回调
    21. @Override
    22. public void onSuccess(SendResult sendResult) {
    23. countDownLatch.countDown();
    24. System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    25. }
    26. //发送失败或异常的回调
    27. @Override
    28. public void onException(Throwable e) {
    29. countDownLatch.countDown();
    30. System.out.printf("%-10d Exception %s %n", index, e);
    31. e.printStackTrace();
    32. }
    33. });
    34. System.out.println("消息发送完成");
    35. } catch (Exception e) {
    36. e.printStackTrace();
    37. }
    38. }
    39. countDownLatch.await(5, TimeUnit.SECONDS);
    40. producer.shutdown();
    41. }

    三种发送方式的区别

    单向:不需要管发送状态是否成功,吞吐量大,最快,容易丢数据。

    同步:最慢,安全性最好。

    异步:速度处于两种之间,但会丢数据。

    消费者消费消息的方式

    主动拉取

    过期版

    1. public static void main(String[] args) throws MQClientException {
    2. DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    3. consumer.setNamesrvAddr("192.0.168.107:9876");
    4. consumer.start();
    5. //MessageQueue是订阅发布消息的最小单位,会均匀的分布在集群中的各个节点
    6. Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
    7. for (MessageQueue mq : mqs) {
    8. System.out.printf("Consume from the queue: %s%n", mq);
    9. SINGLE_MQ:
    10. while (true) {
    11. try {
    12. PullResult pullResult =
    13. //subExpression 表示的就是tag的过滤类型
    14. // offset偏移量,记录消费消息的位置,避免重复消费
    15. //maxNums 一次拉取的消息数量
    16. consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
    17. System.out.printf("%s%n", pullResult);
    18. putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
    19. switch (pullResult.getPullStatus()) {
    20. case FOUND:
    21. break;
    22. case NO_MATCHED_MSG:
    23. break;
    24. case NO_NEW_MSG:
    25. break SINGLE_MQ;
    26. case OFFSET_ILLEGAL:
    27. break;
    28. default:
    29. break;
    30. }
    31. } catch (Exception e) {
    32. e.printStackTrace();
    33. }
    34. }
    35. }
    36. consumer.shutdown();
    37. }
    38. private static long getMessageQueueOffset(MessageQueue mq) {
    39. Long offset = OFFSE_TABLE.get(mq);
    40. if (offset != null)
    41. return offset;
    42. return 0;
    43. }
    44. private static void putMessageQueueOffset(MessageQueue mq, long offset) {
    45. OFFSE_TABLE.put(mq, offset);
    46. }

    非过期版(基础模式)

    1. public static volatile boolean running = true;
    2. public static void main(String[] args) throws Exception {
    3. DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
    4. litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    5. litePullConsumer.subscribe("TopicTest", "*");
    6. litePullConsumer.start();
    7. try {
    8. while (running) {
    9. List<MessageExt> messageExts = litePullConsumer.poll();
    10. System.out.printf("%s%n", messageExts);
    11. }
    12. } finally {
    13. litePullConsumer.shutdown();
    14. }
    15. }

    灵活版(自由管理偏移位置)

    1. public static volatile boolean running = true;
    2. public static void main(String[] args) throws Exception {
    3. DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
    4. litePullConsumer.setAutoCommit(false);
    5. litePullConsumer.start();
    6. Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
    7. List<MessageQueue> list = new ArrayList<>(mqSet);
    8. List<MessageQueue> assignList = new ArrayList<>();
    9. for (int i = 0; i < list.size() / 2; i++) {
    10. assignList.add(list.get(i));
    11. }
    12. //存下队列信息
    13. litePullConsumer.assign(assignList);
    14. //设置offset
    15. litePullConsumer.seek(assignList.get(0), 10);
    16. try {
    17. while (running) {
    18. List<MessageExt> messageExts = litePullConsumer.poll();
    19. System.out.printf("%s %n", messageExts);
    20. litePullConsumer.commitSync();
    21. }
    22. } finally {
    23. litePullConsumer.shutdown();
    24. }
    25. }

    等待推送

    1. public static void main(String[] args) throws InterruptedException, MQClientException {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    3. consumer.setNamesrvAddr("192.0.168.107:9876");
    4. //设置topic和tag进行过滤
    5. consumer.subscribe("TopicTest", "*");
    6. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    7. consumer.setConsumeTimestamp("20181109221800");
    8. //定义监听器,等待Broker推送过来
    9. consumer.registerMessageListener(new MessageListenerConcurrently() {
    10. @Override
    11. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    12. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    13. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    14. }
    15. });
    16. consumer.start();
    17. System.out.printf("Consumer Started.%n");
    18. }
    1. public enum ConsumeConcurrentlyStatus {
    2. CONSUME_SUCCESS,
    3. RECONSUME_LATER;
    4. private ConsumeConcurrentlyStatus() {
    5. }
    6. }

    消息类型

    顺序消息

    生产者生产的消息

    在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。

    1. public static void main(String[] args) throws UnsupportedEncodingException {
    2. try {
    3. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    4. producer.setNamesrvAddr("192.0.168.107:9876");
    5. producer.start();
    6. for (int i = 0; i < 10; i++) {
    7. int orderId = i;
    8. for(int j = 0 ; j <= 5 ; j ++){
    9. Message msg =
    10. new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,
    11. ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
    12. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    13. @Override
    14. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    15. Integer id = (Integer) arg;
    16. //对id取模,将同一批orderid的消息放入同一个MessageQueue
    17. int index = id % mqs.size();
    18. return mqs.get(index);
    19. }
    20. }, orderId);
    21. System.out.printf("%s%n", sendResult);
    22. }
    23. }
    24. producer.shutdown();
    25. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
    26. e.printStackTrace();
    27. }
    28. }

    消费者消费消息

    消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个 Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。

    1. public static void main(String[] args) throws MQClientException {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    3. consumer.setNamesrvAddr("192.0.168.107:9876");
    4. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    5. consumer.subscribe("OrderTopicTest", "*");
    6. //MessageListenerOrderly按队列获取
    7. consumer.registerMessageListener(new MessageListenerOrderly() {
    8. @Override
    9. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    10. context.setAutoCommit(true);
    11. for(MessageExt msg:msgs){
    12. System.out.println("收到消息内容 "+new String(msg.getBody()));
    13. }
    14. return ConsumeOrderlyStatus.SUCCESS;
    15. }
    16. });
    17. consumer.start();
    18. System.out.printf("Consumer Started.%n");
    19. }

    局部有序和全局有序

    局部有序,指的是在一定的维度内获取的消息是有顺序的,但不是连续有序;全部有序则是所有的消息获取的顺序和发送的顺序完全一样。

    广播消息

    广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态 (MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。

    1. public static void main(String[] args) throws InterruptedException, MQClientException {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
    3. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    4. //消息发送模式BROADCASTING广播
    5. consumer.setMessageModel(MessageModel.BROADCASTING);
    6. consumer.subscribe("TopicTest", "*");
    7. consumer.registerMessageListener(new MessageListenerConcurrently() {
    8. @Override
    9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    10. ConsumeConcurrentlyContext context) {
    11. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    13. }
    14. });
    15. consumer.start();
    16. System.out.printf("Broadcast Consumer Started.%n");
    17. }

    延迟消息

    根据设置的消息的延迟级别,在等待一定时间后才会发送出去。

    1. //总共设置了18个延迟级别,分别对应了指定的延迟时间
    2. //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    3. msg.setDelayTimeLevel(3);

    批量消息

    指的是将多条消息合并成一个批量消息,一次发送出去;其优点是可以减少网络IO,提升吞吐量,但一个批次消息的大小不要超过1MB 。实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。

    1. public static void main(String[] args) throws Exception {
    2. DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
    3. producer.start();
    4. String topic = "BatchTest";
    5. List<Message> messages = new ArrayList<>();
    6. messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
    7. messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
    8. messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
    9. producer.send(messages);
    10. producer.shutdown();
    11. }

    判断大小、批量发送

    1. public static void main(String[] args) throws Exception {
    2. DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
    3. producer.start();
    4. //large batch
    5. String topic = "BatchTest";
    6. List<Message> messages = new ArrayList<>(100 * 1000);
    7. for (int i = 0; i < 100 * 1000; i++) {
    8. messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
    9. }
    10. // producer.send(messages);
    11. //消息大小判断,是否进行拆分
    12. ListSplitter splitter = new ListSplitter(messages);
    13. while (splitter.hasNext()) {
    14. List<Message> listItem = splitter.next();
    15. producer.send(listItem);
    16. }
    17. producer.shutdown();
    18. }
    19. }
    20. class ListSplitter implements Iterator<List<Message>> {
    21. private int sizeLimit = 1000 * 1000;
    22. private final List<Message> messages;
    23. private int currIndex;
    24. public ListSplitter(List<Message> messages) {
    25. this.messages = messages;
    26. }
    27. @Override
    28. public boolean hasNext() {
    29. return currIndex < messages.size();
    30. }
    31. @Override
    32. public List<Message> next() {
    33. int nextIndex = currIndex;
    34. int totalSize = 0;
    35. for (; nextIndex < messages.size(); nextIndex++) {
    36. Message message = messages.get(nextIndex);
    37. //主题+消息体大小
    38. int tmpSize = message.getTopic().length() + message.getBody().length;
    39. Map<String, String> properties = message.getProperties();
    40. for (Map.Entry<String, String> entry : properties.entrySet()) {
    41. tmpSize += entry.getKey().length() + entry.getValue().length();
    42. }
    43. tmpSize = tmpSize + 20; //for log overhead
    44. if (tmpSize > sizeLimit) {
    45. //it is unexpected that single message exceeds the sizeLimit
    46. //here just let it go, otherwise it will block the splitting process
    47. if (nextIndex - currIndex == 0) {
    48. //if the next sublist has no element, add this one and then break, otherwise just break
    49. nextIndex++;
    50. }
    51. break;
    52. }
    53. if (tmpSize + totalSize > sizeLimit) {
    54. break;
    55. } else {
    56. totalSize += tmpSize;
    57. }
    58. }
    59. List<Message> subList = messages.subList(currIndex, nextIndex);
    60. currIndex = nextIndex;
    61. return subList;
    62. }
    63. @Override
    64. public void remove() {
    65. throw new UnsupportedOperationException("Not allowed to remove");
    66. }

    过滤消息

    一个应用使用一个topic,而根据不同的业务使用不同的tag。

    生产者

    1. String[] tags = new String[] {"TagA", "TagB", "TagC"};
    2. for (int i = 0; i < 15; i++) {
    3. Message msg = new Message("TagFilterTest",
    4. tags[i % tags.length],
    5. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    6. SendResult sendResult = producer.send(msg);
    7. System.out.printf("%s%n", sendResult);
    8. }

    消费者

    1. //只会接收消费设定的tag的消息
    2. consumer.subscribe("TagFilterTest", "TagA || TagC");

    这种处理是broker进行推送的,会根据consumer的设定,只推送其需要的类型的tag,tagB就不会往消费者推送,以减少网络IO

    缺点

    限制:就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了

    解决方法

    生产者设置额外条件

    msg.putUserProperty("a", String.valueOf(i));

    消费者使用类sql的方式进行过滤

    1. consumer.subscribe("SqlFilterTest",
    2. MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
    3. "and (a is not null and a between 0 and 3)"));

    这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。

    SQL92语法:

    RocketMQ只定义了一些基本语法来支持这个特性。

    • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
    • 字符比较,比如:=,<>,IN;
    • IS NULL 或者 IS NOT NULL;
    • 逻辑符号 AND,OR,NOT;

    常量支持类型为

    • 数值,比如:123,3.1415;字符,比如:'abc',必须用单引号包裹起来;
    • NULL,特殊的常量
    • 布尔值,TRUE 或 FALSE

    注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

    事务消息

    指的是是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。

    生产者

    1. public static void main(String[] args) throws MQClientException, InterruptedException {
    2. TransactionListener transactionListener = new TransactionListenerImpl();
    3. //使用事务提交相关的pruducer
    4. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    5. producer.setNamesrvAddr("127.0.0.1:9876");
    6. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
    7. @Override
    8. public Thread newThread(Runnable r) {
    9. Thread thread = new Thread(r);
    10. thread.setName("client-transaction-msg-check-thread");
    11. return thread;
    12. }
    13. });
    14. producer.setExecutorService(executorService);
    15. //事务监听机制开启
    16. producer.setTransactionListener(transactionListener);
    17. producer.start();
    18. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    19. for (int i = 0; i < 10; i++) {
    20. try {
    21. Message msg =
    22. //轮询的机制发送消息、即每个tag两条
    23. new Message("TopicTest", tags[i % tags.length], "KEY" + i,
    24. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    25. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    26. System.out.printf("%s%n", sendResult);
    27. Thread.sleep(10);
    28. } catch (MQClientException | UnsupportedEncodingException e) {
    29. e.printStackTrace();
    30. }
    31. }
    32. for (int i = 0; i < 100000; i++) {
    33. Thread.sleep(1000);
    34. }
    35. producer.shutdown();
    36. }

    事务监听

    1. public class TransactionListenerImpl implements TransactionListener {
    2. private AtomicInteger transactionIndex = new AtomicInteger(0);
    3. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    4. /**
    5. * 本地事务提交的方法
    6. * @param msg
    7. * @param arg
    8. * @return
    9. */
    10. @Override
    11. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    12. String tags = msg.getTags();
    13. //若tag是tagA,事务状态为commit,mq直接提交发送
    14. if(StringUtils.contains(tags,"TagA")){
    15. return LocalTransactionState.COMMIT_MESSAGE;
    16. //若tag是tagB,事务状态为rollback,mq直接提交丢弃
    17. }else if(StringUtils.contains(tags,"TagB")){
    18. return LocalTransactionState.ROLLBACK_MESSAGE;
    19. //若tag是其他tag类型,事务状态为unknow,mq直接提交丢弃
    20. }else{
    21. return LocalTransactionState.UNKNOW;
    22. }
    23. }
    24. @Override
    25. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    26. String tags = msg.getTags();
    27. if(StringUtils.contains(tags,"TagC")){
    28. return LocalTransactionState.COMMIT_MESSAGE;
    29. }else if(StringUtils.contains(tags,"TagD")){
    30. return LocalTransactionState.ROLLBACK_MESSAGE;
    31. }else{
    32. return LocalTransactionState.UNKNOW;
    33. }
    34. }
    35. }

    事务消息的限制

    1、事务消息不支持延迟消息和批量消息。

    2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的回查次数限制为15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

    3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。

    4、事务性消息可能不止一次被检查或消费。

    5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

    6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

    作用

    事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案。

    half消息其实也是一个普通的消息,但它是针对与生产者和ropic直接的,对于消费者是不可见的。 

  • 相关阅读:
    TallComponents PDFRasterizer.NET 4.0 Crack
    The Missing Semester - 第五讲 学习笔记(二)
    ChatGPT 升级出现「we are unable to authenticate」怎么办?
    C++ AVL树
    文件包含漏洞和hash破解
    GFS分布式文件系统
    允许访问:掌握权限的艺术
    阿里技术大牛耗时几个月整理出这份Spring Cloud Alibaba学习总结,值得学习呢
    简析低功耗蓝牙芯片PHY6222/PHY6252 蓝牙锁的应用
    TiniXml C++ 开源代码中的几个概念
  • 原文地址:https://blog.csdn.net/xq_adress/article/details/125610558