• RabbitMQ队列及交换机的使用


    目录

    一、简单模型

    1、首先控制台创建一个队列

    2、父工程导入依赖 

    3、生产者配置文件

     4、写测试类

    5、消费者配置文件

    6、消费者接收消息

    二、WorkQueues模型

    1、在控制台创建一个新的队列

    2、生产者生产消息

    3、创建两个消费者接收消息

    4、能者多劳充分利用每一个消费者的能力

    三、交换机

    四、Fanout交换机

    1、 声明队列

    2、 创建交换机

    ​编辑 3、 绑定交换机

    4、示例 

    五、Diect交换机

    1、 声明队列

    2、创建交换机

     3、绑定交换机

     4、示例

    六、Topic交换机

    1、创建队列

    2、创建交换机 

    3、绑定队列

    4、示例

    7、、声明队列交换机

    1、SpringAMQP提供的类声明

    2、基于注解声明

    七、消息转换器

    配置JSON转换器


    一、简单模型

    创建一个父工程和两个子工程consumer和publisher

    1、首先控制台创建一个队列

    命名为simple.queue

     

    2、父工程导入依赖 

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.projectlombok</groupId>
    4. <artifactId>lombok</artifactId>
    5. </dependency>
    6. <!--AMQP依赖,包含RabbitMQ-->
    7. <dependency>
    8. <groupId>org.springframework.boot</groupId>
    9. <artifactId>spring-boot-starter-amqp</artifactId>
    10. </dependency>
    11. <!--单元测试-->
    12. <dependency>
    13. <groupId>org.springframework.boot</groupId>
    14. <artifactId>spring-boot-starter-test</artifactId>
    15. </dependency>
    16. </dependencies>

    3、生产者配置文件

    1. spring:
    2. rabbitmq:
    3. host: 192.168.200.129 # 你的虚拟机IP
    4. port: 5672 # 端口
    5. virtual-host: / # 虚拟主机
    6. username: admin # 用户名
    7. password: 123456 # 密码

     4、写测试类

    1. @SpringBootTest
    2. public class SpringAmqpTest {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. @Test
    6. public void testSimpleQueue() {
    7. // 队列名称
    8. String queueName = "simple.queue";
    9. // 消息
    10. String message = "hello, rabbitmq!";
    11. // 发送消息
    12. rabbitTemplate.convertAndSend(queueName, message);
    13. }
    14. }

     查看消息

     

    5、消费者配置文件

    1. spring:
    2. rabbitmq:
    3. host: 192.168.200.129 # 你的虚拟机IP
    4. port: 5672 # 端口
    5. virtual-host: / # 虚拟主机
    6. username: admin # 用户名
    7. password: 123456 # 密码

    6、消费者接收消息

    1. @Component
    2. public class SpringRabbitListener {
    3. // 利用RabbitListener来声明要监听的队列信息
    4. // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    5. // 可以看到方法体中接收的就是消息体的内容
    6. @RabbitListener(queues = "simple.queue")
    7. public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    8. System.out.println("spring 消费者接收到消息:【" + msg + "】");
    9. }
    10. }

     结果

    二、WorkQueues模型

    当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了 

    1、在控制台创建一个新的队列

    命名为work.queue

    2、生产者生产消息

    1. /**
    2. * workQueue
    3. * 向队列中不停发送消息,模拟消息堆积。
    4. */
    5. @Test
    6. public void testWorkQueue() throws InterruptedException {
    7. // 队列名称
    8. String queueName = "work.queue";
    9. // 消息
    10. String message = "hello, message_";
    11. for (int i = 0; i < 50; i++) {
    12. // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
    13. rabbitTemplate.convertAndSend(queueName, message + i);
    14. Thread.sleep(20);
    15. }
    16. }

    3、创建两个消费者接收消息

    1. @RabbitListener(queues = "work.queue")
    2. public void listenWorkQueue1(String msg) throws InterruptedException {
    3. System.out.println("消费者1接收到消息:【" + msg + "】");
    4. }
    5. @RabbitListener(queues = "work.queue")
    6. public void listenWorkQueue2(String msg) throws InterruptedException {
    7. System.err.println("消费者2........接收到消息:【" + msg + "】");
    8. }

    结果 

     

     如果消费者睡眠时间不同

    1. @RabbitListener(queues = "work.queue")
    2. public void listenWorkQueue1(String msg) throws InterruptedException {
    3. System.out.println("消费者1接收到消息:【" + msg + "】");
    4. Thread.sleep(20);
    5. }
    6. @RabbitListener(queues = "work.queue")
    7. public void listenWorkQueue2(String msg) throws InterruptedException {
    8. System.err.println("消费者2........接收到消息:【" + msg + "】");
    9. Thread.sleep(200);
    10. }
    • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
    • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

     

    消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 

    4、能者多劳充分利用每一个消费者的能力

    在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

    可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

    三、交换机

    在订阅模型中,多了一个exchange角色,而且过程略有变化:

    • Publisher:生产者,不再发送消息到队列中,而是发给交换机
    • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
    • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
    • Consumer:消费者,与以前一样,订阅队列,没有变化

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    交换机的类型有四种:

    • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
    • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
    • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
    • Headers:头匹配,基于MQ的消息头匹配,用的较少。

    四、Fanout交换机

    Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。 在广播模式下,消息发送流程是这样的:

    • 1)  可以有多个队列
    • 2)  每个队列都要绑定到Exchange(交换机)
    • 3)  生产者发送的消息,只能发送到交换机
    • 4)  交换机把消息发送给绑定过的所有队列
    • 5)  订阅队列的消费者都能拿到消息

    1、 声明队列

     创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout

    2、 创建交换机

    创建一个名为fanout的交换机,类型是Fanout

     3、 绑定交换机

    4、示例 

     生产者

    1. /**
    2. * Fanout交换机
    3. */
    4. @Test
    5. public void testFanoutExchange() {
    6. // 交换机名称
    7. String exchangeName = "mq.fanout";
    8. // 消息
    9. String message = "hello, everyone!";
    10. rabbitTemplate.convertAndSend(exchangeName, "", message);
    11. }

    消费者

    1. @RabbitListener(queues = "fanout.queue1")
    2. public void listenFanoutQueue1(String msg) {
    3. System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    4. }
    5. @RabbitListener(queues = "fanout.queue2")
    6. public void listenFanoutQueue2(String msg) {
    7. System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    8. }

     

    五、Diect交换机

    Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

    在Direct模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
    • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息 

    1、 声明队列

    首先在控制台声明两个队列direct.queue1direct.queue2

     

    2、创建交换机

     3、绑定交换机

     一个队列绑定两个RoutingKey

     4、示例

    生产者:RoutingKey为red

    1. /**
    2. * Direct交换机
    3. */
    4. @Test
    5. public void testSendDirectExchange() {
    6. // 交换机名称
    7. String exchangeName = "mq.direct";
    8. // 消息
    9. String message = "hello direct red";
    10. // 发送消息
    11. rabbitTemplate.convertAndSend(exchangeName, "red", message);
    12. }

    消费者

    1. @RabbitListener(queues = "direct.queue1")
    2. public void listenDirectQueue1(String msg) {
    3. System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
    4. }
    5. @RabbitListener(queues = "direct.queue2")
    6. public void listenDirectQueue2(String msg) {
    7. System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    8. }

     两个队列都收到消息

    RoutingKey为blue

    1. /**
    2. * Direct交换机
    3. */
    4. @Test
    5. public void testSendDirectExchange() {
    6. // 交换机名称
    7. String exchangeName = "mq.direct";
    8. // 消息
    9. String message = "hello direct blue";
    10. // 发送消息
    11. rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    12. }

     只有队列2收到消息

    六、Topic交换机

    Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

    BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

    通配符规则:

    • #:匹配一个或多个词
    • *:匹配不多不少恰好1个词

    举例:

    • item.#:能够匹配item.spu.insert 或者 item.spu
    • item.*:只能匹配item.spu

     

     示例

    publisher发送的消息使用的RoutingKey共有四种:

    • china.news 代表有中国的新闻消息;
    • china.weather 代表中国的天气消息;
    • japan.news 则代表日本新闻
    • japan.weather 代表日本的天气消息;

    解释:

    • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
      • china.news
      • china.weather
    • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
      • china.news
      • japan.news

    1、创建队列

    2、创建交换机 

    3、绑定队列

    4、示例

    生产者:RoutingKey为china.news

    1. /**
    2. * topicExchange
    3. */
    4. @Test
    5. public void testSendTopicExchange() {
    6. // 交换机名称
    7. String exchangeName = "mq.topic";
    8. // 消息
    9. String message = "hello topis china.news";
    10. // 发送消息
    11. rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    12. }

    消费者

    1. @RabbitListener(queues = "topic.queue1")
    2. public void listenTopicQueue1(String msg){
    3. System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    4. }
    5. @RabbitListener(queues = "topic.queue2")
    6. public void listenTopicQueue2(String msg){
    7. System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    8. }

     

     RoutingKey为china.people

    1. /**
    2. * topicExchange
    3. */
    4. @Test
    5. public void testSendTopicExchange() {
    6. // 交换机名称
    7. String exchangeName = "mq.topic";
    8. // 消息
    9. String message = "hello topis china.people";
    10. // 发送消息
    11. rabbitTemplate.convertAndSend(exchangeName, "china.people", message);
    12. }

    只有消费者1收到消息 

    7、、声明队列交换机

    SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

    1、Queue:用于声明队列,可以用工厂类QueueBuilder构建
    2、Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
    3、Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

    1、SpringAMQP提供的类声明

    示例:创建Fanout交换机队列

    1. @Configuration
    2. public class FanoutConfig {
    3. /**
    4. * 声明交换机
    5. * @return Fanout类型交换机
    6. */
    7. @Bean
    8. public FanoutExchange fanoutExchange2(){
    9. return new FanoutExchange("mq.fanout2");
    10. }
    11. /**
    12. * 第1个队列
    13. */
    14. @Bean
    15. public Queue fanoutQueue3(){
    16. return new Queue("fanout.queue3");
    17. }
    18. /**
    19. * 绑定队列和交换机1
    20. */
    21. @Bean
    22. public Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){
    23. return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
    24. }
    25. /**
    26. * 第2个队列
    27. */
    28. @Bean
    29. public Queue fanoutQueue4(){
    30. return new Queue("fanout.queue4");
    31. }
    32. /**
    33. * 绑定队列和交换机2
    34. */
    35. @Bean
    36. public Binding bindingQueue2(){
    37. return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2());
    38. }
    39. }

     direct示例

    1. @Configuration
    2. public class DirectConfig {
    3. /**
    4. * 声明交换机
    5. * @return Direct类型交换机
    6. */
    7. @Bean
    8. public DirectExchange directExchange(){
    9. return ExchangeBuilder.directExchange("mq.direct2").build();
    10. }
    11. /**
    12. * 第1个队列
    13. */
    14. @Bean
    15. public Queue directQueue1(){
    16. return new Queue("direct.queue1");
    17. }
    18. /**
    19. * 绑定队列和交换机
    20. */
    21. @Bean
    22. public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
    23. return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    24. }
    25. /**
    26. * 绑定队列和交换机
    27. */
    28. @Bean
    29. public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
    30. return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    31. }
    32. /**
    33. * 第2个队列
    34. */
    35. @Bean
    36. public Queue directQueue2(){
    37. return new Queue("direct.queue2");
    38. }
    39. /**
    40. * 绑定队列和交换机
    41. */
    42. @Bean
    43. public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
    44. return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    45. }
    46. /**
    47. * 绑定队列和交换机
    48. */
    49. @Bean
    50. public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
    51. return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    52. }
    53. }

     direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding

    2、基于注解声明

    1. /**
    2. * 注解声明交换机
    3. * @param msg
    4. */
    5. @RabbitListener(bindings = @QueueBinding(
    6. value = @Queue(name = "direct.queue3"),//队列名称
    7. exchange = @Exchange(name = "mq.direct", //交换机名称
    8. type = ExchangeTypes.DIRECT),//交换机类型
    9. key = {"red", "blue"}//RoutingKey
    10. ))
    11. public void listenDirectQueue3(String msg){
    12. System.out.println("消费者3接收到direct.queue3的消息:【" + msg + "】");
    13. }
    14. @RabbitListener(bindings = @QueueBinding(
    15. value = @Queue(name = "direct.queue4"),
    16. exchange = @Exchange(name = "mq.direct", type = ExchangeTypes.DIRECT),
    17. key = {"red", "yellow"}
    18. ))
    19. public void listenDirectQueue4(String msg){
    20. System.out.println("消费者4接收到direct.queue4的消息:【" + msg + "】");
    21. }

     队列

     交换机

    七、消息转换器

    1. @Test
    2. public void testSendMap() throws InterruptedException {
    3. // 准备消息
    4. Map<String,Object> msg = new HashMap<>();
    5. msg.put("name", "张三");
    6. msg.put("age", 21);
    7. // 发送消息
    8. rabbitTemplate.convertAndSend("object.queue", msg);
    9. }

    当发送的数据为Objiet类型时会出现乱码现象,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化

    配置JSON转换器

    publisherconsumer两个服务中都引入依赖:

    1. <dependencies>
    2. <!--mq消息转换为json-->
    3. <dependency>
    4. <groupId>com.fasterxml.jackson.dataformat</groupId>
    5. <artifactId>jackson-dataformat-xml</artifactId>
    6. <version>2.9.10</version>
    7. </dependency>
    8. </dependencies>

    注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

    配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

    1. @Bean
    2. public MessageConverter messageConverter(){
    3. // 1.定义消息转换器
    4. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    5. // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    6. jackson2JsonMessageConverter.setCreateMessageIds(true);
    7. return jackson2JsonMessageConverter;
    8. }

    结果

     消费者接收Object

    我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

    1. @RabbitListener(queues = "object.queue")
    2. public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
    3. System.out.println("消费者接收到object.queue消息:【" + msg + "】");
    4. }

  • 相关阅读:
    HashMap关键就这几个点,你Get到了?
    科普:如何应用视觉显著性模型优化远控编码算法?
    docker运行redis镜像
    用代码收集每天热点内容信息,并发送到自己的邮箱
    电力电子转战数字IC20220720day53——同步通信元件
    游族马寅龙:常见信息安全风险及应对方案
    vscode配合gitee同步云设置
    物联网入门系列(一):快速搭建一站式数据存储与实时分析平台
    redis客户端Jedis和Lettuce
    java计算机毕业设计喜枫日料店自助点餐系统源码+lw文档+系统+数据库
  • 原文地址:https://blog.csdn.net/qi341500/article/details/133812139