• 【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息


    您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
    💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
    😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
    ❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
    ❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
    ❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门


    本文将主要介绍在SpringBoot项目中如何集成RocketMQ以实现普通消息和事务消息的。

    首先是分别创建生产者的springboot项目 springboot-rocketmq-producer,创建消费者的springboot项目 springboot-rocketmq-consumer。

    1. 引入依赖

    本例中使用的RocketMQ的版本是 5.1.3。所以引入的 rocketmq-spring-boot 版本要与之匹配。

    可以通过mvnrepository进行查看。https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot/2.2.2

    <dependency>
        <groupId>org.apache.rocketmqgroupId>
        <artifactId>rocketmq-spring-bootartifactId>
        <version>2.2.2version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 配置文件修改

    在springboot-rocketmq-producer项目的application.yml文件中添加如下配置:

    rocketmq:
      name-server: 172.31.184.89:9876
      producer:
        group: feige-producer-group
      consumer:
        topic: my-spring-boot-topic
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在springboot-rocketmq-consumer项目的application.yml文件中添加如下配置:

    server:
      port: 8080
    rocketmq:
      name-server: 172.31.184.89:9876
      consumer:
        group: feige-consumer-group
        topic: my-spring-boot-topic
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3. 实现生产者

    定义一个生产者类MyProducer,在该类中引入RocketMQTemplate 操作类,然后定义发送消息的方法sendMessage,在此方法中调用 rocketMQTemplate.convertAndSend 方法进行消息发送。

    @Component
    public class MyProducer {
    	@Autowired
    	private RocketMQTemplate rocketMQTemplate;
    
    	/**
    	 * 发送普通消息
    	 *
    	 * @param topic   主题
    	 * @param message 消息
    	 */
    	public void sendMessage(String topic, String message) {
    		rocketMQTemplate.convertAndSend(topic, message);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    3.1. 编写生产者单元测试
    @Autowired
    	private MyProducer myProducer;
    
    	@Value("${rocketmq.consumer.topic:}")
    	private String consumerTopic;
    
    	@Test
    	void sendMessage() {
    		myProducer.sendMessage(consumerTopic,"飞哥SpringBoot集成RocketMQ消息测试");
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    4.实现消费者

    定义消费者类MyConsumer。此类实现了RocketMQListener接口并重写了onMessage方法用于接收broker推送过来的消息。

    @Component
    @RocketMQMessageListener(topic = "${rocketmq.consumer.topic:}", consumerGroup = "generalConsumerGroup")
    public class MyConsumer implements RocketMQListener<String> {
    
    	@Override
    	public void onMessage(String s) {
    		System.out.println("收到的消息是=" + s);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    5. 实现事务消息

    在SpringBoot中实现RocketMQ的事务消息,整体思路与 【RocketMQ系列六】RocketMQ事务消息 文中提到的思路相同。

    5.1. 实现事务消息的生产者

    在前面创建的MyProducer类中添加实现事务消息的方法 sendTransactionMessage。

    /**
    	 * 发送事务消息
    	 *
    	 * @param topic 话题
    	 * @param msg   消息
    	 */
    	public void sendTransactionMessage(String topic, String msg) throws InterruptedException {
    		String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
    		for (int i = 0; i < 10; i++) {
    			// 2. 将topic和tag整合在一起,以:分割,
    			String destination = topic + ":" + tags[i % tags.length];
    				// 1.注意该message是org.springframework.messaging.Message
    			Message<String> message = MessageBuilder.withPayload(msg + "_" + tags[i % tags.length] + "_" + i)
    					.setHeader("destination", destination).build();
    			// 第一个参数是发布的目的地,第二个参数是消息,第三个参数是额外的参数
    			rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
    
    			Thread.sleep(10);
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    这里需要注意的是传入的Message类是org.springframework.messaging.Message ,不是RocketMQ的Message。

    5.2. 实现本地事务消息

    接着在定义生产者本地事务实现类 MyTransactionListener,该类实现了RocketMQLocalTransactionListener接口,并重写了executeLocalTransaction方法和checkLocalTransaction方法。这里多了一步就是将 org.springframework.messaging.Message 转成 org.apache.rocketmq.common.message.Message

    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionListener implements RocketMQLocalTransactionListener {
    	@Override
    	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    		// 将消息转成rocketmq下的message
    		org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "utf-8", (String) arg, msg);
    		String tags = message.getTags();
    		if (tags.equals("tagA")) {
    			return RocketMQLocalTransactionState.COMMIT;
    		} else if (tags.equals("tagB")) {
    			return RocketMQLocalTransactionState.ROLLBACK;
    		}
    		return RocketMQLocalTransactionState.UNKNOWN;
    	}
    
    @Override
    	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    		// 将消息转成rocketmq下的message
    		String destination = (String) msg.getHeaders().get("destination");
    		org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),
    				"utf-8",destination, msg);
    		String tags = message.getTags();
    		if (tags.equals("tagC")) {
    			return RocketMQLocalTransactionState.COMMIT;
    		} else if (tags.equals("tagD")) {
    			return RocketMQLocalTransactionState.ROLLBACK;
    		}
    		return RocketMQLocalTransactionState.UNKNOWN;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

  • 相关阅读:
    超级简单学习Shiro会话管理
    LetCode之热题100.1——哈希(两数之和)
    【活动系列】那些年写的比较愚蠢的代码
    PMP考试可以延缓考吗?解答来了!
    简单聊聊大数据
    力扣:1175. 质数排列
    【安全框架】快速了解安全框架
    PPT设置“只读模式”的两种方法
    微信小程序如何跳转页面
    java - 包装类
  • 原文地址:https://blog.csdn.net/u014534808/article/details/133828858