• 微服务08-认识和使用SpringAMQP


    1.AMQP的认识

    1.1 介绍

    AMQP是什么?看完你就知道了_hello_读书就是赚钱的博客-CSDN博客_amqp

    在这里插入图片描述
    好处:
    什么connection消息队列的连接、channel:服务发送接收消息的通道、Queue:消息队列——>这些你都不需要自己编写

    工作过程:
    发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

    交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

    最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    深入理解:
    1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

    2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

    3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除

    4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

    1.2 实战:消息发送与消息接收

    消息发送
    在这里插入图片描述

    @RunWith(SpringRunner.class)
    @EnableRabbit
    @SpringBootTest
    public class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
     
        @Test
        public void testSendMessageSimpleQueue(){
            String queueName="simple.queue";
            String messgae="Hello,SpringAMQP";
            rabbitTemplate.convertAndSend(queueName,messgae);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    可以发现队列simple.queue中 多了一条消息
    在这里插入图片描述

    AMOP如何发送消息?

    引入amqp的starter依赖,然后我们配置RabbitMQ的地址,最后利用RabbitTemplate中的convertAndSend方法发送消息

    在这里插入图片描述

    消息接收:
    在这里插入图片描述

    1、首先进行yaml配置,RabbitMQ连接信息

    2、新建一个组件,里面编写消费逻辑(利用@RabbitListener监听队列,只要队列一有消息就进行接收)

     
    @Component
    public class SpringRabbitListener {
     
        /**
         * @RabbitListener监听队列
         * @param msg
         */
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue(String msg){
            System.out.println("消费者接收到的simple.queue中消息为:"+msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述
    在这里插入图片描述

    注意:消息一旦被消费就会从队列删除,RabbitMQ没有消息回溯功能

    2.WorkQueue

    场景:
    采用多个工作队列处理消息,避免消息堆积;

    在这里插入图片描述

    2.1 实战:多个消费者绑定一个队列

    在这里插入图片描述

    1.首先配置类中把队列支棱起来,Publisher发布消息给消息队列simple.queue

    @RunWith(SpringRunner.class)
    @EnableRabbit
    @SpringBootTest
    public class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
     
        @Test
        public void testSendMessageWorkQueue() throws InterruptedException {
            String queueName="simple.queue";
            String messgae="华为手机,遥遥领先--";
            for (int i = 0; i < 50; i++) {
                rabbitTemplate.convertAndSend(queueName,messgae+i);
                Thread.sleep(20);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.consumer服务中定义两个消费者,利用@RabbitListener监听队列

    @Component
    public class SpringRabbitListener {
        /**
         * 我们的思想是:两个消费者,一个1s能够消费50条,一个消费1s5条,一共50条信息,能者多劳
         * 现实:默认是平均分配->造成处理消息时间过长(因为第二个消费者处理消息很慢)
         * @param msg
         * @throws InterruptedException
         */
        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueue(String msg) throws InterruptedException {
            System.out.println("消费者1......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());
            //每s50个消息
            Thread.sleep(20);
        }
     
        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueue2(String msg) throws InterruptedException {
            System.err.println("消费者2.......接收到消息为:"+msg+"当前时间:"+ LocalTime.now());
            Thread.sleep(200);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述
    我们可以采用prefetch设置消费者预取的消息数量,这样就不会出现平均分配消息的现象了——>而是根据每个消费者的能力来处理消息

    在这里插入图片描述

    3.Fanout Exchange

    前言:

    发布(Publish)和订阅(Subscribe)

    因为一个消息只可能被一个消费者消费,消费完就会删除该消费,所以我们需要利用发布和订阅来进行处理,这样就可以将同一个消息发送给多个消费者——>
    利用exchange交换机将消息发送给多个队列,然后队列是可以将消息进行储存的,发给多个消费者

    在这里插入图片描述
    常见的exchange类型:Fanout(广播)、Direct(路由)、Topic(话题)

    注意:exchange负责消息路由,而不是储存,路由失败则会造成消息丢失(也就是说路由到哪个消息队列)。

    3.1 介绍

    exchange: 只能作为消息的转发,记住不能作为消息的缓存,如果路由失败消息就丢了

    交换机exchange:通过FanoutExchange返回交换机

    消息队列Queue:通过Queue返回消息队列

    将消息队列与交换机进行绑定Bingding:可以通过Binding中的bind方法进行绑定

    3.2 实战

    在这里插入图片描述

    1:
    在这里插入图片描述
    consumer服务中声明交换机和队列,将队列与交换机进行绑定(利用Binding中bind方法)

    @Configuration
    public class FanoutConfig {
        //itcast.fanout交换机
        @Bean
        public FanoutExchange fanoutExchange(){
           return new FanoutExchange("itcast.fanout");
        }
     
        //fanout.queue1任务队列
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
     
        //消息队列2
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
     
        //将消息队列与交换机进行绑定:类型和名称要保持一致,不然无法注入
        @Bean
        public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue1)
                    .to(fanoutExchange);
        }
     
        //第二个消息队列与交换机进行绑定
        @Bean
        public Binding fanouting2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue2)
                    .to(fanoutExchange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    2.在这里插入图片描述

    @Component
    public class SpringRabbitListener {
        /**
         * 以下是利用交换机通知信息给处理消息的消费者
         */
        @RabbitListener(queues = "fanout.queue1")
        public void listenFanoutQueue1(String msg){
            System.out.println("消费者接收到的fanout.queue1的消息为:"+msg);
        }
     
        @RabbitListener(queues = "fanout.queue2")
        public void listenFanoutQueue2(String msg){
            System.out.println("消费者收到fanout.queue2的消息为:"+msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.在这里插入图片描述

    @RunWith(SpringRunner.class)
    @EnableRabbit
    @SpringBootTest
    public class SpringAmqpTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
     
        /**
         * 发送信息给交换机exchange
         */
        @Test
        public void testSendFanoutExchange(){
            //1.交换机名称
            String exchangeName="itcast.fanout";
            //2.消息
            String message="hello,every one!";
            //3.发送消息
            rabbitTemplate.convertAndSend(exchangeName,"",message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    3.3 总结

    在这里插入图片描述

    4.Direct Exchang

    DirectExchang:可以根据规则路由到指定的消息队列

    4.1实战

    在这里插入图片描述
    1.在consumer服务中声明Exchange交换机与不同key之消息队列的绑定,利用@RabbitListener->还监听了一波消息队列

    对比与之前的Fanout,它们的队列以及交换机是在一个配置类中定义并绑定的,利用了Binding,并注入容器

    消费者代码
    在这里插入图片描述

    提供者代码

    @Test
        public void testSendDirectExchange() {
            //交换机名称
            String exchangeName = "denghao.direct";
            //消息
            String message = "hello, blue!";
            //发送消息
            rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    个人编写与老师资料略有不同 敬请谅解
    在这里插入图片描述

    Direct交换机与Fanout交换机之间的差异+@RabbitListener注解中声明队列与交换机的常见注解
    在这里插入图片描述

    5.TopicExchange在这里插入图片描述

    为什么要用Topic交换机,因为以后有的时候,比如地区天气新闻,有“长沙的,成都的,四川的,重庆的。。。。。。 如果使用DirectExchange的话,那么会要定义很多很多的key 而用TopicExchange的话 可以直接用 #.news 表示接收所有地方的天气新闻”

    5.1 实战

    介绍:

    两个队列:一个是中国的新闻,一个所有的新闻消息;

    两个消费者分别监听这两个队列

    Publisher将消息发送给交换机Exchange,然后交换机根据key(+通配符)->决定将消息路由到哪个队列中

    区别:

    与DirectExchange区别就是,DirectExchange没有通配符,TopicExchange有通配符:xxx.xxx

    最明显的地方就是:队列与交换机进行绑定时,key不一样

    在这里插入图片描述

    消费者的监听器

      @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue1"),
                exchange = @Exchange(name = "denghao.topic",type = ExchangeTypes.TOPIC),
                key = "china.#"
        ))
        public void listenTopicQueue1(String msg){
            System.out.println("消费者接收到topic.queue1的消息:【"+ msg +"】");
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "topic.queue2"),
                exchange = @Exchange(name = "denghao.topic",type = ExchangeTypes.TOPIC),
                key = "#.news"
        ))
        public void listenTopicQueue2(String msg){
            System.out.println("消费者接收到topic.queue2的消息:【"+ msg +"】");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    服务者测试方,发布消息

    @Test
        public void testSendTopicExchange() {
            //交换机名称
            String exchangeName = "denghao.topic";
            //消息
            String message1 = "红色天气预报警告,马上长沙迎来大升温,明天提升六度,请注意,不要感冒,不要鼻炎!!!";
            String message2 = "重磅新闻来袭!!!";
            //发送消息
            rabbitTemplate.convertAndSend(exchangeName,"china.news",message1);
            rabbitTemplate.convertAndSend(exchangeName,"jp.news",message2);
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    结果
    在这里插入图片描述
    在这里插入图片描述

    6. 消息转换器

    场景: 因为我们Publisher服务者发送消息到消息队列(将消息序列化为二进制),消息是乱码的——>因为消息类型content-type是java序列化变来的类型:缺点:消息体大、传输速度慢、安全问题、占内存;

    在这里插入图片描述
    处理:

    1.我们定义一个bean,利用boot的自动配置思想,将默认的替换

    @Bean
        public Jackson2JsonMessageConverter jsonMessageConverter() {
          return new Jackson2JsonMessageConverter();
        }
    
    • 1
    • 2
    • 3
    • 4

    2.然后导入依赖,Jacksonxxxx

      
            <dependency>
                <groupId>com.fasterxml.jackson.coregroupId>
                <artifactId>jackson-databindartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.Publisher发送消息给到消息队列

     //Publisher发送消息给到队列object.queue
        @Test
        public void testSendObjectQueue(){
            HashMap<Object, Object> msg = new HashMap<>();
             msg.put("姓名","柳岩");
             msg.put("年龄",38);
            //发送消息到队列
            rabbitTemplate.convertAndSend("object.queue",msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    4.消费者接收消息

    @RabbitListener(queues = "object.queue")
        public void listenObjectQueue(Map<String,Object> msg){
            System.out.println("消费者接收到object.queue的消息:【"+ msg +"】");
        }
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

  • 相关阅读:
    基于Java咖啡商品管理系统设计实现(源码+lw+部署文档+讲解等)
    nginx的安装与nginx降权+匹配php
    深度解读财团参与Twitter私有化的投资逻辑
    Web3中文|元宇宙在商业中的最佳应用
    Redis应用场景
    探索Java异常处理的奥秘:源码解析与高级实践
    Matlab:矩阵分解
    宋明的结局揭示了什么
    [sd_scripts]之train
    如何缩减layout电路面积?减少晶体管的数量——以全加器为例【VLSI】
  • 原文地址:https://blog.csdn.net/weixin_67201964/article/details/132855939