RabbitMQ(四) --消费者Consumer一文中详细讲解了MQ消息消费的相关问题,在SpringAMQP中基本都会选择针对Connetion配置队列的监听器进行消息消费。配置默认的监听实例类SimpleMessageListenerContainer中对于消息消费的确认默认为autoAck,接下来本文将围绕手动ACK讲述
首先需要做的就是在消费监听中配置属性acknowledge为manual表示手动确认,然后就是在属性ref中配置MessageListener的实现
<!--配置消费者监听-->
<rabbit:listener-container connection-factory="cacheConnectionFactory" message-converter="jackson2JsonMessageConvert"
acknowledge="manual">
<rabbit:listener id="jjDeadBaseQueueListener" queues="jjDeadBaseQueue" ref="jjConsumerListener"/>
</rabbit:listener-container>
ChannelAwareMessageListener是SpringAMQP中封装提供的一个可以实现手动确认的接口,相对于默认的MessageListener接口多了一个返回参数channel,通过该信道参数就可以完成手动ACK的研发。如果有关消息确认方面的问题可以跳转到另外一篇文章RabbitMQ(四) --消费者Consumer
/**
* @author zsl
* @version 1.0.0
* @date: 2020/5/11 14:24
**/
@Component
public class JjConsumerListener implements ChannelAwareMessageListener {
@Override
public void onMessage (Message message, Channel channel) throws Exception {
// 消息
byte[] body = message.getBody();
// 确认ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 信道确认
channel.basicAck(deliveryTag,false);
}
}
消费端每次访问RabbitMQ取出一条消息?这明显是不机智的设计,前面的文章也提到过这样的网络访问消耗是巨大的。所以有消息预取的概念,在SpringAMQP中如果没有限制那么将会是250数量消息的预取,假设这时消息体较大取到程序内存中,那就是自我炸裂。所以针对消息情况进行该参数控制很有必要
在MessageListener中进行配置,具体如下所示:配置参数prefetch
<rabbit:listener-container connection-factory="cacheConnectionFactory" message-converter="jackson2JsonMessageConvert"
acknowledge="manual" prefetch="10">
<rabbit:listener id="jjDeadBaseQueueListener" queues="jjDeadBaseQueue" ref="jjConsumerListener"/>
</rabbit:listener-container>
这时再通过监控WEB端可以看到其预取数量变为设置的数量10
消息的积压大家都遇到过,消费者消息消费的速度跟不上生产者速度就会导致消息积压,对于很多消息及时性场景来讲都是不能接受的。所以SpringAMQP中提供并发消费的配置,如下所示:没有配置消费者数量时默认都是只有一个消费者
再上控制WEB界面结果如下所示:
并发消费很重要的一点就是要知道这对于消息消费顺序有要求的场景千万不能用!!!