RabbitMQ基本概念、基本使用参考这里:https://blog.csdn.net/m0_55155505/article/details/125360639
docker安装命令:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
开机自启命令:
docker update rabbitmq --restart=always

图片来源:https://www.bilibili.com/video/BV1np4y1C7Yf?p=251
访问 ip:15672,默认用户名、密码都是guest
pom:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: / #虚拟主句
username: guest
password: guest
# 发送端确认
# 发送端确认类型(发送端到代理端)
publisher-confirm-type: correlated
# 开启消息抵达队列确认
publisher-returns: true
# 只要抵达队列,以异步发送优先回调我们这个confirm return
template:
mandatory: true
# 接收端
# 手动ack消息(默认自动)
listener:
direct:
acknowledge-mode: manual
配置发送消息使用Json进行序列化
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitMqConfig{
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
使用AmqpAdmin 创建交换机、队列,并绑定
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void testCreateAndBinding() {
String eName = "test.java.exchange";
Exchange exchange = new DirectExchange(eName, true, false);
amqpAdmin.declareExchange(exchange);
log.info("Exchange创建完成:{}", exchange);
String qName = "test.java.queue";
Queue queue = new Queue(qName, true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue创建完成:{}", queue);
String routingKey = qName;
Binding binding = new Binding(qName, Binding.DestinationType.QUEUE, eName, routingKey, null);
amqpAdmin.declareBinding(binding);
log.info("绑定成功:{}", binding);
}

发送消息示例
@Test
void send() {
String eName = "test.java.exchange";
String routingKey = "test.java.queue";
rabbitTemplate.convertAndSend(eName, routingKey, new Date());
}
参数可以写以下类型:
T data:T是发送消息的类型,例如:Date dateMessage:原生消息详细信息。头+体Channel:当前传输数据的管道
监听消息注解(必须有@EnableRabbit)
@RabbitListener:可标再类和方法上,使用queues声明要监听的队列(一般标再类上,声明监听哪个队列)@RabbitHandler:标再方法上(当队列可能受到不同的消息时,重载区分不同消息)


Queue可以有很多人来监听,只要收到消息,队列删除消息,而且只能由一个收到此消息



可配置如下内容:
//消息成功抵达代理服务器的回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("成功:关联的数据:{},是否成功收到:{},失败原因:{}", correlationData, ack, cause);
}
});
//消息未抵达队列的回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("失败:消息:{},返回码:{},返回信息:{},发给哪个交换机:{},使用的routingKey:{}", message, replyCode, replyText, exchange, routingKey);
}
});
CorrelationData 可在发送消息时指定:

例如:
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, new CorrelationData(UUID.randomUUID().toString()));

默认是自动ack的,存在问题:
配置手动ack:spring.rabbitmq.listener.direct.acknowledge-mode:manual
确认示例:
public void getData(Date date, Message message, Channel channel){
...
try {
//标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//是否批量确认
boolean multiple=false;
channel.basicAck(deliveryTag,multiple);
} catch (IOException e) {
//网络异常
}
}
使用场景:


消息的TTL(time to live)

DLE(dead letter exchanges)

延迟队列的实现:

