RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
Broker
代理人,消息队列的主体,负责管理 消息 的接收、发送和存储。
VHost
虚拟主机,发挥 消息队列 内部系统隔离的作用。
Exchange
交换机,完成 消息 的特定传递。
常用有以下三种:
DirectExchange
直接交换机,点对点传输。
FanoutExchange
扇出交换机,广播传输。
TopicExchange
主题交换机,可根据 routing key 完成分组传输。
Queue
队列,存储消息的地方。
Binding
绑定关系,交换机 和 交换机、交换机 和 队列 都可完成绑定。
Message(routing key)
消息,指定 路由键(routing key)可使 消息 传递到特定的队列。
Connect
连接,客户端 和 Broker 的连接是一种长连接,消息 在该连接中的一条 信道(channel)中完成传输。
Producer
生产者,生产消息。
Consumer
消费者,消费消息。
/**
* 1、创建 Exchange、Queue、Binding
* 1.1、AmqpAdmin
* 2、收发消息
* 2.1、RabbitTemplate
* 2.2、RabbitListener 类、方法
* 2.3、RabbitHandler 方法
* 2.3.1 用于区分 同一队列中的不同消息,不同队列中的不同消息。
* 3、消息确认机制
* 3.1、发送方
* 3.1.1、confirmCallback 确认模式
* producer -> Exchange
* 3.1.2、returnCallback 回退模式
* Exchange -> Queue
* 3.2、接收方
* 3.2.1、ACK机制
* Queue -> Consumer
*/
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
@Test
void createExchange() {
/*
DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments)
直接交换机 点对点
*/
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("DirectExchange 创建成功");
}
@Test
void createQueue() {
/*
Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map arguments)
队列
*/
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue 创建成功");
}
@Test
void createBinding() {
/*
Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map arguments)
绑定关系
*/
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hello-java-exchange", "hello-java", null);
amqpAdmin.declareBinding(binding);
log.info("Binding 创建成功");
}
@Test
void sendMessageString() {
/*
void convertAndSend(String exchange, String routingKey, Object object);
*/
rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", "hello world");
log.info("消息已发送");
}
@Test
void sendMessageObj() {
/*
对象需要实现 Serializable 接口
*/
Data data = new Data();
data.setId("1");
data.setName("小米");
data.setTime(LocalDateTime.now());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
log.info("消息已发送");
}
@Test
void sendMessageJson() {
/*
配置 MessageConverter -> new Jackson2JsonMessageConverter()
*/
Data data = new Data();
data.setId("1");
data.setName("小米");
data.setTime(LocalDateTime.now());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
log.info("消息已发送");
}
@lombok.Data
static class Data implements Serializable {
private String id;
private String name;
private LocalDateTime time;
}
注:启动类需要添加 @EnableRabbit 注解。
/*
Message message, Data content, Channel channel
1、客户端 接收到消息后,Queue 中该消息会自动删除
2、客户端 有序接收消息
*/
@RabbitListener(queues = "hello-java-queue")
void receiveMessage(Message message, Data content, Channel channel) {
System.out.println("接收到消息,内容为:" + message + ", 类型为:" + message.getClass());
}
@Service
@RabbitListener(queues = "hello-java-queue")
class TestService {
@RabbitHandler
void manualAck(Message message, Data content, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
/*
签收
void basicAck(long deliveryTag, boolean multiple);
deliveryTag 通道自增
multiple 是否为批量模式
拒收
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
requeue 是否重新入队
void channel.basicReject(long deliveryTag, boolean multiple, boolean requeue);
*/
channel.basicAck(deliveryTag, false);
// channel.basicNack(deliveryTag, false, true);
// channel.basicReject(deliveryTag, true);
}
}
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
# 异步回调 returnCallback
mandatory: true
listener:
simple:
# 手动 ack
acknowledge-mode: manual
package com.hong.changfeng.information.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class RabbitConfig {
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate(rabbitTemplate);
return rabbitTemplate;
}
/**
* 使用 JSON 序列化机制 进行消息转换
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制 RabbitTemplate
*/
public void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
// 设置 消息到达 Broker 的确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 消息到达 Broker 时触发
*
* @param correlationData 当前消息的唯一关联数据(id)
* @param ack 消息是否到达 Broker(Exchange)
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.debug("RabbitConfig -> initRabbitTemplate " +
" correlationData: " + correlationData + " ack: " + ack + " cause: " + cause);
}
});
// 设置 消息到达 Queue 的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 消息未到达 Queue 时触发
*
* @param message 消息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.debug("RabbitConfig -> initRabbitTemplate " +
" message: " + message + " replyCode: " + replyCode + " replyText: " + replyText +
" exchange: " + exchange + " routingKey: " + routingKey);
}
});
}
}