• 消息队列实现分布式事务


     业务流转图:

    搭建环境:activemq + springboot + mybatis + mysql

    1、下载 ActiveMQ,并修改其配置文件(conf/activemq.xml):

    2、建表 td_order_event:在每个服务对应的数据库中各创建一张事件流转表(本例中两边创建的表结构完全一样);

    -- Intermediate order-event table. The same table is created in the database of each service.
    CREATE TABLE `td_order_event` (
      -- int rather than tinyint: a signed tinyint primary key cannot go above 127.
      `id` int(11) NOT NULL,
      `order_type` tinyint(10) DEFAULT NULL COMMENT '订单类型(0: 创建,1, 已下单,2,已支付 )',
      `process` varchar(255) DEFAULT NULL,
      `content` varchar(500) DEFAULT NULL,
      -- No ON UPDATE clause here: the creation time must not change when the row is updated.
      `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
      `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单中间事件表';

    3、分别准备两个服务,搭建环境并导入相关依赖;

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <!-- NOTE(review): fastjson releases this old have published deserialization
             vulnerabilities; consider upgrading before using this outside a demo. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <!-- NOTE(review): druid-spring-boot-starter presumably already pulls in druid;
             confirm whether the explicit druid dependency is still needed. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.22</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    4、在 application.yml 中配置生产端的连接信息,并编写生产端代码;

    1. import org.apache.activemq.ActiveMQConnectionFactory;
    2. import org.apache.activemq.command.ActiveMQQueue;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. import javax.jms.Queue;
    7. @Configuration
    8. public class ActiveConfig {
    9. @Value("${spring.activemq.broker-url}")
    10. private String brokerUrl;
    11. @Bean
    12. public Queue queue() {
    13. return new ActiveMQQueue("ActiveMQQueue");
    14. }
    15. @Bean
    16. public ActiveMQConnectionFactory connectionFactory() {
    17. return new ActiveMQConnectionFactory(brokerUrl);
    18. }
    19. }
    1. @SpringBootApplication
    2. @MapperScan(value = "com.xxx.serviceorder.dao")
    3. @EnableJms
    4. @EnableScheduling
    5. public class ServiceOrderApplication {
    6. public static void main(String[] args) {
    7. SpringApplication.run(ServiceOrderApplication.class, args);
    8. }
    9. }

    4.2 编写 SQL 语句:一条查询语句和一条更新语句;

     4.3 编写定时任务,监听数据;

    1. import com.alibaba.fastjson.JSONObject;
    2. import com.xckk.serviceorder.dao.TdOrderEventDao;
    3. import com.xckk.serviceorder.entity.TdOrderEvent;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.jms.core.JmsMessagingTemplate;
    6. import org.springframework.scheduling.annotation.Scheduled;
    7. import org.springframework.stereotype.Component;
    8. import org.springframework.transaction.annotation.Transactional;
    9. import javax.jms.Queue;
    10. import java.util.Date;
    11. import java.util.List;
    12. @Component
    13. public class Produce {
    14. @Autowired
    15. private TdOrderEventDao tdOrderEventDao;
    16. @Autowired
    17. private Queue queue;
    18. @Autowired
    19. private JmsMessagingTemplate jmsMessagingTemplate;
    20. @Scheduled(cron = "0/5 * * * * ?")
    21. @Transactional(rollbackFor = Exception.class)
    22. public void task() {
    23. System.out.println(new Date() +"【开始执行】");
    24. // 查询新建的中间表
    25. List tdOrderEvents = tdOrderEventDao.selectOrderEventByType("0");
    26. for (TdOrderEvent tdOrderEvent : tdOrderEvents) {
    27. tdOrderEventDao.updateOrderEventById(tdOrderEvent.getId());
    28. System.out.println(tdOrderEvent.getId() + "数据修改成功");
    29. //
    30. jmsMessagingTemplate.convertAndSend(queue, JSONObject.toJSONString(tdOrderEvent));
    31. }
    32. }
    33. }

    5、编写消费端代码;

     sql语句:利用主键 id 的唯一性,确保重复的消息不会被重复处理(幂等);

     5.1 配置mqbean信息

    1. import org.apache.activemq.ActiveMQConnectionFactory;
    2. import org.apache.activemq.RedeliveryPolicy;
    3. import org.springframework.beans.factory.annotation.Value;
    4. import org.springframework.context.annotation.Bean;
    5. import org.springframework.context.annotation.Configuration;
    6. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    7. import org.springframework.jms.config.JmsListenerContainerFactory;
    8. @Configuration
    9. public class ActiveConfig {
    10. @Value("${spring.activemq.broker-url}")
    11. private String brokerUrl;
    12. @Value("${spring.activemq.user}")
    13. private String userName;
    14. @Value("${spring.activemq.password}")
    15. private String passWord;
    16. @Bean
    17. public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {
    18. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);
    19. connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    20. return connectionFactory;
    21. }
    22. /**
    23. * 重发配置
    24. * @return
    25. */
    26. @Bean
    27. public RedeliveryPolicy redeliveryPolicy() {
    28. RedeliveryPolicy policy = new RedeliveryPolicy();
    29. return policy;
    30. }
    31. @Bean
    32. public JmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
    33. DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
    34. containerFactory.setConnectionFactory(activeMQConnectionFactory);
    35. // 1: 自动确认 2:客户端手动确认 3:自动批量确认 4:事务提交并确认
    36. containerFactory.setSessionAcknowledgeMode(2);
    37. return containerFactory;
    38. }
    39. }

    业务处理:处理失败则重试六次,六次都失败则进入死信队列处理;

    1. import com.alibaba.fastjson.JSONObject;
    2. import com.xckk.servicepay.dao.TdOrderEventDao;
    3. import com.xckk.servicepay.entity.TdOrderEvent;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.jms.annotation.JmsListener;
    6. import org.springframework.stereotype.Component;
    7. import javax.jms.JMSException;
    8. import javax.jms.Session;
    9. import javax.jms.TextMessage;
    10. @Component
    11. public class ConsumerQueue {
    12. @Autowired
    13. private TdOrderEventDao tdOrderEventDao;
    14. @JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")
    15. public void receive(TextMessage textMessage, Session session) throws JMSException {
    16. System.out.println(" 消费的消息:"+textMessage.getText());
    17. try {
    18. String text = textMessage.getText();
    19. TdOrderEvent tdOrderEvent = JSONObject.toJavaObject(JSONObject.parseObject(text), TdOrderEvent.class);
    20. tdOrderEventDao.insert(tdOrderEvent);
    21. textMessage.acknowledge();
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. System.out.println("ActiveMQQueue>>> 异常!!!!");
    25. session.recover();
    26. }
    27. }
    28. /**
    29. * 死信队列
    30. *
    31. * @param text
    32. */
    33. @JmsListener(destination = "DLQ.ActiveMQQueue")
    34. public void receive(String text) {
    35. System.out.println("处理失败的数据!!!" + text);
    36. }
    37. }

    验证:

    1、启动mq 

    2、启动两端服务

    3、插入一条测试数据

     

    数据处理完毕;

    4、测试异常信息(比如mq挂掉、消息重复等等)

  • 相关阅读:
    Flutter有状态组件StatefulWidget生命周期
    社区街道治安智慧监管方案,AI算法赋能城市基层精细化治理
    合肥综合性国家科学中心人工智能研究院-机器学习作业(一)
    认知负担的挑战与平台工程的机遇
    Flutter SQLite 教程之笔记App 数据存储CRUD操作基于 Flutter Sqflite 插件
    公安机关警务vr综合实战模拟训练提高团队合作能力
    计算机基础(1)——Verilog语法入门
    Hadoop Hbase
    【Java】使用Java实现爬虫
    Rocket MQ Crash-Safe机制浅析
  • 原文地址:https://blog.csdn.net/qq_42878086/article/details/127036339