RabbitMQ是一个由erlang语言编写的、基于AMQP协议的、开源的消息中间件。支持多种语言使用。
MQ是消息队列(Message Queue)的简称,是应用与程序的通信管道。
AMQP:一种提供统一消息服务的应用层标准消息协议,专门为面向消息的中间件设计的。
工作原理:
生产者生产消息发送到队列,消费者从队列拿消息并处理。生产者在发送完消息后代表着生产者的业务流程已经完成了,消费者也只需要关心消息后的业务。在众多可异步的场景均可使用到。
例如注册完成后需要给账户派送虚拟奖品,派送奖品这一业务就可丢给消费者处理
version: '3.7'
services:
rabbitmq01:
image: rabbitmq:3.9-management
container_name: rabbitmq01
hostname: rabbithost01
# restart: always
ports:
#程序连接使用的端口
- "5672:5672"
#管理控制台使用的端口
- "15672:15672"
environment:
- TZ=Asia/Shanghai
volumes:
- /root/local/rabbitmq:/var/lib/rabbitmq
networks:
- my-net
networks:
#新增的网络 内部服务名调用
my-net:
external: true
启动命令就不贴了,看下启动完成的容器信息
[root@m rabbitmq]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d7875aef4384 rabbitmq:3.9-management "docker-entrypoint.s…" 12 seconds ago Up 10 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbitmq01
从上述内容可以得知,15672是管理后台,直接访问看下http://ip:15672,默认的账后密码是guest/guest,不能外网访问的情况需要去配置文件修改权限。
这个版本不用去修改,可以在日志中看到
rabbitmq01 | 2022-06-22 01:51:33.880125+00:00 [info] <0.222.0> Created user 'guest'
rabbitmq01 | 2022-06-22 01:51:33.881562+00:00 [info] <0.222.0> Successfully set user tags for user 'guest' to [administrator]
rabbitmq01 | 2022-06-22 01:51:33.883061+00:00 [info] <0.222.0> Successfully set permissions for 'guest' in virtual host '/' to '.*', '.*', '.*'

overview:总览页面,如果是集群,nodes就会显示多个节点的信息
connections:已连接的客户端
channel :管道
exchange :交换机
queue:队列
admin:管理信息,里面的virtual-host表示虚拟主机
一条消息的走向:某一个的虚拟主机中的管道channel -> 交换机exchange -> 分发到queue队列

可以看到当前绑定的队列,如果有设置key也会显示

在Bindings这里可以看到绑定的交换机exchange,如果有设置key也会在这里显示

可以对不同的角色设置不同的虚拟主机空间,在配置中也可以直接配置虚拟主机
关于虚拟主机、自定义exchange、队列的详细内容会在下篇文章中分享
本次demo基于springboot2.5.6,mq的pom依赖如下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
addresses: 192.168.0.221:5672
username: guest
password: guest
virtual-host: /
#如果想默认指定发送的exchange和key 则打开下面配置
# template:
# exchange: test_exchange
# routing-key: 123456
listener:
direct:
prefetch: 1000000
simple:
#手动确认 当有自动确认机制 又手动ACK会报406错误
acknowledge-mode: manual
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send() {
JSONObject object = new JSONObject();
object.put("12345", "22222");
//convertAndSend也支持直接传递消息 会走默认配置的交换机和key
rabbitTemplate.convertAndSend("test_exchange","123456",object.toJSONString());
return "success";
}
使用默认交换机时,发送消息会报ACCESS_REFUSED - 默认交换不允许操作异常。所以这里手动创建交换机和队列并进行绑定。
@Bean
public boolean createQueues(@Autowired ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareExchange(new DirectExchange("test_exchange"));
//如果有需要每次重启都重新创建队列的 可以设置Queue构造方法的autoDelete=true
rabbitAdmin.declareQueue(new Queue("test_queue"));
rabbitAdmin.declareBinding(new Binding("test_queue", Binding.DestinationType.QUEUE,
"test_exchange", "123456", null));
//这里是循环创建队列并关联的代码 有需要可以取用
// String exchangeName = "test1234";
// for (int i = 0; i < 5; i++) {
// String queueName = "test_queue_" + i;
// rabbitAdmin.declareExchange(new DirectExchange(exchangeName));
// rabbitAdmin.declareQueue(new org.springframework.amqp.core.Queue(queueName));
// rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "key_" + i, null));
// }
return true;
}
//消息队列监听,可多个
@RabbitHandler
@RabbitListener(queues = {"test_queue"})
public void onMessage(Message message, Channel channel) throws Exception {
String msgBodyString = new String(message.getBody());
JSONObject json = JSONObject.parseObject(msgBodyString);
log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());
//进行消费 只有确认消费了的才不会从新进入队列
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
调用接口发送消息,查看消费者日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"12345":"22222"},test_queue
以下数据转=转载自@m0_61174282,仅为记录用。
spring.rabbitmq.host: 服务Host
spring.rabbitmq.port: 服务端口
spring.rabbitmq.username: 登陆用户名
spring.rabbitmq.password: 登陆密码
spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
spring.rabbitmq.publisher-returns: 是否启用【发布返回】
spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
spring.rabbitmq.parsed-addresses:
# ssl
spring.rabbitmq.ssl.enabled: 是否支持ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1
# cache
spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION
# listener
spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒
spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态
# template
spring.rabbitmq.template.mandatory: 启用强制信息;默认false
spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabled: 发送重试是否可用
spring.rabbitmq.template.retry.max-attempts: 最大重试次数
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔
以上就是本章的全部内容了。
上一篇:Zookeeper第四话 – zookeeper高可用集群搭建
下一篇:RabbitMQ第二话 – Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现
书山有路勤为径,学海无涯苦作舟