• RabbitMQ笔记(基础篇)


    视频: 

    MQ基础-01.RabbitMQ课程介绍_哔哩哔哩_bilibiliicon-default.png?t=N7T8https://www.bilibili.com/video/BV1mN4y1Z7t9?p=1&vd_source=d0ea58f1127eed138a4ba5421c577eb1

    一、RabbitMQ简介

    1.同步调用

    优势:时效性强,等待结果后才返回

    劣势:拓展性差,性能下降,级联失败问题

    2.异步调用

    异步调用就是基于消息通知的方式,一般含有三个角色

    (1)消息发送者:投递消息的人,原来的调用方

    (2)消息代理:管理、暂存、转发消息,可以理解微信服务器

    (3)消息接受者:接收和处理消息的人,原来服务提供方

    Broker是消息代理

    二.RabbitMQ的安装

    RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:RabbitMQ: One broker to queue them all | RabbitMQicon-default.png?t=N7T8https://www.rabbitmq.com/

    三.RabbitMQ入门

    (1)登录RabbitMQ后添加队列

    (2)交换机先绑定队列名字

    (3)交换机发送消息给队列

     

    队列可以查看接收到的消息

     四、数据隔离

    将Virtual host切换为/

     

     (1)新建一个用户

    (2)为用户创建virtual host

     

    (3)测试不同virtual host直接数据隔离现象,通过修改virtual host即可

    五、Java客户端

    (1)入门示例

    1.引入依赖

    1. <!--AMQP依赖,包含RabbitMQ-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>

    2.配置RabbitMQ服务端信息(消费者和生产者都需要配置)

    1. spring:
    2. rabbitmq:
    3. virtual-host: /hamll
    4. port: 5672
    5. host: 192.168.92.136
    6. username: hmall
    7. password: 123

    3.消息发送方

    1. package cn.itcast.mq.helloworld;
    2. import org.junit.jupiter.api.Test;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.boot.test.context.SpringBootTest;
    6. @SpringBootTest
    7. public class SpringAmqpTest {
    8. @Autowired
    9. private RabbitTemplate rabbitTemplate;
    10. @Test
    11. void testSendMessageQueue(){
    12. String queueName = "simple.queue";
    13. String msg = "Hello,amqp";
    14. rabbitTemplate.convertAndSend(queueName,msg);
    15. }
    16. }

    4.消息接收方(不断接收消息)

    1. package cn.itcast.mq.listeners;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. @Slf4j
    6. @Component
    7. public class MqListener {
    8. @RabbitListener(queues = "simple.queue")
    9. public void listenSimpleQueue(String msg){
    10. System.out.println("消费者收到消息:"+msg);
    11. }
    12. }

    (2)消费者消息推送限制

     (3)Fanout交换机

    1.创建hmall.fanout交换机,绑定fanout.queue1和fanout.queue2

    2.消息发送方

    1. @Test
    2. void testSendFanout(){
    3. String exchangeName = "hmall.fanout";
    4. String msg = "Hello,everyone!";
    5. rabbitTemplate.convertAndSend(exchangeName,null,msg);
    6. }

    3.消息接收方

    1. @RabbitListener(queues = "fanout.queue1")
    2. public void listenFanoutQueue(String msg) throws InterruptedException {
    3. System.out.println("消费者1收到消息:"+msg);
    4. }
    5. @RabbitListener(queues = "fanout.queue2")
    6. public void listenFanout2Queue(String msg) throws InterruptedException {
    7. System.err.println("消费者2收到消息:....."+msg);
    8. }

    (4)Direct交换机

    注意:Direct交换机绑定队列时配置Routing Key

    如下图所示:

    绑定queue1要配置blue和red的Routing Key,而绑定queue2要配置yellow和red的Routing Key

    1.创建hmall.direct交换机,绑定direct.queue1和direct.queue2

    2.消息发送方

    1. @Test
    2. void testSendDirect(){
    3. String exchangeName = "hmall.direct";
    4. String msg = "Hello,every Direct!";
    5. rabbitTemplate.convertAndSend(exchangeName,"blue",msg);
    6. }

    3.消息接收方

    1. @RabbitListener(queues = "direct.queue1")
    2. public void listenDirectQueue(String msg) throws InterruptedException {
    3. System.out.println("消费者1收到消息:"+msg);
    4. }
    5. @RabbitListener(queues = "direct.queue2")
    6. public void listenDirect2Queue(String msg) throws InterruptedException {
    7. System.err.println("消费者2收到消息:....."+msg);
    8. }

     (5)Topic交换机

     

    1.Topic交换机绑定队列

    注意:Topic交换机绑定队列时配置Routing Key

    2. 消息发送者

    1. @Test
    2. void testSendTopic(){
    3. String exchangeName = "hmall.topic";
    4. String msg = "Hello,every Topic!";
    5. rabbitTemplate.convertAndSend(exchangeName,"china.hello",msg);
    6. }

    3.消息接收方

    1. @RabbitListener(queues = "topic.queue1")
    2. public void listenTopicQueue(String msg){
    3. System.out.println("消费者1收到消息:"+msg);
    4. }
    5. @RabbitListener(queues = "topic.queue2")
    6. public void listenTopicQueue2(String msg){
    7. System.err.println("消费者2收到消息:....."+msg);
    8. }

    (6)声明队列和交换机方式一

    两种创建交换机、队列、和绑定队列的方式

    1. package cn.itcast.mq.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. @Configuration
    6. public class FanoutConfig {
    7. @Bean
    8. public FanoutExchange fanoutExchange(){
    9. // ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    10. return new FanoutExchange("hmall.fanout1");
    11. }
    12. @Bean
    13. public Queue fanoutQueue3(){
    14. // QueueBuilder.durable("fanout.queue1").build();
    15. return new Queue("fanout.queue3");
    16. }
    17. @Bean
    18. public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
    19. return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    20. }
    21. @Bean
    22. public Queue fanoutQueue4(){
    23. return new Queue("fanout.queue4");
    24. }
    25. @Bean
    26. public Binding fanoutBinding4(){
    27. return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    28. }
    29. }

    (7)声明队列和交换机方式二

     示例代码:

    1. @RabbitListener(bindings = @QueueBinding(
    2. value = @Queue(name = "direct.queue1",durable = "true"),
    3. exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
    4. key = {"red","blue"}
    5. ))
    6. public void listenDirectQueue(String msg) throws InterruptedException {
    7. System.out.println("消费者1收到消息:"+msg);
    8. }
    9. @RabbitListener(bindings = @QueueBinding(
    10. value = @Queue(value = "direct.queue2",durable = "true"),
    11. exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
    12. key = {"red","yellow"}
    13. ))
    14. public void listenDirect2Queue(String msg) throws InterruptedException {
    15. System.err.println("消费者2收到消息:....."+msg);
    16. }

    (8)消息转换器

     1.添加一个队列,名为object.queue

    2.编写单元测试,向队列中发送一条消息,消息的类型为Map

    1. @Test
    2. void testSendObject(){
    3. Map<String, Object> msg = new HashMap<>(2);
    4. msg.put("name","Jack");
    5. msg.put("age",21);
    6. rabbitTemplate.convertAndSend("object.queue",msg);
    7. }

    3.打开控制台,发现发送来的消息是一串乱码,解决方式如下:

    3.1引入依赖:
    1. <!-- Jackson-->
    2. <dependency>
    3. <groupId>com.fasterxml.jackson.dataformat</groupId>
    4. <artifactId>jackson-dataformat-xml</artifactId>
    5. </dependency>
     3.2配置MessageConverter
    1. @Bean
    2. public MessageConverter messageConverter(){
    3. return new Jackson2JsonMessageConverter();
    4. }

  • 相关阅读:
    java实现颜色拾色器并打包成exe文件
    Eureka详解与实践
    湖仓一体电商项目(十):业务实现之编写写入DWD层业务代码
    如何自动获取短信验证码?
    mysql8.0.28下载和安装详细教程,适配win11
    OpenHarmony 服务编译成动态库,而不是进程,问题详解
    shap库源码和代码实现
    Mozilla Firefox侧边栏和垂直标签在131 Nightly版本中开始试用
    (ICRA 2020) Instance Segmentation of LiDAR Point Clouds
    XJAR 混淆加密
  • 原文地址:https://blog.csdn.net/qq_69183322/article/details/137667082