现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的。 引入消息队列可以给这个项目带来很多的好处:比如
再通过A、B、C这三个系统举例。A系统在返回给用户的执行结果前需要完成B、C系统的调用,这个总的执行时间是A+B+C的执行时间,如果引入MQ,A系统的执行完成后将数据投递到MQ,直接响应用户。B、C再这在通过监听完成数据的处理。这样也降低了用户的等待时间
除了这些好处,当然引入MQ还会有不好的地方:比如
这些都是问题,所在是否要引入MQ还需要看业务需求
这里有张投递消息到消费的流程图

从这张图上可看出这也是一种AMQP协议的实现。消息的提供者先是通过某一个信道将消息发送到交换机,然后交换机通通RoutingKey来将消息分发到某一个队列上。然后,消费者在临听某一个队列来进行消息的消费。
今天我们的主题是如何保证消息的投递可靠性。那么我们来想想在这个流程中那些位置可能会影响我们消息的投递可靠性?
从上图中我们可以总结出有二个因素影响着消息是否被成功投递和被成功消费
解决这个问题,我们可以通过提供者的发送方确认机制来实现,这个发送方确认机制又分成三种:
首先要在当前的Channel上开启消息确认模式,然后通过waitForConfirms()方法进行消息确认是否发送成功。
- public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
- ConnectionFactory cf = new ConnectionFactory();
- cf.setHost("host");
- cf.setPort(5672);
- cf.setUsername("账号");
- cf.setPassword("密码");
-
- try(Connection connection = cf.newConnection();
- Channel channel = connection.createChannel()){
- channel.confirmSelect();
-
- Map<String,String> mes = new HashMap<>();
- mes.put("name","1111");
- String messageStr = objectMapper.writeValueAsString(mes);
- channel.basicPublish(
- "exchange.drinks",
- "drinks.juzi",
- null,
- messageStr.getBytes());
- boolean isSendSuccess = channel.waitForConfirms();
- if(isSendSuccess){
- System.out.print("消息发送成功");
- }
- }
- }
- 复制代码
这样做的话每次发完消息后,都会确保消息是否发送成功。如果发送失败的话进行相应的处理。
多条消息的确认和单条的差不多,比如我将发送消息的代码放到一个循环内。
- public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
- ConnectionFactory cf = new ConnectionFactory();
- cf.setHost("host");
- cf.setPort(5672);
- cf.setUsername("账号");
- cf.setPassword("密码");
-
- try(Connection connection = cf.newConnection();
- Channel channel = connection.createChannel()){
- channel.confirmSelect();
-
- Map<String,String> mes = new HashMap<>();
- mes.put("name","1111");
- String messageStr = objectMapper.writeValueAsString(mes);
- for(int i = 0;i < 100;i++){
- channel.basicPublish(
- "exchange.drinks",
- "drinks.juzi",
- null,
- messageStr());
- }
- boolean isSendSuccess = channel.waitForConfirms();
- System.out.println(isSendSuccess);
- }
- }
- 复制代码
这样的话当一批消息发送完成后,进行统一的消息确认是否发送成功,就成了多条的消息确认,不过并不推荐使用这种确认消息的方式
在多条的消息确认中,比如我先是发送了一批的消息,比如这批消息有100条,这个时候如果有其中的一条消息没有发送成功,这里返回的也是false,然尔我们并不能知道是具体的哪 一条消息发送失败。
异步的消息确认是通过一个监听器来实现的,当消息发送后,会接着执行下面的逻辑,可能在稍会的一段时间,监听器监听到了Broker的返回,再进行逻辑的处理。
- public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
- ConnectionFactory cf = new ConnectionFactory();
- cf.setHost("host");
- cf.setPort(5672);
- cf.setUsername("账号");
- cf.setPassword("密码");
-
- try(Connection connection = cf.newConnection();
- Channel channel = connection.createChannel()){
- channel.confirmSelect();
- ConfirmListener confirmListener = new ConfirmListener() {
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("发送成功:" + deliveryTag + " multiple:" + multiple);
- }
-
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("发送失败:" + deliveryTag);
- }
- };
-
- channel.addConfirmListener(confirmListener);
- Map<String,String> mes = new HashMap<>();
- mes.put("name","11111");
- String messageStr = objectMapper.writeValueAsString(mes);
- for(int i = 0;i < 100;i++){
- channel.basicPublish(
- "exchange.drinks",
- "drinks.juzi",
- null,
- messageStr.getBytes());
- }
- Thread.sleep(Integer.MAX_VALUE);
- }
- }
- 复制代码
当成功的发送消息的时候会回调监听器中的handleAck方法,如果没有发送成功会回调handleNack方法 在这个监听器里面有两个参数一个deliveryTag和multiple:
这个异步的虽然在听觉上感觉比较厉害些,这里也不推荐使用,原因和上面的一样,我们并不能具休的知道是哪一条消息没有被确认发送。
综上:这里更加推荐单条消息确认,具体选择哪一种还是要用业务做出选择
注:注意一点是当一条消息成功的发送到Broker,但是如果没有正确的路由到队列,那么这时borker也是会返回true,因为Broker确时接收到了消息只是RoutingKey不可达,所以这里也会返回true,并且直接将消息丢弃
这个消息返回机制的作用就是在当一个消息成功的发送,但是并没有正确路由到队列的时候所回调的。这也弥补了上面确认消息是否发送成功但没有路由到队列所返回true的问题 在使用消息返回机制的时候在发送消息时需要将mandatory置成true。再添加对应的监听器。
- public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
- ConnectionFactory cf = new ConnectionFactory();
- cf.setHost("host");
- cf.setPort(5672);
- cf.setUsername("账号");
- cf.setPassword("密码");
-
- try(Connection connection = cf.newConnection();
- Channel channel = connection.createChannel()){
- channel.addReturnListener(new ReturnCallback() {
- @Override
- public void handle(Return returnMessage) {
- System.out.println("replyCode:" + returnMessage.getReplyCode() + " replyText:" + returnMessage.getReplyText() + " routingKey:"
- + returnMessage.getRoutingKey() + " exchange:" + returnMessage.getExchange() + " body:" + new String(returnMessage.getBody()));
- }
- });
- Map<String,String> mes = new HashMap<>();
- mes.put("name","11111");
- String messageStr = objectMapper.writeValueAsString(mes);
- channel.basicPublish(
- "exchange.drinks",
- "drinks.juzi1",
- true,
- null,
- messageStr.getBytes());
- Thread.sleep(Integer.MAX_VALUE);
- }
- }
- 复制代码
这里的addReturnListener方法有两个重载:只不过是handle的参数不同,一个是参数都显示在了参数列表内,一个是将参数封装到了Return对象内。当handle被回调的时候也可以获取到相应的参数比如:exchange routingkey body。