使用Docker的方式创建并且配置集群,可以参考docker简易搭建RabbitMQ集群
需要服务器上先安装好Docker,可以看看这篇Docker+Centos7安装+卸载
先拉取镜像:
这个镜像已经开启了页面管理插件,不需要我们再去使用开启插件了。
docker pull rabbitmq:3.7-management
mkdir /rabbitmqcluster
cd /rabbitmqcluster/
mkdir rabbitmq01 rabbitmq02 rabbitmq03
创建并且启动主节点 rabbitmq1
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -v /rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq --hostname myRabbit1 --name rabbitmq1 rabbitmq:3.7-management
rabbitmq1设置为主节点,并且通过rabbitmq1的管理页面来管理整个集群。设置账号密码为admin
每个节点都要设置erlang的cookie,并且每个节点的erlang.cookie中的值是一样的才行。
创建启动rabbitmq2,从节点。–link连接 rabbitmq1的host
docker run -d -p 5673:5672 --link rabbitmq1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -v /rabbitmqcluster/rabbitmq02:/var/lib/rabbitmq --hostname myRabbit2 --name rabbitmq2 rabbitmq:3.7-management
创建启动rabbitmq3,从节点。–link连接 rabbitmq1的host和 rabbitmq2的host
docker run -d -p 5674:5672 --link rabbitmq1:myRabbit1 --link rabbitmq2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -v /rabbitmqcluster/rabbitmq03:/var/lib/rabbitmq --hostname myRabbit3 --name rabbitmq3 rabbitmq:3.7-management
rabbitmq1
docker exec -it rabbitmq1 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
rabbitmq2
-join_cluster: 加入到集群中,作为 hostname为myRabbit1的内存节点。
docker exec -it rabbitmq2 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@myRabbit1
rabbitmqctl start_app
exit
–ram : 参数“–ram”表示设置为内存节点,忽略次参数默认为磁盘节点。 rabbit是默认写死。myRabbit1是rabbitmq1的hostname值。
rabbitmq3
docker exec -it rabbitmq3 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@myRabbit1
rabbitmqctl start_app
exit
我们只开启了 rabbitmq1 的管理页面,只开放了 15672 端口,访问如下:
http://192.168.1.18:15672
账号:admin
密码:admin

可看到,集群中一共有3个节点。以及它们的内存占用空间,基本信息。
简述:
分布式事务是指事务的操作位于不同的节点上,需要保证事务的ACID特性。例如购物系统,订单和库存位于不同的节点上,假如减库存操作成功了,但是订单创建失败了(网络或者某种原因下),那么我们的分布式事务就需要确保减库存的操作进行回滚,保证数据的最终一致性。
总体来讲就是:独立的系统,独立的服务,独立的JVM,独立的数据库如何保证数据的一致性,就是指分布式事务。
方式:
MQ:第三方的MQ是支持事务消息的,它们实现的方式也是类似于采用二阶段提交(2PC)。如使用 RabbitMQ案例大致流程图

图知:订单服务和配送服务访问的不是同一个数据库,那么如何保证数据的一致性呢?
先把案例需要的环境及配置准备好。
数据库表
订单服务表

派送服务表

创建两个springboot项目,mvn依赖如下:
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-configuration-processorartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
dependency>
<dependency>
<groupId>org.apache.commonsgroupId>
<artifactId>commons-lang3artifactId>
<version>3.6version>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-jdbcartifactId>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformatgroupId>
<artifactId>jackson-dataformat-avroartifactId>
dependency>
dependencies>
application.yml
server:
port: 9000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/dispatcher?useUnicode=true&characterEncoding=utf-8
username: root
password: 134520
rabbitmq:
username: admin
password: admin
virtual-host: /
# 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
addresses: 192.168.1.18:5672,192.168.1.18:5673,192.168.1.18:5674
DispatcherService.java实现
@Service
@Transactional
public class DispatcherService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void dispatch(String orderId) throws Exception {
//定义sql
String sql = "insert into dispatcher(order_id,dispatcher_id,status,order_content,user_id,create_time) values (?,?,?,?,?,?)";
//添加记录
int count = jdbcTemplate.update(sql, orderId, UUID.randomUUID().toString(), 0, "测试数据", 1, new Date());
if (count != 1) {
throw new Exception("订单修改失败");
}
}
}
DispatcherController.java实现,配送接口模拟超时场景。
@RestController
public class DispatcherController {
@Autowired
private DispatcherService dispatchService;
@GetMapping("/dispatch")
public String dispatcher(String orderId) throws Exception {
dispatchService.dispatch(orderId);
//模拟超时 导致调用方请求失败
Thread.sleep(5000);
return "success";
}
}
application.yml
server:
port: 8089
spring:
datasource:
url: jdbc:mysql://localhost:3306/dispatcher_order?useTimezone=true&serverTimezone=GMT%2B8&characterEncoding=utf-8
username: root
password: 134520
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
username: admin
password: admin
virtual-host: /
# 集群连接方式
addresses: 192.168.1.18:5672,192.168.1.18:5673,192.168.1.18:5674
OrderDataService.java
@Transactional(rollbackFor = Exception.class)
@Service
public class OrderDataService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void saveOrder(Order order) throws Exception {
String sql = "insert into `order`(order_id,user_id,order_content) values (?,?,?)";
int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
if (count != 1) {
throw new Exception("订单创建失败。");
}
}
}
OrderService.java
@Service
public class OrderService {
@Autowired
private OrderDataService orderDataService;
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) throws Exception {
orderDataService.saveOrder(order);
String result = dispatcherHttpApi(order.getOrderId());
if (!result.equals("success")) {
throw new Exception("远程调用失败。");
}
}
private String dispatcherHttpApi(String orderId) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
//连接超时>3
factory.setConnectTimeout(3000);
//处理超时>2
factory.setReadTimeout(2000);
String url = "http://localhost:9000/dispatch?orderId=" + orderId;
RestTemplate restTemplate = new RestTemplate(factory);
return restTemplate.getForObject(url, String.class);
}
}
测试类
@SpringBootTest
class SpringbootOrderRabbitmqOrderApplicationTests {
@Autowired
private OrderService orderService;
@Test
void createOrder() throws Exception {
String orderId = "10001";
Order order = new Order();
order.setOrderId(orderId);
order.setUserId(1);
order.setOrderContent("汉堡");
orderService.createOrder(order);
System.out.println("创建成功!");
}
}
经过测试后发现:

订单服务在创建完订单后调用了配送服务,但是调用配送服务时超时了并且报了异常,然后订单服务进行数据回滚了。配送服务是先把配送数据给新增成功了,但是响应时间过长了。就导致订单服务中没有数据,配送服务反而多了一条数据。当然这个例子可能不是特别合理,一般情况下,并发量过高的系统中我们会去使用消息中间件异步执行,而不是直接去调用服务。
那么如何能让两个数据库的数据回滚呢?
如果让两边的数据同时都回滚,这是不可能的事,两个数据库实例都是独立的,无论是开启事务还是提交事务都只能影响到一个数据库。那么我们可以通过一些别的手段来实现想要的效果。
基于MQ的分布式事务整体设计思路。为了处理分布式事务问题,我们可以引入消息队列来实现相关的功能。
主要是分为两部分:
如果做到可靠生产呢?

思路:增加一个数据状态表,每次新增订单时同时也新增一条数据状态,然后利用中间件提供的publisher/confirm功能开启确认机制,每次消息发送到MQ中,MQ都会回调生产者的方法返回回执信息,然后再将数据状态表中的数据改为已发送状态。
我们也可以同时创建定时任务,从数据状态表中查找状态异常的数据,然后重新发送到MQ中
新增一个数据表,对数据进行冗余从而确保消息的可靠性。

方式:首先需要将订单存入数据表中,同时新增一条数据状态表,然后将消息推送到队列中,接收者接收到消息并且执行完相应的操作后将这个数据状态改为正常(或者别的能够理解的方式)。注意一定要打开消息队列的确认机制。
rabbitmq:
publisher-confirm-type: correlated # 确认机制 必须要
配置交换机和队列
@Configuration
public class RabbitMQConfiguration {
// 配置一个死信交换机
@Bean
public FanoutExchange deadExchange() {
return new FanoutExchange("dead_order_fanout_exchange", true, false);
}
// 死信队列
@Bean
public Queue deadOrderQueue() {
return new Queue("dead.order.queue", true);
}
// 绑定死信交换机和队列
@Bean
public Binding bindDeadOrder() {
return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
}
// 配置订单交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("order_fanout_exchange", true, false);
}
// 订单队列,并且设置好死信交换机
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
return new Queue("order.queue", true, false, false, args);
}
// 绑定
@Bean
public Binding bindOrder() {
return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
}
}
业务层代码
@Service
public class MQOrderService {
@Autowired
private OrderDataService orderDataService;
@Autowired
private OrderMQService orderMQService;
public void createOrder(Order order) throws Exception {
orderDataService.saveOrder(order);
orderMQService.sendMessage(order);
}
}
OrderDataService.java保存数据类
@Transactional(rollbackFor = Exception.class)
@Service
public class OrderDataService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void saveOrder(Order order) throws Exception {
String sql = "insert into `order`(order_id,user_id,order_content) values (?,?,?)";
int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
if (count != 1) {
throw new Exception("订单创建失败。");
}
saveLocalMessage(order);
}
private void saveLocalMessage(Order order) throws Exception{
String sqlString = "insert into orders_message(order_id,order_content,status,unique_id) values(?,?,?,?)";
int count = jdbcTemplate.update(sqlString, order.getOrderId(), order.getOrderContent(), 0, 1);
if (count != 1) {
throw new RuntimeException("保存消息异常");
}
}
}
发送消息,以及rabbitmq返回的ack应答机制(回调)。
@Transactional(rollbackFor = Exception.class)
@Service
public class OrderMQService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
//java 自带注解
//被他注解的方法会在服务器加载servlet的时候运行,并且只执行一次,在init()方法前执行
// 表示此类被实例化完成后调用此方法
@PostConstruct
public void regCallBack() {
//消息发送成功后,给与生产者的消息回执,确保生产者的可靠性
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("cause:" + s);
String orderId = correlationData.getId();
//如果ack为false,则代表消息没收到
if (!ack) {
// 这里可能要进行其他的方式进行存储
System.out.println("应答失败,orderid:" + orderId);
return;
}
try {
String updateSQL = "update orders_message set status = 1 where order_id = ?";
int count = jdbcTemplate.update(updateSQL, orderId);
if (count == 1) {
System.out.println("修改成功,进入消息队列。");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void sendMessage(Order order) {
//通过MQ发送消息
rabbitTemplate.convertAndSend("order_fanout_exchange", "", JSON.toJSONString(order), new CorrelationData(order.getOrderId()));
}
}
测试后,订单表中新增了一条数据,并且状态表新增了一条数据,并且状态数据的status字段值被改为了1,说明流程正常执行,确保了消息生产的可靠。

思路:首先利用RabbitMQ的ACK机制,由消费者控制消息的重发、清除、丢弃,然后再考虑可能会出现的消息重复(多半由于网络动荡原因导致MQ没接收到ack,然后进行了重试发送消息)造成的非幂等性问题(可用mysql的主键、唯一索引之类、redis的分布式锁解决)
在我们没有开启手动ack并且没有设置最大重试次数时,一旦我们的消费者在程序中抛出异常,那么消费者将会一直尝试重新消费。所以,我们一定要将手动ack机制打开,且将最大重试次数给设置上,否则将会一直死循环抛出异常!!!
且我们在前面配置了死信队列,一旦此消息的重试次数达到了最大后,那么此消息会被放置到死信队列中去。
实现消费可靠性,在不进行处理的情况下,如果消息错误,会造成死循环,可以通过两种方式来解决这种情况。
# 这里是开启手动ack,让程序去控制MQ的消息重发
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启重试
max-attempts: 10 # 最大重试次数
initial-interval: 2000ms # 间隔时间
消费者完整的yml文件
server:
port: 9000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/dispatcher?useUnicode=true&characterEncoding=utf-8
username: root
password: 134520
rabbitmq:
username: admin
password: admin
virtual-host: /
# 开启手动 ack,让程序控制MQ的消息重发和删除和转义
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true # 开启重试
max-attempts: 10 # 最大重试次数
initial-interval: 2000ms # 重试间隔
# 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了
addresses: 192.168.1.18:5672,192.168.1.18:5673,192.168.1.18:5673
消费者监听
@Component
public class MQConsumer {
@Autowired
private DispatcherService dispatcherService;
@RabbitListener(queues = "order.queue")
public void messageConsumer(String orderMsg) throws Exception {
System.out.println("消息:" + orderMsg);
JSONObject order = JSONObject.parseObject(orderMsg);
String orderId = order.getString("orderId");
// 派单处理
dispatcherService.dispatch(orderId);
System.out.println(1 / 0); // 出现异常
}
}
以上重试10次后,仍然出现异常,消息会投放到死信队列中去。
首先也需要打开手动ack模式,然后在监听队列的时候对异常进行相关的处理。如果出现异常,则手动控制消息是否重发,不建议重发,否则在try/catch中会出现死循环;由于我们设置了死信队列,所以该队列会将消息转交给死信队列。
我们在监听死信队列的时候,如果还出现异常就进行相对应的处理,可以将其存储到其他数据库中或者短信提示让人工进行干预,然后将消息移除即可。
消费者进行监听
@Component
public class MQConsumer {
@Autowired
private DispatcherService dispatcherService;
@RabbitListener(queues = "order.queue")
public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
System.out.println("消息:" + orderMsg);
JSONObject order = JSONObject.parseObject(orderMsg);
String orderId = order.getString("orderId");
// 派单处理
dispatcherService.dispatch(orderId);
System.out.println(1 / 0); // 出现异常
// 手动确认
channel.basicAck(tag, false);
} catch (Exception e) {
// 如果出现异常的情况下 根据实际情况重发
// 重发一次后,丢失
// 参数1:消息的tag
// 参数2:多条处理
// 参数3:重发
// false 不会重发,会把消息打入到死信队列
// true 重发,建议不使用try/catch 否则会死循环
// 手动拒绝消息
channel.basicNack(tag, false, false); // false会将数据给死信队
}
}
}
@Header(AmqpHeaders.DELIVERY_TAG) long tag这个注解可以获取到接收到的消息Header中的一些数据,而在手动提交/决绝时必须要携带这个Header中的数据才能正常提交/拒绝。
以上,使用了try-catch如果捕获到异常,那么会直接将数据丢给死信队列,重试次数的配置就已经没有任何意义了。如果在手动拒绝消息时设置为重发,那么配置的最大重试次数也会失效(无限重发)。(如果使用了try-catch捕获异常,手动拒绝消息,重试次数的配置就已经毫无意义了。)
而且,当消费者中接收数据后处理出现异常,很多情况下是bug原因,是需要程序员修改的。
如果要做好消息闭环,那么再配置一个死信接收者,处理后续的消息。如果在死信消费中也异常,那么直接将消息通过人工干预,或存入其他DB中方便查看,再手动拒绝掉。
@Service
public class DeadOrderMQConsumer {
@Autowired
private DispatchService dispatchService;
@RabbitListener(queues = {"dead.order.queue"})
public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
System.out.println("消息:" + orderMsg);
JSONObject order = JSONObject.parseObject(orderMsg);
String orderId = order.getString("orderId");
//派单处理
dispatchService.dispatch(orderId);
// 手动确认
channel.basicAck(tag, false);
} catch (Exception e) {
System.out.println("人工干预");
System.out.println("同时把消息转移到别的存储db");
// 手动拒绝
channel.basicNack(tag, false, false);
}
}
}
基于MQ的分布式事务解决方案优点
缺点
建议