MQ:MessageQueue,消息队列,是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,然后按原来的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ。
异步能提高系统的响应速度、吞吐量。
例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需要把快递放到菜鸟驿站,就可以继续发其他快递去了。客户再按自己的时间安排去菜鸟驿站取快递。
1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消
费,并且消费者的增加或者减少对生产者没有影响。
例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。
以稳定的系统资源应对突发的流量冲击。
例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。引入三峡大坝后,可以把水储存起来,下游慢慢排水。
系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。
引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。
A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。
常用的MQ产品包括Kafka、RabbitMQ和RocketMQ
MQ类别 | 优点 | 缺点 | 应用场景 |
Kafka | 吞吐量非常大、性能非常好、集群高可用 | 会丢失数据,功能单一 | 日志分析、大数据采集 |
RabbitMQ | 消息可靠性高 | 吞吐量比较低,会累积消息 | 小规模场景 |
RocketMQ | 高吞吐、高性能、高可用 | 开源版本不如云上版本、客户端只支持java | 几乎适用所有场景 |
RocketMQ的官网地址: http://rocketmq.apache.org
github地址:https://github.com/apache/rocketmq ,当前最新的版本是4.7.1。我们就用这个4.7.1版本来进行学习
RocketMQ运行版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
RocketMQ源码版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
NameServer : 提供轻量级的Broker路由服务。
Broker:实际处理消息存储、转发等服务的核心组件。
Producer:消息生产者集群。通常是业务系统中的一个功能模块。
Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。
所以我们要启动RocketMQ服务,需要先启动NameServer。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
- //初始化一个消费者实例 并指定consumerGroup
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
-
- //设置nameServer,以便指向Rocket集群
- consumer.setNamesrvAddr("192.0.168.107:9876");
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-
- //指定topic和
- consumer.subscribe("TopicTest", "*");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
RocketMQ提供的生产者和消费者指定NameServer的方式:
1、在代码中指定namesrvAddr属性。例如:consumer.setNamesrvAddr("127.0.0.1:9876");
2、通过NAMESRV_ADDR环境变量来指定。多个NameServer之间用分号连接。
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
-
-
- producer.setNamesrvAddr("192.0.168.107:9876");
- producer.start();
-
- for (int i = 0; i < 2; i++) {
- try {
-
- Message msg = new Message("TopicTest" /* Topic */,
- "TagA" /* Tag */,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
- );
- msg.setDelayTimeLevel(3);
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
是默认消息发送的一种方式,是其他消息发送的基础,指的是直接向mq发送消息,不管发送状态,也没有回调处理,就直接继续往下执行
- public static void main(String[] args) throws MQClientException, InterruptedException {
-
-
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
-
-
- producer.setNamesrvAddr("192.0.168.107:9876");
- producer.start();
-
- for (int i = 0; i < 2; i++) {
- try {
-
- Message msg = new Message("TopicTest" /* Topic */,
- "TagA" /* Tag */,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
- );
- msg.setDelayTimeLevel(3);
- producer.sendOneway(msg);
-
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
- }
指的是生产者向mq发送消息后,需要等待响应,根据返回的结果判断消息发送的状态,来决定失败后是否需要重试或执行其他处理。
- public static void main(String[] args) throws MQClientException, InterruptedException {
-
-
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
-
-
- producer.setNamesrvAddr("192.0.168.107:9876");
- producer.start();
-
- for (int i = 0; i < 2; i++) {
- try {
-
- Message msg = new Message("TopicTest" /* Topic */,
- "TagA" /* Tag */,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
- );
- msg.setDelayTimeLevel(3);
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
- }
- public class SendResult {
- //记录消息发送状态
- private SendStatus sendStatus;
- private String msgId;
- private MessageQueue messageQueue;
- private long queueOffset;
- private String transactionId;
- private String offsetMsgId;
- private String regionId;
- private boolean traceOn = true;
-
- public SendResult() {
- }
指的是生产者向mq发送消息后不等待响应直接就继续往下执行,但会提供一个回调方法给mq让其在收到消息后调用。
- public static void main(
- String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
-
- DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
- producer.setNamesrvAddr("192.0.168.107:9876");
- producer.start();
- //重试次数
- producer.setRetryTimesWhenSendAsyncFailed(0);
-
- int messageCount = 100;
- //保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
- final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
- for (int i = 0; i < messageCount; i++) {
- try {
- final int index = i;
- Message msg = new Message("TopicTest",
- "TagA",
- "OrderID188",
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
- //回调方法
- producer.send(msg, new SendCallback() {
- //发送成功的回调
- @Override
- public void onSuccess(SendResult sendResult) {
- countDownLatch.countDown();
- System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
- }
- //发送失败或异常的回调
- @Override
- public void onException(Throwable e) {
- countDownLatch.countDown();
- System.out.printf("%-10d Exception %s %n", index, e);
- e.printStackTrace();
- }
- });
- System.out.println("消息发送完成");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- countDownLatch.await(5, TimeUnit.SECONDS);
- producer.shutdown();
- }
三种发送方式的区别
单向:不需要管发送状态是否成功,吞吐量大,最快,容易丢数据。
同步:最慢,安全性最好。
异步:速度处于两种之间,但会丢数据。
主动拉取
过期版
- public static void main(String[] args) throws MQClientException {
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
- consumer.setNamesrvAddr("192.0.168.107:9876");
- consumer.start();
- //MessageQueue是订阅发布消息的最小单位,会均匀的分布在集群中的各个节点
- Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
- for (MessageQueue mq : mqs) {
- System.out.printf("Consume from the queue: %s%n", mq);
- SINGLE_MQ:
- while (true) {
- try {
- PullResult pullResult =
- //subExpression 表示的就是tag的过滤类型
- // offset偏移量,记录消费消息的位置,避免重复消费
- //maxNums 一次拉取的消息数量
- consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
- System.out.printf("%s%n", pullResult);
- putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
- switch (pullResult.getPullStatus()) {
- case FOUND:
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- break SINGLE_MQ;
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- consumer.shutdown();
- }
-
- private static long getMessageQueueOffset(MessageQueue mq) {
- Long offset = OFFSE_TABLE.get(mq);
- if (offset != null)
- return offset;
-
- return 0;
- }
-
- private static void putMessageQueueOffset(MessageQueue mq, long offset) {
- OFFSE_TABLE.put(mq, offset);
- }
非过期版(基础模式)
- public static volatile boolean running = true;
-
- public static void main(String[] args) throws Exception {
- DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
- litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- litePullConsumer.subscribe("TopicTest", "*");
- litePullConsumer.start();
- try {
- while (running) {
- List<MessageExt> messageExts = litePullConsumer.poll();
- System.out.printf("%s%n", messageExts);
- }
- } finally {
- litePullConsumer.shutdown();
- }
- }
灵活版(自由管理偏移位置)
- public static volatile boolean running = true;
-
- public static void main(String[] args) throws Exception {
- DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
- litePullConsumer.setAutoCommit(false);
- litePullConsumer.start();
- Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
- List<MessageQueue> list = new ArrayList<>(mqSet);
- List<MessageQueue> assignList = new ArrayList<>();
- for (int i = 0; i < list.size() / 2; i++) {
- assignList.add(list.get(i));
- }
- //存下队列信息
- litePullConsumer.assign(assignList);
- //设置offset
- litePullConsumer.seek(assignList.get(0), 10);
- try {
- while (running) {
- List<MessageExt> messageExts = litePullConsumer.poll();
- System.out.printf("%s %n", messageExts);
- litePullConsumer.commitSync();
- }
- } finally {
- litePullConsumer.shutdown();
- }
-
- }
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
- consumer.setNamesrvAddr("192.0.168.107:9876");
- //设置topic和tag进行过滤
- consumer.subscribe("TopicTest", "*");
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.setConsumeTimestamp("20181109221800");
-
-
- //定义监听器,等待Broker推送过来
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
- public enum ConsumeConcurrentlyStatus {
- CONSUME_SUCCESS,
- RECONSUME_LATER;
-
- private ConsumeConcurrentlyStatus() {
- }
- }
在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。
- public static void main(String[] args) throws UnsupportedEncodingException {
- try {
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
- producer.setNamesrvAddr("192.0.168.107:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- int orderId = i;
-
- for(int j = 0 ; j <= 5 ; j ++){
- Message msg =
- new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,
- ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- Integer id = (Integer) arg;
- //对id取模,将同一批orderid的消息放入同一个MessageQueue
- int index = id % mqs.size();
- return mqs.get(index);
- }
- }, orderId);
-
- System.out.printf("%s%n", sendResult);
- }
- }
-
- producer.shutdown();
- } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
- e.printStackTrace();
- }
- }
消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个 Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。
- public static void main(String[] args) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
- consumer.setNamesrvAddr("192.0.168.107:9876");
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-
- consumer.subscribe("OrderTopicTest", "*");
- //MessageListenerOrderly按队列获取
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- context.setAutoCommit(true);
- for(MessageExt msg:msgs){
- System.out.println("收到消息内容 "+new String(msg.getBody()));
- }
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- consumer.start();
- System.out.printf("Consumer Started.%n");
- }
局部有序,指的是在一定的维度内获取的消息是有顺序的,但不是连续有序;全部有序则是所有的消息获取的顺序和发送的顺序完全一样。
广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态 (MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- //消息发送模式BROADCASTING广播
- consumer.setMessageModel(MessageModel.BROADCASTING);
-
- consumer.subscribe("TopicTest", "*");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
- System.out.printf("Broadcast Consumer Started.%n");
- }
根据设置的消息的延迟级别,在等待一定时间后才会发送出去。
- //总共设置了18个延迟级别,分别对应了指定的延迟时间
- //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- msg.setDelayTimeLevel(3);
指的是将多条消息合并成一个批量消息,一次发送出去;其优点是可以减少网络IO,提升吞吐量,但一个批次消息的大小不要超过1MB 。实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
- producer.start();
- String topic = "BatchTest";
- List<Message> messages = new ArrayList<>();
- messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
- messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
- messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
-
- producer.send(messages);
- producer.shutdown();
- }
判断大小、批量发送
- public static void main(String[] args) throws Exception {
-
- DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
- producer.start();
-
- //large batch
- String topic = "BatchTest";
- List<Message> messages = new ArrayList<>(100 * 1000);
- for (int i = 0; i < 100 * 1000; i++) {
- messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
- }
- // producer.send(messages);
-
- //消息大小判断,是否进行拆分
- ListSplitter splitter = new ListSplitter(messages);
- while (splitter.hasNext()) {
- List<Message> listItem = splitter.next();
- producer.send(listItem);
- }
- producer.shutdown();
- }
-
- }
-
- class ListSplitter implements Iterator<List<Message>> {
- private int sizeLimit = 1000 * 1000;
- private final List<Message> messages;
- private int currIndex;
-
- public ListSplitter(List<Message> messages) {
- this.messages = messages;
- }
-
- @Override
- public boolean hasNext() {
- return currIndex < messages.size();
- }
-
- @Override
- public List<Message> next() {
- int nextIndex = currIndex;
- int totalSize = 0;
- for (; nextIndex < messages.size(); nextIndex++) {
- Message message = messages.get(nextIndex);
- //主题+消息体大小
- int tmpSize = message.getTopic().length() + message.getBody().length;
- Map<String, String> properties = message.getProperties();
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- tmpSize += entry.getKey().length() + entry.getValue().length();
- }
- tmpSize = tmpSize + 20; //for log overhead
- if (tmpSize > sizeLimit) {
- //it is unexpected that single message exceeds the sizeLimit
- //here just let it go, otherwise it will block the splitting process
- if (nextIndex - currIndex == 0) {
- //if the next sublist has no element, add this one and then break, otherwise just break
- nextIndex++;
- }
- break;
- }
- if (tmpSize + totalSize > sizeLimit) {
- break;
- } else {
- totalSize += tmpSize;
- }
-
- }
- List<Message> subList = messages.subList(currIndex, nextIndex);
- currIndex = nextIndex;
- return subList;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not allowed to remove");
- }
一个应用使用一个topic,而根据不同的业务使用不同的tag。
生产者
- String[] tags = new String[] {"TagA", "TagB", "TagC"};
-
- for (int i = 0; i < 15; i++) {
- Message msg = new Message("TagFilterTest",
- tags[i % tags.length],
- "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
-
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
消费者
- //只会接收消费设定的tag的消息
- consumer.subscribe("TagFilterTest", "TagA || TagC");
这种处理是broker进行推送的,会根据consumer的设定,只推送其需要的类型的tag,tagB就不会往消费者推送,以减少网络IO
缺点
限制:就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了
解决方法
生产者设置额外条件
msg.putUserProperty("a", String.valueOf(i));
消费者使用类sql的方式进行过滤
- consumer.subscribe("SqlFilterTest",
- MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
- "and (a is not null and a between 0 and 3)"));
这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。
常量支持类型为
注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
指的是是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
生产者
- public static void main(String[] args) throws MQClientException, InterruptedException {
- TransactionListener transactionListener = new TransactionListenerImpl();
- //使用事务提交相关的pruducer
- TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
- producer.setNamesrvAddr("127.0.0.1:9876");
- ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setName("client-transaction-msg-check-thread");
- return thread;
- }
- });
-
- producer.setExecutorService(executorService);
- //事务监听机制开启
- producer.setTransactionListener(transactionListener);
- producer.start();
-
- String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
- for (int i = 0; i < 10; i++) {
- try {
- Message msg =
- //轮询的机制发送消息、即每个tag两条
- new Message("TopicTest", tags[i % tags.length], "KEY" + i,
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.sendMessageInTransaction(msg, null);
- System.out.printf("%s%n", sendResult);
-
- Thread.sleep(10);
- } catch (MQClientException | UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
-
- for (int i = 0; i < 100000; i++) {
- Thread.sleep(1000);
- }
- producer.shutdown();
- }
事务监听
- public class TransactionListenerImpl implements TransactionListener {
- private AtomicInteger transactionIndex = new AtomicInteger(0);
-
- private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
-
- /**
- * 本地事务提交的方法
- * @param msg
- * @param arg
- * @return
- */
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- String tags = msg.getTags();
- //若tag是tagA,事务状态为commit,mq直接提交发送
- if(StringUtils.contains(tags,"TagA")){
- return LocalTransactionState.COMMIT_MESSAGE;
- //若tag是tagB,事务状态为rollback,mq直接提交丢弃
- }else if(StringUtils.contains(tags,"TagB")){
- return LocalTransactionState.ROLLBACK_MESSAGE;
- //若tag是其他tag类型,事务状态为unknow,mq直接提交丢弃
- }else{
- return LocalTransactionState.UNKNOW;
- }
- }
-
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- String tags = msg.getTags();
- if(StringUtils.contains(tags,"TagC")){
- return LocalTransactionState.COMMIT_MESSAGE;
- }else if(StringUtils.contains(tags,"TagD")){
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }else{
- return LocalTransactionState.UNKNOW;
- }
- }
- }
1、事务消息不支持延迟消息和批量消息。
2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的回查次数限制为15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
4、事务性消息可能不止一次被检查或消费。
5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
作用
事务消息只保证了发送者本地事务和发送消息这两个操作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案。
half消息其实也是一个普通的消息,但它是针对与生产者和ropic直接的,对于消费者是不可见的。