• SpringAMQP


    消息队列是实现异步通讯的一种方式,我们将从RabbitMQ为例开始介绍SpringAMQT。

    RabbitMQ

    RabbitMQ是基于Erlang语言开发的开源消息通信中间件。

    安装与部署

    由于RabbitMQ运行需要安装Erlang,为了方便部署,我们采用docker的方式来部署RabbitMq

    首先拉取RabbitMQ的镜像,带有management的Tag的说明该镜像含有Web控制台

    docker pull rabbitmq:3-management
    
    • 1

    执行下面的命令来运行RabbitMQ容器

    docker run \
    -e RABBITMQ_DEFAULT_USER=username \
    -e RABBITMQ_DEFAULT_PASS=password \
    --name mq \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3-management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    username和password是进入RabbitMQ控制台时使用的账号密码,15672端口是控制台所占用的端口,5672是MQ服务所占用的端口。

    RabbitMQ结构

    • channel: 操作MQ的工具
    • exchange: 路由消息到队列中
    • queue: 缓存消息
    • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
      在这里插入图片描述

    简单队列模型

    消息发布者

    @SpringBootTest
    class PublisherApplicationTests {
    
        @Test
        void publisher() throws IOException, TimeoutException {
            //建立连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.111.135");
            factory.setPort(5672);
            factory.setUsername("username");
            factory.setPassword("password");
            factory.setVirtualHost("/");
            Connection connection=factory.newConnection();
            //创建通道
            Channel channel=connection.createChannel();
            //创建队列
            String queueName="simple.queue";
            channel.queueDeclare(queueName,false,false,false,null);
            //发布消息
            String message="hello,rabbitmq!";
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("",queueName,null,message.getBytes());
            }
    
            channel.close();
            connection.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    消息消费者

    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //建立连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.111.135");
            factory.setPort(5672);
            factory.setUsername("username");
            factory.setPassword("password");
            factory.setVirtualHost("/");
            Connection connection=factory.newConnection();
            //建立通道
            Channel channel=connection.createChannel();
            //消费端也创建队列是为了防止消费端先启动找不到队列
            String queueName="simple.queue";
            channel.queueDeclare(queueName,false,false,false,null);
    
            //为通道绑定消费者
            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息已被处理");
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    其他的模型将在SpringAMPQ中进行介绍

    SpringAMQP

    AMQP (Adavance Message Queuing Protocol 高级消息队列协议)是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

    Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两个部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

    SpringAMQP的特点:

    • 提供监听容器用于异步处理入站消息
    • 提供RabbitTemplate用于发送和接收消息
    • 提供RabbitAdmin来自动声明队列,交换机和绑定

    依赖引入

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    配置RabbitMQ连接信息

    spring:
      rabbitmq:
        host: 192.168.111.135
        port: 5672
        username: username
        password: password
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    基本模型

    简单队列模型

    消息发布端

    @SpringBootTest
    class PublisherApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        void publisher(){
    
            String queueName="simple.queue";
            String message="hello,rabbitmq!";
            rabbitTemplate.convertSendAndReceive(queueName,message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消息消费端

    @Component
    public class RabbitListenerTest {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String message){
            System.out.println(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    WorkQueue模型

    模型特点:多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。

    在默认情况下,消费者会进行消息预取,预取的数量为无限大,这会导致性能不同的消费者处理相同数量的消息,可以通过设置prefetch来控制消费者预取的消息数量

    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    发布订阅模型

    发布订阅模式允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)

    常见的exchange类型有:

    • Fanout: 广播
    • Direct: 路由
    • Topic: 话题

    exchange只负责消息路由而不进行存储,路由失败则消息丢失

    发布订阅模型分三部

    1. 在consumer服务中声明Exchange、Queue、Binding
    2. 在consumer服务中声明多个消费者
    3. 在publisher服务发送消息到Exchange

    FanoutExchange

    声明Exchange、Queue、Binding。除了通过在配置类中通过@Bean注解绑定队列和交换机外还可以在@RabbitListener注解中绑定,第二种绑定方式将在下一部分使用

    @Configuration
    public class FanoutConfig {
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("simple.fanout");
        }
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        @Bean
        public Binding bingingQueue1(Queue fanoutQueue1,FanoutExchange exchange){
            return BindingBuilder.bind(fanoutQueue1).to(exchange);
        }
        @Bean
        public Binding bingingQueue2(Queue fanoutQueue2,FanoutExchange exchange){
            return BindingBuilder.bind(fanoutQueue2).to(exchange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    声明消费者与简单模型没什么差别,就不赘述了。

    publisher端将消息发送给交换机有一点小区别

    @SpringBootTest
    class PublisherApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        void publisher(){
    
            String exchangeName="simple.fanout";
            String message="hello,rabbitmq!";
            //三个参数分别为交换机名、routingkey和消息
            rabbitTemplate.convertSendAndReceive(exchangeName,"",message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    DirectExchange

    在上一种交换机中发送消息的第二个参数routingkey我们设置为了空,实际上,每一个Queue与Exchange间都可以设定多个bindingkey,通过routingkey参数,交换机将把消息路由到与其匹配的队列中。

    @Component
    public class RabbitListenerTest {
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "simple.queue1"),
                exchange = @Exchange(name = "simple.direct",type = ExchangeTypes.DIRECT),
                key = {"red","blue"}
        ))
        public void listenSimpleQueueMessage(String message){
            System.out.println(message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    TopicExchange

    TopicExchange与DirectExchange类似,区别是:

    • TopicExchange的routingkey必须是多个单词的列表,并且以英文句号.分隔。
    • bindingkey支持使用通配符,#匹配零或多个单词,*匹配一个单词。

    通过以下代码,你很容易可以发现它的特点

    @SpringBootTest
    class PublisherApplicationTests {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        void publisher(){
    
            String exchangeName="simple.fanout";
            String message="hello,rabbitmq!";
            rabbitTemplate.convertSendAndReceive(exchangeName,"china.weather",message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    @Component
    public class RabbitListenerTest {
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "simple.queue1"),
                exchange = @Exchange(name = "simple.direct",type = ExchangeTypes.Topic),
                key = "china.*"
        ))
        public void listenSimpleQueueMessage(String message){
            System.out.println(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    消息转换器

    我们在使用RabbitTemplate的时候可以发现,它所发送的消息的类型为Object,这意味着它可以发送所有对象。默认它所使用的是jdk的序列化,这样的效率较低。

    Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,只需定义一个MessageConverter类型的Bean即可修改序列化方式。

    以jackson的序列化为例

    引入jackson的依赖

    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-xmlartifactId>
        <version>2.9.10version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    定义MessageConverter类型的Bean

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    JSON数据处理工具-在线工具箱网站tool.qqmu.com的使用指南
    【光学】基于matlab模拟单缝夫琅禾费衍射
    Android使用Glide类加载服务器中的图片
    精通git,没用过git cherry-pick?
    JavaScript面向对象和原型
    (c语言)二维数组求最大值
    LaTex学习笔记(二):文档的基本结构2【book】,part chapter section subsection toc title
    【CVE-2023-4357】Chrome-XXE 任意文件读取漏洞复现及原理解析
    20款短视频自媒体必备工具,让你的运营效率翻倍
    Python中Tkinter模块的Canvas控件绘制jpg图片到指定区域
  • 原文地址:https://blog.csdn.net/m0_67713667/article/details/133965796