• RabbitMQ快速入门


    日升时奋斗,日落时自省 

    目录

    1、MQ基本概念

    2、RabbitMQ简介

    3、RabbitMQ模式

    3.1、RabbitMQ安装(linux)

    3.1.1、直接安装

    3.1.2、docker拉取

    3.2、项目搭建

    3.2.1、创建项目

    3.2.2、配置

    3.3、简单模式

    3.3、Work queues 工作队列模式

    3.4、Pub/Sub 订阅模式

    3.4.1、fanout类型

    3.4.2、direct类型

    3.4.3、topic类型

    3.5、死信队列

    3.6、Lazy Queue

    4、确认机制

    5、重试机制


    注:该博客有点长,重复内容不多,希望友友们慢慢观看

    1、MQ基本概念

    (1)应用解耦

    服务与服务之前进行远程调用时不需要直接调度,降低了关联性,服务直接给中间件发送消息通过MQ消息给调度服务,做到降低服务与服务之间耦合性

    (2)异步提速

    如何做到异步提速呢???主要就是MQ作为消息中间人,进行转发告诉其他服务这边已经好了,你们可以开始了,为啥说这里的异步快,因为Rabbitmq请求速度是微妙级别的,速度相比远程调用的请求速度是要快的多的

    (3)削峰填谷

    先来理解一下削峰填谷:图有点抽象,麻烦友友们慢慢看一下,暂时也没有想到很突显的图

    如何做到削峰填谷呢

    MQ的产品

    2、RabbitMQ简介

    AMQP, 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。 

    RabbitMQ基础架构图

    下面来解释以下怎么看这个图:producer就是提供者,通过channel(频道)连接到Exchange(交换机)交换机通过虚拟用户绑定一个Queue(队列);consumer就是消费者,通过channel(频道)直接去Queue队列里拿消息(前提是要有消息)

     Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace (进行隔离)概念。当多个不同的用户使用同一个 RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/ queue 等

    Connection: publisher/ consumer 和 broker 之间的 TCP 连接

    Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。 Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯, AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销

    Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
    Queue: 消息最终被送到这里等待 consumer 取走
    Binding: exchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key。 Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    3、RabbitMQ模式

    去官网上看有以下六种模式:友友们可以直接来官网上看RabbitMQ Tutorials — RabbitMQ

    3.1、RabbitMQ安装(linux)

    3.1.1、直接安装

    RabbitMQ安装在linux上需要一定的依赖环境

    这里直接给友友们 配上:(如果是Ubuntu的直接yum换成apt就行)

    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

    还需要一个前置环境Erlang(从下面进行去安装包,也可以去对应官网取) 

    链接: https://pan.baidu.com/s/1QVOM5FOHFezovA63vyi5Bg?pwd=1234 提取码: 1234 

    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

    rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

    开启管理界面:

    rabbitmq-plugins enable rabbitmq_management

    修改默认配置信息

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

    注:进来后找这个位置(只改这一行)改成下面的样子就可以了一会要靠这个登录呢

    启动RabbitMQ

    service rabbitmq-server start

    访问地址:ip:15672 (如果界面打不开,可能是防火墙没有关,或者云服务器安全组没有开放,针对问题稍微百度一下即可)

    注:15672是RabbitMQ的Web管理界面的默认监听端口,5672是AMQP协议的默认端口,25672是RabbitMQ集群间节点通信的默认端口(当前我们使用管理页面访问15672接口即可)

    注:账号密码都是guest

    3.1.2、docker拉取

    mq.tar文件还是从https://pan.baidu.com/s/1QVOM5FOHFezovA63vyi5Bg?pwd=1234取

    docker直接拉取也挺好用的,先把tar文件加载一下

    docker load -i mq.tar

    然后创建一个网络

    docker network create rabbtimq-maxxub

    剩下的跑起来就行 -e 是环境变量其实就是设置登录名和密码

    到时候登录名:maxxub; 密码就是1223456

    docker run \

     -e RABBITMQ_DEFAULT_USER=maxxub \

     -e RABBITMQ_DEFAULT_PASS=123456 \

     -v mq-plugins:/plugins \

     --name mq \

     --hostname mq \

     -p 15672:15672 \

     -p 5672:5672 \

     --network rabbtimq-maxxub \

     -d \

     rabbitmq:3.8-management

    3.2、项目搭建

    3.2.1、创建项目

    (1)父工程

    直接上springboot先创建父工程目起个名字(我这里叫mq-demo)以下是需要的依赖

    1. <module>publishermodule>
    2. <module>consumermodule>
    3. pom
    4. org.springframework.boot
    5. spring-boot-starter-parent
    6. 2.7.12
    7. 8
    8. 8
    9. org.projectlombok
    10. lombok
    11. org.springframework.boot
    12. spring-boot-starter-amqp
    13. org.springframework.boot
    14. spring-boot-starter-test
    15. org.springframework.boot
    16. spring-boot-starter-logging

    (2)创建消费者模块和提供者模块(子模块不需要依赖)

    3.2.2、配置

    application.yml 消费者和提供者都可以使用同一份,复制过去就行了

    下面需要友友们知道有虚拟用户这个东西;

    1. logging:
    2. pattern:
    3. dateformat: MM-dd HH:mm:ss:SSS
    4. level:
    5. com.itheima: debug
    6. spring:
    7. rabbitmq:
    8. host: 这里写友友们自己的虚拟机ip或者云服务器ip
    9. port: 5672
    10. virtual-host: / #虚拟用户
    11. username: maxxub #登录rabbitmq的账号
    12. password: 123456 #登录rabbitmq的密码
    13. listener:
    14. simple:
    15. prefetch: 1
    16. acknowledge-mode: auto #模式:自动 进行一定的重试次数
    17. retry:
    18. enabled: true # 超时重试
    19. initial-interval: 1000ms
    20. multiplier: 2 #连接失败的重试
    21. max-attempts: 3 #最大重试次数的
    22. stateless: true #针对事务,默认为falsetrue针对上下文有定义保存,是一个状态

    3.3、简单模式

    P表示:producer 提供者

    红色框:Queue 队列

    C表示:consumer 消费者

    简介叙述:直接通过队列进行,比较简单,没有啥条件限制

    提供者:这里直接就在测试进行开始写了

    消息发送依赖于RabbitTemplate类,这个类直接注入就行了

    1. @Slf4j
    2. @SpringBootTest
    3. public class SpringAmqpTest {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. @Test
    7. void testSendMessage2Queue(){
    8. String queueName="simple.queue"; //我们这里写队列名称一会在管理页面创建
    9. String msg="hello,ampq!"; //消息
    10. rabbitTemplate.convertAndSend(queueName,msg); //参数队列 + 消息
    11. }
    12. }

    消费者:

    先去创建一个队列,直接到rabbitmq管理页面进行创建就行了,ip:15672 直接登录既可以

    消费者的代码

    1. @SpringBootApplication
    2. public class ConsumerApplication { //启动类
    3. public static void main(String[] args) {
    4. SpringApplication.run(ConsumerApplication.class, args);
    5. }
    6. }
    1. @Slf4j
    2. @Component
    3. public class MqListener {
    4. @RabbitListener(queues = "simple.queue") //使用该注解监听队列消息
    5. public void listenSimpleQueue(String msg){
    6. System.out.println("消费者收到simple.queue的消息"+msg);
    7. }
    8. }

    注:先启动消费者的启动类 ,再启动提供者的测试代码消费者会收到消息并且直接打印

    3.3、Work queues 工作队列模式

    P表示:producer 提供者

    红色框:Queue 队列

    C表示:consumer 消费者

    注:这次有两个消费者,其实也可以是多个消费者

    与简单模式没有很大的区别,使用同一个队列

    消费者:(很简单没有任何区别,就是添加了一个方法) 该方法还是写在MqListener 类中

    1. @Slf4j
    2. @Component
    3. public class MqListener {
    4. @RabbitListener(queues = "simple.queue")
    5. public void listenSimpleQueue1(String msg) throws InterruptedException {
    6. System.out.println("消费者1收到simple.queue的消息"+msg);
    7. Thread.sleep(20);
    8. }
    9. @RabbitListener(queues = "simple.queue")
    10. public void listenSimpleQueue2(String msg) throws InterruptedException {
    11. System.err.println("消费者2收到simple.queue的消息"+msg);
    12. Thread.sleep(200);
    13. }
    14. }

    注:这里加了睡眠时间是为了下面演示做准备

    提供者:(测试一下)写在producer测试代码 其实也没用改啥,就是给这个队列多发点消息

    1. @Slf4j
    2. @SpringBootTest
    3. public class SpringAmqpTest {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. @Test
    7. void testSendMessage2Queue() throws InterruptedException {
    8. String queueName="simple.queue";
    9. for(int i=1;i<50;i++){
    10. String msg="hello,ampq!";
    11. rabbitTemplate.convertAndSend(queueName,msg);
    12. Thread.sleep(20);
    13. }
    14. }
    15. }

    注:先启动消费者启动类,在启动提供者测试类

    1. listener:
    2. simple:
    3. prefetch: 1 #每次每次只处理一个消息
    4. acknowledge-mode: auto
    5. retry:
    6. enabled: true # 超时重试
    7. initial-interval: 1000ms #设定重试时间
    8. multiplier: 2 #连接失败的重试
    9. max-attempts: 3 #最大重试次数
    10. stateless: true

    这里给消费者2设置等待时间是200ms秒,消费者设置等待时间是20ms,处理是及时处理,也就是一部分是消费者1一部分是消费者2

    这里展示的结果跟我们前面的配置参数有关

    1. listener:
    2. simple:
    3. prefetch: 1 #每次每次只处理一个消息
    4. acknowledge-mode: auto
    5. retry:
    6. enabled: true # 超时重试
    7. initial-interval: 1000ms #设定重试时间
    8. multiplier: 2 #连接失败的重试
    9. max-attempts: 3 #最大重试次数
    10. stateless: true

    如果把prefetch: 1该参数去掉消费者2等待时间为200ms>>20ms所以会进行等待处理导致消息处理饥饿

    3.4、Pub/Sub 订阅模式

    P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    C:消费者,消息的接收者,会一直等待消息到来
    Queue:消息队列,接收消息、缓存消息
    Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。 Exchange有常见以下3种类型:

    简单说三种常见交换机类型:fanout、direct、topic

    有交换机和队列了,我们简单教友友们了解一下如何绑定交换机和队列关系在管理页面

    后面就开始给友友们代码直接创建交换机和队列并且进行绑定(这是比较好的操作)

    3.4.1、fanout类型

    下面这个位置创建一个类

    直接上代码:(代码创建交换机和队列并且进行绑定,这里创建队列与管理页面创建效果都是一样的,就是更快,也不容易出问题,消费者跑起来以后,可以去管理页面看,交换机和队列都是存在的)

    1. @Configuration
    2. public class FanoutConfiguration {
    3. @Bean //创建FanoutExchange类型交换机 以下是两种方法 两个选择一个就行
    4. public FanoutExchange fanoutExchange(){
    5. // ExchangeBuilder.fanoutExchange("maxxub.fanout").build(); //使用Builder创建
    6. return new FanoutExchange("maxxub.fanout2"); //直接new一个交换机对象 参数:填写交换机名称
    7. }
    8. @Bean //创建Queue类型队列 以下是两种方法 两个选择一个就行
    9. public Queue fanoutQueue3(){
    10. // QueueBuilder.durable("fanout.queue3").build(); //使用Builder创建
    11. return new Queue("fanout.queue3"); //直接new一个队列对象 参数:填写队列名称
    12. }
    13. @Bean //进行队列和交换机绑定 使用绑定构建
    14. public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
    15. //直接看 BindingBuilder 就是 bind(绑定)fanoutQueue3队列 to(到) fanoutExchange交换机上
    16. return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    17. }
    18. @Bean
    19. public Queue fanoutQueue4(){
    20. // QueueBuilder.durable("fanout.queue4").build();
    21. return new Queue("fanout.queue4");
    22. }
    23. @Bean
    24. public Binding fanoutBinding4(){
    25. //可以直接调用方法
    26. return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    27. }
    28. }

    消费者:(MqListener类中写就行) 还是同样的监听

    1. @RabbitListener(queues = "fanout.queue3")
    2. public void listenFanoutQueue3(String msg) throws InterruptedException {
    3. System.out.println("消费者收到fanout.queue3的消息"+msg);
    4. Thread.sleep(200);
    5. }
    6. @RabbitListener(queues = "fanout.queue4")
    7. public void listenFanoutQueue4(String msg) throws InterruptedException {
    8. System.err.println("消费者收到fanout.queue4的消息"+msg);
    9. Thread.sleep(200);
    10. }

    提供者:(写在producer的测试类中)

    注:fanout刻画类型是没有路由的,所这里就是null,我们本身也没有设置路由

    1. @Test
    2. void testFanoutQueue(){
    3. //前面创建的是maxxub.fanout2交换机所以这里也绑定maxxub.fanout2
    4. String exchangeName="maxxub.fanout2"; //交换机名称
    5. String msg="hello , amqp"; //消息
    6. //参数:交换机、路由、消息
    7. rabbitTemplate.convertAndSend(exchangeName,null,msg);
    8. }

    注:先启动消费者启动类 ,再启动提供者测试方法(效果如下)

    3.4.2、direct类型

    就拿这个图来分析一下:这里的P(提供者)连接X(交换机)设置不同Routing(路由)给两个队列,相当于给两个队列加上了特殊的标签,black路由(标签)和green路由(标签)给了Q2队列,这个队列就负责接收black,green标签的消息其他的消息不归我管,orange路由(标签)给Q1队列,这个队列就负责接收orange标签的消息

    创建Direct交换机和队列并设置路由,进行绑定 (不用的时候,把@Configuration注解注释掉)

    1. @Configuration
    2. public class DirectConfiguration {
    3. @Bean //创建DirectExchange交换机 以下是两种方法 进行创建
    4. public DirectExchange directExchange(){
    5. //参数就是 : 交换机的名称
    6. // ExchangeBuilder.fanoutExchange("maxxub.direct").build(); //使用Builder进行创建交换机
    7. return new DirectExchange("maxxub.direct");
    8. }
    9. @Bean //创建 队列
    10. public Queue directQueue3(){
    11. //填写参数就是 : 队列名称
    12. // QueueBuilder.durable("direct.queue1").build();
    13. return new Queue("direct.queue1");
    14. }
    15. @Bean //进行交换机和队列的绑定 同时添加上路由
    16. public Binding directBinding1(Queue directQueue3,DirectExchange directExchange){
    17. //解释 : BindingBuilder bind(绑定)directQueue3队列 to(到)directExchange with(带上路由)red
    18. return BindingBuilder.bind(directQueue3).to(directExchange).with("red");
    19. }
    20. @Bean
    21. public Binding directBinding2(Queue directQueue3,DirectExchange directExchange){
    22. //解释 : BindingBuilder bind(绑定)directQueue3队列 to(到)directExchange with(带上路由)blue
    23. return BindingBuilder.bind(directQueue3).to(directExchange).with("blue");
    24. }
    25. @Bean
    26. public Queue directQueue4(){
    27. // QueueBuilder.durable("direct.queue2").build();
    28. return new Queue("direct.queue2");
    29. }
    30. @Bean
    31. public Binding directBinding4(){
    32. //可以直接调用方法 于上面是雷同的,交换机一个就够了,这里再给交换机绑定队列和路由就行了
    33. return BindingBuilder.bind(directQueue4()).to(directExchange()).with("red");
    34. }
    35. @Bean
    36. public Binding directBinding3(){
    37. //可以直接调用方法
    38. return BindingBuilder.bind(directQueue4()).to(directExchange()).with("yellow");
    39. }
    40. }

    注:这种方法比较麻烦,如果需要绑定多个路由或者队列还是很不方便,这里只作为演示,下面开始使用注解进行创建交换机和队列进行绑定并且带上路由

    消费者:(MqListener类中写就行) 还是同样的监听

    参数解释:

    bindings就是绑定参数;

    @QueueBinding注解中的参数value就是需要绑定的队列(name队列名称,durable就是消息持久化)

    @QueueBinding注解中的参数exchange就是被绑定的交换机(name交换机名称,type交换机类型)

    参数key就是路由:{可以直接写多个路由参数}

    注:好处就是能节省很多代码,很简单,并且很很清晰

    1. @RabbitListener(bindings = @QueueBinding(
    2. value =@Queue(name = "direct.queue1",durable = "true"),
    3. exchange = @Exchange(name = "maxxub.direct",type = ExchangeTypes.DIRECT),
    4. key = {"red","blue"}))
    5. public void listenDirectQueue1(String msg) throws InterruptedException {
    6. System.out.println("消费者1收到direct.queue1的消息"+msg);
    7. Thread.sleep(200);
    8. }
    9. @RabbitListener(bindings = @QueueBinding(
    10. value =@Queue(name = "direct.queue2",durable = "true"),
    11. exchange = @Exchange(name = "maxxub.direct",type = ExchangeTypes.DIRECT),
    12. key = {"red","yellow"}))
    13. public void listenDirectQueue2(String msg) throws InterruptedException {
    14. System.out.println("消费者2收到direct.queue2的消息"+msg);
    15. Thread.sleep(200);
    16. }

    提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示

    1. @Test
    2. void testDirectQueue(){
    3. String exchangeName="maxxub.direct";
    4. String msg="红色警报";
    5. //参数:交换机,路由,消息
    6. rabbitTemplate.convertAndSend(exchangeName,"red",msg);
    7. }

    注:先启动消费者启动类,在启动提供者的测试方法(因为两个队列都有路由“red”,所以两个对哦都会收到消息)

    3.4.3、topic类型

    路由使用通配符的形式进行

    *:代表一个字符

    #:代表0个或者多个字符

    英文大概意思就是这里路由是通过带有通配符进行的,和direct类型不同的就是通配符能更灵活的设置路由(类似标签)

    消费者:(MqListener类中写就行) 还是同样的监听 这里没有啥改变基本就是key改了一点点

    1. @RabbitListener(bindings = @QueueBinding(
    2. value =@Queue(name = "topic.queue1",durable = "true"),
    3. exchange = @Exchange(name = "maxxub.topic",type = ExchangeTypes.TOPIC),
    4. key = {"#.new"}))
    5. public void listenTopicQueue1(String msg) throws InterruptedException {
    6. System.out.println("消费者1收到topic.queue1的消息");
    7. Thread.sleep(200);
    8. }
    9. @RabbitListener(bindings = @QueueBinding(
    10. value =@Queue(name = "topic.queue2",durable = "true"),
    11. exchange = @Exchange(name = "maxxub.topic",type = ExchangeTypes.TOPIC),
    12. key = {"china.#"}))
    13. public void listenTopicQueue2(String msg) throws InterruptedException {
    14. System.out.println("消费者2收到topic.queue2的消息"+msg);
    15. Thread.sleep(200);
    16. }

    提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示

    1. @Test
    2. void testTopicQueue(){
    3. String exchangeName="maxxub.topic";
    4. String msg="中国繁荣";
    5. rabbitTemplate.convertAndSend(exchangeName,"china.strong",msg);
    6. }

    注:先启动消费者启动类,在启动提供者的测试方法(因为两个队列都有路由“red”,所以两个对哦都会收到消息) 

    解释:这里china.#的路由才能满足条件,所以只有topic.queue2队列接收到了消息

    3.5、死信队列

    死信队列,英文缩写: DLX 。 Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX

    主要就是为了处理消息丢失的现象,如果消息长时间处理不带,或者确认机制设置的问题导致消息丢失了怎么办,不能说丢了就丢了吧,死信队列就是一个比较好的办法去解决

    就是给普通的消息队列设置一个消息过期时间,在一定时间内没有被处理掉的,那就把它放到私信队列中

    思路:我们给simple.queue队列发消息设置过期时间10000ms,但是这个队列没有提供者,同时给他绑定一个死信交换机和死信队列,消息在10000ms以后会流入死信队列中

    先做前置操作:

    上面是一步一步的过程,下面是基本操作流程

    消费者:MqListener类中写就行)这里只写死信队列的就可以了,simple队列是故意不让他拿到消息的,为了能够超过过期时间

    1. @RabbitListener(queues = "dlx.queue")
    2. public void listenDlxQueue2(String msg) throws InterruptedException {
    3. log.info("消费者收到dlx.queue的消息"+msg);
    4. Thread.sleep(200);
    5. }

    提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示

    参数没有很多的变化,消息队列、路由、消息、一个消息处理器的对象进行设置消息过期时间

    1. //延迟队列
    2. @Test
    3. void testSendTTLMessage(){
    4. rabbitTemplate.convertAndSend("simple.direct", "hi", "hello", new MessagePostProcessor() {
    5. @Override
    6. public Message postProcessMessage(Message message) throws AmqpException {
    7. //设置消息拿到消息参数,进行设置过期时间
    8. message.getMessageProperties().setExpiration("10000");
    9. return message;
    10. }
    11. });
    12. //下面打印一下消息是执行成功的
    13. log.info("消息发送成功");
    14. }

    注:先启动消费者启动类,在启动提供者的测试方法 (这里不好具体展示)

    3.6、Lazy Queue

    Rabbitmq在3.6.0版本开始就添加了Lazy Queue概念,惰性队列

    惰性队列的特征如下:

    接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认是2048条)

    消费者要消费消息时才会从磁盘中读取并加载到内存

    支持数百万条的消息存储

    但是现在会直接进行默认Lazy Queue模式

    官方的大体意思就是:3.12版本开始这个参数就可以进行忽略了,开始默认为Lazy Queue队列,但是仍然有用户是哟弄过CQv1队列,也就是经典队列,这里是可以通过参数配置进行调整为CQv2,配置在rabbitmq的配置文件中

    classic_queue.default_version=2

    我这里使用的是rabbitmq3.8版本,所以这里展示还是通过代码方式给友友们展示一下

    在管理界面添加一个lazy.queue队列

    按照以上步骤会有这么个提示:(将鼠标放上去)

    提供者:(写在producer的测试类中)参数如果真的不知道怎么办了:ctrl+p就会有提示

    1. @Test
    2. void testPageOut(){
    3. Message msg = MessageBuilder
    4. .withBody("hello".getBytes(StandardCharsets.UTF_8)) //带有消息参数消息只能以byte类型发送所以这里自动转化一下
    5. .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); //设置一下队列的模式,这里NON_PERSISTENT表示是数据不是持久的,重启mq消息会丢失
    6. //尝试发送10万条消息
    7. for(int i=0;i<100000;i++){
    8. rabbitTemplate.convertAndSend("lazy.queue",msg);
    9. }
    10. }

    会有以下这两个点需要友友们去看 ,消息处理时间很快,并且也大量数据情况下消息不会断

    当然了,也不仅仅就是了看消息处理的有多快的,也是为了实用的,前面说去配置配置文件也是可以简经典队列调节为lazy队列的,如果想要仅仅只是个别创建队列能够是lazy队列Java代码也可以是可以实现的(以下就是Lazy队列的使用代码创建的方法,同时可以直接接收消息,效果是一样的)

    1. @RabbitListener(queuesToDeclare = @Queue(
    2. name = "lazy.queue", //设置队列名称
    3. durable = "true", //设置持久化
    4. arguments = @Argument(name = "x-queue.mode",value = "lazy") //设置队列模式
    5. ))
    6. public void LazyQueue(String msg){
    7. log.info("接收到 lazy.queue对的消息 :"+msg);
    8. }

    4、确认机制

    RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开启确认机制会消耗一定资源也就会降低消息传输的速度,返回确认消息给生产者,返回的结果有这么几种

    消息投递到了MQ,但是路由失败,此时会通过PublisherReturn返回路由异常原因,然后返回Ack

    临时消息,入队成功就可以返回Ack

    持久消息,完成持久化就返回Ack

    其他情况也就是失败情况返回NACK,投递失败

    上面的不要记住那么多,简单分为成功就是Ack不成功就是Nack

    直接在yml配置文件中配置的即可:有以下三种模式

    none:关闭confirm机制(默认机制)

    simple:同步阻塞等待MQ的回执消息

    correlated:MQ异步回调方式返回回执消息

    下面直接上步骤:

    每个RabbitTemplate只能配置一个ReturnCallback,在项目创建之前就需要配置好

    这个配置是给提供者的,是提供者消费发送的成功与失败进行返回消息

    提供者:这里只是配置一个消息回调

    1. @Slf4j
    2. @Configuration
    3. public class MqConfirmConfig implements ApplicationContextAware {
    4. @Override
    5. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    6. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    7. //配置回调 这里配置confirm是无效的,idea会给你提示confirm的方法,但是效果不是,这里需要的是回调
    8. rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    9. @Override
    10. public void returnedMessage(ReturnedMessage returnedMessage) {
    11. log.debug("收到消息的return callback",
    12. returnedMessage.getExchange(),returnedMessage.getRoutingKey(),
    13. returnedMessage.getMessage(),returnedMessage.getReplyCode(),
    14. returnedMessage.getReplyText());
    15. }
    16. });
    17. }
    18. }

    提供者:下面开始发送消息进行确认

    下面我们根据需要的参数进行写这个id代码;

     

    1. @Test
    2. void testConfirmCallback() throws InterruptedException {
    3. //创建一个CorrelationData 对象, 我们只取id 即可 调用一个带参的构造函数
    4. CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    5. /*
    6. * 添加 CorrelationData 对象中提供了一个异步处理方法也就是多线程可以看见的方法,getFuture()
    7. * 这里给添加一个回调方法 参数是一个对象,里面内部方法需要重写里面的 失败和成功的方法 ,
    8. * 但是这里失败是spring操作的失败不是Rabbitmq的失败所以不会返回Nack
    9. * */
    10. cd.getFuture().addCallback(new ListenableFutureCallback() {
    11. @Override
    12. public void onFailure(Throwable ex) {
    13. log.debug("消息回调失败",ex);
    14. }
    15. @Override
    16. public void onSuccess(CorrelationData.Confirm result) {
    17. log.debug("收到confirm callback回执");
    18. /*
    19. * 这里才是排定rabbitmq的消息是否成功 判断一下结果是否是有ack
    20. * 如果是的话 返回一个我们自己打印的消息
    21. * */
    22. if(result.isAck()){
    23. log.debug("消息发送成功 收到ack");
    24. }else{
    25. log.error("消息发送失败 收到nack"+result.getReason());
    26. }
    27. }
    28. });
    29. // CorrelationData 对象创建好了 ,其实这里也可以说是id创建好了,添加进发送的消息中
    30. rabbitTemplate.convertAndSend("maxxub.direct","red2","hello",cd);
    31. Thread.sleep(2000);
    32. }

    注:这样的情况一般使用于检查问题,因为需要一定的开销,会大大降低rabbitmq处理消息的速度

    5、重试机制

    针对这部分参数:(前面已经给友友们提供了)

    1. listener:
    2. simple:
    3. prefetch: 1
    4. acknowledge-mode: auto #模式:自动 进行一定的重试次数
    5. retry:
    6. enabled: true # 超时重试
    7. initial-interval: 1000ms
    8. multiplier: 2 #连接失败的重试
    9. max-attempts: 3 #最大重试次数的
    10. stateless: true #针对事务,默认为falsetrue针对上下文有定义保存,是一个状态

    前面都是直接给友友们提供了配置参数,这里稍微具体说一下 MessageRecoverer就是消息恢复

    继承该接口类有三个 :

    (1)RejectAndDontRequeueRecoverer :重试耗尽后消息会被直接扔掉

    (2)ImmediateRequeueMessageRecoverer:重试耗尽后,会重新进入队列(消耗太大了)

    (3)RepublishMessageRecoverer:重试耗尽后,会去进入指定队列先存着

    最优的当然是第三种了,对消息做到了保护作用,同时也不会无限进入队列4

    下面就来实现一下

    消费者:

    1. @Slf4j
    2. @Configuration
    3. //@ConditionalOnProperty(prefix = "spring",name = "rabbitmq.listener.simple.retry.enabled",havingValue = "true")
    4. public class ErrorConfiguration {
    5. /*创建一个交换机 队列 绑定一下并设置路由*/
    6. @Bean
    7. public DirectExchange directExchange(){
    8. return new DirectExchange("error.direct");
    9. }
    10. @Bean
    11. public Queue errorQueue(){
    12. return new Queue("error.queue");
    13. }
    14. @Bean
    15. public Binding errorBinding(DirectExchange directExchange,Queue errorQueue){
    16. return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
    17. }
    18. //这里 使用 MessageRecoverer 进行接收
    19. @Bean
    20. public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
    21. log.debug("MessageRecoverer消息");
    22. //调用 RepublishMessageRecoverer 返回机制 前面说了只能绑定一个rabbitTemplate ,第一个参数填写即可
    23. //第二个参数 交换机 刚刚创建的指定交换机 和 指定交换机的路由
    24. return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    25. }
    26. }

    注:@ConditionalOnProperty注解指定配置参数没有弄好,但是这里是可以使用的,如果有佬们能够很好的使用,请评论下怎么弄,很感谢

  • 相关阅读:
    数据库备份
    Linux入门教程||Shell echo命令||Shell printf 命令
    Python对象复制竟然有这么多种方式,赶紧学起来!
    Spring的开幕式——Spring概述与设计思想
    如何在 Mac 上卸载 Java?
    Linux中每当执行‘mount’命令(或其他命令)时,自动激活执行脚本:输入密码,才可以执行mount
    多目标进化算法——NSGA-II(python实现)
    matlab 矩阵逆运算的条件数
    y80.第四章 Prometheus大厂监控体系及实战 -- kube-state-metrics组件介绍和监控扩展(十一)
    2022.8.25-----leetcode.658
  • 原文地址:https://blog.csdn.net/c_study__c/article/details/133964806