• Springboot整合RabittMQ


    01.MQ简介和使用场景

    MQ(Message Quene):消息队列,是一种应用程序对应用程序的通信方法

    • 队列特点:先进先出

    使用场景:

    • 流量削峰
    • 日志处理
    • 应用解耦

    02.RabbitMQ简介

    RabittMQ采用Erlang语言开发,实现了高级消息队列协议(AMQP Advance Message Queuing Protocol)的开源消息中间件。

    优点:

    • 性能很好,低延时
    • 吞吐量万级,功能完备
    • 有良好的管理界面,管理工具‘
    • 社区活跃

    缺点

    • 吞吐量小

    AMQP协议:高级消息队列协议
    在这里插入图片描述

    • Publisher:生产消息,发送到消息队列服务器
    • Broker:消息队列服务器,一个消息队列服务器里面可以有多个Virtual Host虚拟主机:隔离不同用户的exchange、queue),每个Virtual Host里面可以有多个Exchange(交换机),每个交换机里面可以有多个消息队列。
    • Consumer:消息的消费者,用来消费队列里面的消息

    03.Windows下搭建单节点RabbitMQ服务

    由于RabbitMQ是使用Erlang语言开发的,我们需要先安装Erlang,不同版本的RabbitMQ对应的Erlang版本也不一样,查看版本的对应链接如下:https://www.rabbitmq.com/which-erlang.html

    RabittMQ下载地址:https://www.rabbitmq.com/download.html
    在这里插入图片描述
    Erlang的下载地址:https://www.erlang.org/downloads
    在这里插入图片描述
    Erlang的安装:

    • 使用下载好的Erlang安装器,来安装Erlang环境,除了选择一下安装路径之外,一路next即可,路径不能出现中文,安装好之后,将bin目录配置到环境变量中。
    • 验证方式,控制台输入:erl 打印出版本号即可

    RabittMQ安装:

    • 安装方式和安装Erlang环境一样,除了选择路径之外,一路next即可,安装好之后,wins图标—>左键,会有一个RabittMQ Command pPrompt。
      在这里插入图片描述
    • 打开RabittMQ Command pPrompt,在里面输入rabbitmq-plugins.bat enable rabbitmq_management激活RabbitMQ的UI管理界面。
    • 激活UI界面之后,我们需要重启一下RabittMQ服务:
      • 先关闭,执行命令:net stop RabbitMQ
      • 在启动,执行命令:net start RabbitMQ
    • 浏览器访问UI管理界面:localhost:15672
      在这里插入图片描述

    04.RabbitMQ管理界面使用

    主页面:
    在这里插入图片描述
    Exchanges页面:创建Virtual Host之后,会给Virtual Host默认创建七个交换机,当然我们也可以手动创建交换机。

    在这里插入图片描述

    Admin页面:

    • 用户页面:可以添加用户,然后给用户分配权限

    在这里插入图片描述

    • Virtual Hosts页面:可以添加Virtual Host,我们需要先创建一个用户,然后再创建一个Virtual Host,然后点击创建的用户,给用户分配Virtual Host,这样该用户就可以访问这个Virtual Host。

    在这里插入图片描述

    • 创建Virtual Host之后,会给Virtual Host默认创建七个交换机。

    在这里插入图片描述

    05.RabbitMQ消息模型

    官网Demo地址:https://www.rabbitmq.com/getstarted.html
    在这里插入图片描述

    06.HelloWorld

    官网第一种为例,https://www.rabbitmq.com/tutorials/tutorial-one-java.html,下面来创建一个HelloWorld使用案例:

    • 导包
    <dependency>
         <groupId>com.rabbitmqgroupId>
         <artifactId>amqp-clientartifactId>
         <version>5.7.1version>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • MQsend
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    /*
    *  如果发送消息的时候,不给队列绑定交换机,则默认将消息发送到default交换机之中
    * */
    public class MQSend {
        // 定义队列的名字
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] arg) throws Exception {
            // 连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // MQ服务器的IP地址
            factory.setHost("localhost");
            // 端口号
            factory.setPort(5672);
            // MQ服务器里面的虚拟主机
            factory.setVirtualHost("Vhost01");
            // 虚拟主机隶属用户的账号
            factory.setUsername("admin");
            // 虚拟主机隶属用户的密码
            factory.setPassword("admin");
            //通过工厂获取链接
            Connection connection = factory.newConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            //创建并声明一个队列,如果队列存在则使用这个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //消息内容
            String message = "Hello World!";
            //发送消息:第一个参数可以设置交换机的名字,不设定发送到default交换机之中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            channel.close();
            connection.close();
            System.out.println(" Send '" + message + "'");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    在这里插入图片描述
    在这里插入图片描述

    • MQrecv
    import com.rabbitmq.client.*;
    import java.io.IOException;
    /*
     * 消费者
     * */
    public class MQRevc {
        //消费的队列
        private final static String QUEUE_NAME = "hello";
        public static void main(String[] arg) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("Vhost01");
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建并声明一个队列,如果队列存在则使用这个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义一个消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String mess = new String(body);
                    System.out.println("接收到消息" + mess);
                }
            };
            // 创建一个消费者监听器
            // 确认我们是否收到消息,true是自动确认,false是手动确认
            String res = channel.basicConsume(QUEUE_NAME, true, consumer);
            System.out.println("消费成功:" + res);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    小结:

    • 基本消息队列的消息发送流程:

      建立connection
      
      创建channel
      
      利用channel声明队列
      
      利用channel向队列发送消息
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
    • 基本消息队列的消息接收流程:

      建立connection
      
      创建channel
      
      利用channel声明队列
      
      定义consumer的消费行为handleDelivery()
      
      利用channel将消费者与队列绑定
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    07.Springboot整合RabittMQ

    • 导包
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 交换机初始化
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    @Bean(name = FANOUT_EXCHANGE)
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 队列初始化
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    @Bean(name = FANOUT_QUEUE1)
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1, true, false, false);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 交换机队列绑定
    @Bean
    public Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,
                                       @Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 发送者
    @Component
    public class MQSender{
        // 交换机
        public static final String FANOUT_EXCHANGE = "fanout.exchange";
        // 队列
        public static final String FANOUT_QUEUE1 = "fanout.queue1";
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //发送消息,不需要实现任何接口,供外部调用。
        public void send(String message) {
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, FANOUT_QUEUE1, message, correlationId);
            System.out.println("消息发送成功 : " + message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 接收者
    @Component
    public class MQReceiver {
        @RabbitListener(queues = MQConfig.FANOUT_QUEUE1)
        public void receiver(String msg, Channel channel) {
            // 只包含发送的消息
            System.out.println("receiver消费成功:" + msg);
            // channel 通道信息
            // message 附加的参数信息
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 配置信息
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    spring.rabbitmq.virtual-host=Vhost01
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    放弃60万年薪考公!程序员完败公务员?
    excel表格xlsx解密在线网站,excel表格xlsx权限密码多少?
    CommonsCollection6反序列化链学习
    Java 数据结构总结
    VMware vSphere 中的 DRS(分布式资源调度)、HA(高可用性)和Fault Tolerance(FT,容错)区别
    shell脚本实现Hbase服务的监控报警和自动拉起
    MySQL数据库基础知识要点总结
    绘制一条透明背景的trace,并保存;
    经典面试题 | 讲一讲 JVM 的组成
    STM32Cube工程转为Keil工程的方法介绍
  • 原文地址:https://blog.csdn.net/weixin_45583303/article/details/125730495