Spring Boot version: 2.7.0
Asynchronous message receiving in Spring-RabbitMQ relies on two core components: a message listener container and a message listener.
RabbitMQ configuration class:
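import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfiguration {

    public static final String TOPIC_EXCHANGE = "myExchange";
    public static final String QUEUE_NAME = "myPushQueue";

    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setExchange(TOPIC_EXCHANGE);
        // Publisher confirm: reports whether the broker accepted the message at the exchange
        template.setConfirmCallback((correlationData, ack, cause) -> {
            // correlationData is null when the message was sent without one
            String id = correlationData != null ? correlationData.getId() : null;
            if (ack) {
                // "message {} sent successfully"
                log.info("消息:{}发送成功", id);
            } else {
                // "message {} failed to send, cause: {}"
                log.error("消息:{}发送失败,失败原因为:{}", id, cause);
            }
        });
        // mandatory=true makes the broker return unroutable messages instead of dropping them
        template.setMandatory(true);
        template.setReturnsCallback(returned -> {
            // "message {} could not be routed, reason: {}"
            log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage(), returned.getReplyText());
        });
        return template;
    }

    @Bean
    public TopicExchange topicExchange() {
        // durable = true, autoDelete = false
        return new TopicExchange(TOPIC_EXCHANGE, true, false);
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue());
        container.setMessageListener(exampleListener());
        return container;
    }

    // Consumer: receives messages
    @Bean
    public MessageListener exampleListener() {
        return message -> log.info("received -> {}", message);
    }
}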
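The binding above uses the pattern my.test.*, and the producer later sends with the routing key my.test.msg. To make the matching rule concrete, here is a minimal sketch of topic-exchange matching in plain Java. TopicPatternSketch is a hypothetical helper written only for illustration; it is not part of Spring AMQP or RabbitMQ, and the broker's real implementation differs. It assumes the standard topic rules: words are separated by dots, * stands for exactly one word, # for zero or more words.

```java
/** Hypothetical helper: mimics topic-exchange pattern matching for illustration only. */
final class TopicPatternSketch {

    /** Returns true if routingKey matches pattern ('*' = exactly one word, '#' = zero or more words). */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key exhausted while the pattern still expects a word
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("my.test.*", "my.test.msg")); // true
        System.out.println(matches("my.test.*", "my.test.a.b")); // false: '*' covers one word only
        System.out.println(matches("my.test.*", "my.test"));     // false: '*' needs a word
    }
}
```

So a message sent with my.test.msg reaches myPushQueue, while one sent with my.test.a.b would be unroutable and, because mandatory is set, would come back through the returns callback.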
Configuration file:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost
    # Message confirmation (ACK)
    publisher-confirm-type: CORRELATED  # confirm callback reports whether the message reached the exchange
    publisher-returns: true             # return callback reports messages that could not be routed to a queue
Producer:
@Autowired
private RabbitTemplate rabbitTemplate;

public void send() {
    // The id of this CorrelationData comes back in the confirm callback
    CorrelationData correlationData = new CorrelationData();
    // Goes to the template's exchange (myExchange) with routing key "my.test.msg"
    rabbitTemplate.convertAndSend("my.test.msg", new User("Kleven", 18), correlationData);
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
    private static final long serialVersionUID = -5079682733940745661L;
    private String name;
    private Integer age;
}
Result (the confirm callback's log line 消息:<id>发送成功 means "message <id> sent successfully"):
: received -> (Body:'{"name":"Kleven","age":18}' MessageProperties [headers={spring_listener_return_correlation=4c01ac5f-196e-498f-973b-429d023087e3, spring_returned_message_correlation=bd9d577f-c427-4fa2-b37e-423665f59e4f, __TypeId__=info.kleven.javacamp.spring.rabbitmq.domain.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myExchange, receivedRoutingKey=my.test.msg, deliveryTag=1, consumerTag=amq.ctag-UvD_6hpDGZVjIeGDtoOtXw, consumerQueue=myPushQueue])
: 消息:bd9d577f-c427-4fa2-b37e-423665f59e4f发送成功
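Note that the id in the confirm line is the same as the spring_returned_message_correlation header of the received message: it is the id of the CorrelationData passed by the producer. One thing this makes possible is tracking which sends are still unconfirmed. The following is only a sketch under assumptions: PendingConfirms is a hypothetical class, not part of Spring AMQP, and it runs without a broker. In a real producer the id returned by register would be passed as new CorrelationData(id), and onConfirm would be called from the confirm callback.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper, not part of Spring AMQP: tracks payloads awaiting a publisher confirm. */
final class PendingConfirms {

    private final Map<String, Object> pending = new ConcurrentHashMap<>();

    /** Registers a payload before sending and returns the id to use for its CorrelationData. */
    String register(Object payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    /** Removes the entry for id; returns the payload to retry on a nack, or null on an ack. */
    Object onConfirm(String id, boolean ack) {
        Object payload = pending.remove(id);
        return ack ? null : payload;
    }

    /** Number of sends that have not been confirmed yet. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        String id = confirms.register("{\"name\":\"Kleven\",\"age\":18}");
        Object retry = confirms.onConfirm(id, false); // simulate a nack from the broker
        System.out.println("needs retry: " + (retry != null)); // needs retry: true
        System.out.println("still pending: " + confirms.size()); // still pending: 0
    }
}
```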
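This variant also hands the consumer the Channel instance along with the message. Only the consumer code in the first example's configuration changes.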
Configuration class:
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(queue());
    container.setMessageListener(exampleChannelAwareMessageListener());
    return container;
}

// Consumer: receives the message together with the Channel it arrived on
// (ChannelAwareMessageListener is in org.springframework.amqp.rabbit.listener.api)
@Bean
public ChannelAwareMessageListener exampleChannelAwareMessageListener() {
    return (message, channel) -> {
        log.info("channel -> {}", channel);
        log.info("received -> {}", message);
    };
}
Result:
: channel -> Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/my_vhost,1), conn: Proxy@5101669b Shared Rabbit Connection: SimpleConnection@bc510fd6 [delegate=amqp://admin@127.0.0.1:5672/my_vhost, localPort= 56290]
: received -> (Body:'{"name":"Kleven","age":18}' MessageProperties [headers={spring_listener_return_correlation=71e02a0e-2067-43ac-bf4e-6eb561df9a6b, spring_returned_message_correlation=2cab0928-19c1-49fb-a02a-e75331665207, __TypeId__=info.kleven.javacamp.spring.rabbitmq.domain.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myExchange, receivedRoutingKey=my.test.msg, deliveryTag=1, consumerTag=amq.ctag-q-VGpyVJmm9ikD_Ku4jMZg, consumerQueue=myPushQueue])
: 消息:2cab0928-19c1-49fb-a02a-e75331665207发送成功
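A common reason to want the Channel is manual acknowledgement. The listener above only logs the channel, so what follows is a sketch of the usual ack-on-success, nack-on-failure pattern rather than something taken from the example. To keep it runnable without a broker, AckChannel is a hypothetical stand-in that copies only the basicAck and basicNack parameter lists of com.rabbitmq.client.Channel (the real methods also throw IOException), and Handler and RecordingChannel are likewise made up for the demo. With a real container this would also require container.setAcknowledgeMode(AcknowledgeMode.MANUAL), since the default mode is AUTO.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of manual acknowledgement; AckChannel stands in for com.rabbitmq.client.Channel. */
final class ManualAckSketch {

    /** Hypothetical stand-in exposing only the two Channel methods used here. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple);
        void basicNack(long deliveryTag, boolean multiple, boolean requeue);
    }

    /** Hypothetical business handler; throws to signal a processing failure. */
    interface Handler {
        void handle(String body) throws Exception;
    }

    static void onMessage(String body, long deliveryTag, AckChannel channel, Handler handler) {
        try {
            handler.handle(body);
            channel.basicAck(deliveryTag, false);         // acknowledge only this delivery
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, false); // reject without requeue to avoid a redelivery loop
        }
    }

    /** Records calls so the sketch can run without a broker. */
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        onMessage("{\"name\":\"Kleven\",\"age\":18}", 1L, channel, body -> { });
        onMessage("bad payload", 2L, channel, body -> {
            throw new IllegalStateException("cannot parse");
        });
        System.out.println(channel.calls); // prints [ack:1, nack:2]
    }
}
```

In the real listener the delivery tag would come from message.getMessageProperties().getDeliveryTag().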
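In most cases you will use the annotation-driven approach with @RabbitListener, which is both convenient and flexible. Here is a simple example.
Remove the consumer-related code (the listener container and listener beans) from the first example's configuration class; everything else stays the same.
Consumer: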
// Declare this method in a Spring-managed bean (for example a @Component)
@RabbitListener(queues = "myPushQueue")
public void processOrder(String message) {
    log.info("received -> {}", message);
}
Result:
: 消息:9bacf20e-3bb1-4f5e-95a9-a5fb9cd7ff9a发送成功
: received -> {"name":"Kleven","age":18}