
先装配一个RabbitTemplate
@Bean(name = "rabbitTemplate")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//使用jackson消息转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper));
rabbitTemplate.setEncoding("UTF-8");
/*
* 开启ReturnCallback
* 为false时,匹配不到会直接被丢弃
* 为true时,消息通过交换器无法匹配到队列会返回给生产者,并触发ReturnedMessage
*/
rabbitTemplate.setMandatory(true);
//当消息进入Exchange交换器,但是未进入队列时回调
rabbitTemplate.setReturnsCallback((ReturnedMessage returned) -> {
String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, returned.getReplyCode(),
returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
});
/*
* 消息确认yml需要配置publisher-returns: true
* 当消息进入Exchange交换器时就进入回调,不管是否进入队列
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("消息发送到exchange失败,原因: {}", cause);
}
});
return rabbitTemplate;
}
发送消息
/*
1. CusRabbitMQConfig.MESSAGE_NOTICE 交换机名称,RabbitMQ中如果不存在该交换机就会报错,并回调ConfirmCallback
2. 第二个参数是RoutingKey,设置为空字符串,那么只能批配到"#",匹配不到(mandatory为true)会回调ReturnsCallback
3. 第三个参数是消息体
4. 这里没有指定交换机的类型,因为交换机是在消费端创建的
*/
rabbitTemplate.convertAndSend(CusRabbitMQConfig.MESSAGE_NOTICE, "", message);
@RabbitListener(bindings = @QueueBinding(
//指定交换机,容器启动时(后置增强器RabbitListenerAnnotationBeanPostProcessor)RabbitMQ中如果不存在就创建该交换机
exchange = @Exchange(value = StreamInputInterface.RECEIVE_NOTICE_MESSAGE, type = ExchangeTypes.TOPIC),
//指定队列,同样在后置增强器里面会与RabbitMq通信,自动创建一个队列与交换机绑定,队列名称自动生成
value = @Queue(durable = "true"),
//指定BindingKey交换机与队列的绑定关系,井号表示匹配所有,意味着只要发送到该交换机的消息都会进入这个队列,从而被消费
key = "#"
), containerFactory = "rabbitListenerContainerFactory")
启动应用
看输出的日志就知道,自动创建了一个队列,该队列持久化并且自动删除。具体过程可以看这篇队列如何自动创建
在RabbitMQ后台也可以看到该队列已经绑定到交换机。停止工程之后队列就消失了。
根据第6条,我们可以设计一个类似于Kafka消费者组的功能,如图:
