• RabbitMQ快速入门


    消息队列是目前最常见的微服务中间件之一,而RabbitMq在全球范围内的使用率也是名列前茅。

    下面带大家快速入门RabbitMQ


    了解 RabbitMQ

     

    • publisher生产者,也就是发送消息的一方

    • consumer消费者,也就是消费消息的一方

    • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

    • exchange交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

    • virtual host虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue


     使用Java代码使用RabbitMQ

    Spring的官方基于RabbitMQ提供了这样一套消息收发的模板工具:Spring AMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

    首先我们要在pom.xml文件里面导入依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>

    然后在配置文件里面配置好RabbitMQ的相关配置

    1. spring:
    2. rabbitmq:
    3. host: 127.0.0.1 # 你的IP
    4. port: 5672 # 端口 - 注意这里的端口一定要是5672
    5. virtual-host: /mall # 虚拟主机
    6. username: guest # 用户名
    7. password: guest # 密码

    配置好相关配置以后就可以编写代码了

    WorkQueues模型

    实际上该模型就是让多个消费者绑定到一个队列,共同消费队列中的消息。从而提高消息处理的速度

    Fanout交换机 - 广播

    广播,将消息交给所有绑定到交换机的队列

    只要我们向Fanout发送了消息,那么所有绑定了Fanout交换机的队列都会接收到消息

    假如生产者推送了这么一条消息给交换机,那么所有的交换机都可以接收到这条消息

    1. @Test
    2. public void testFanoutExchange(){
    3. String fanoutExchange = "hmall.fanout";
    4. String message = "hello,everyone";
    5. rabbitTemplate.convertAndSend(fanoutExchange,"",message);
    6. }

    同时消费者也可以获取到消息

    1. @RabbitListener(queues = "work.queue")
    2. public void listenWorkQueue1(String msg){
    3. System.out.println("消费者1....接收到消息:【" + msg + "】");
    4. }
    5. @RabbitListener(queues = "work.queue")
    6. public void listenWorkQueue2(String msg) throws InterruptedException {
    7. System.out.println("消费者2....接收到消息:【" + msg + "】");
    8. Thread.sleep(20);//模拟缓慢的处理
    9. }

    为了更好的利用各个消费者的性能,我们可以设置”处理完一条数据才能获取数据“ 的设定

    1. spring:
    2. rabbitmq:
    3. host: 127.0.0.1
    4. port: 5672
    5. virtual-host: /hmall
    6. username: hmall
    7. password: 123
    8. listener:
    9. simple:
    10. prefetch: 1

    Direct - 订阅

    订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

    只要我们向direct发送了消息,那么所有绑定了direct交换机的且RoutingKey相同的队列才会接收消息

    由于我这里的RoutingKey是red,所以只有RoutingKey是red的队列才能接收这条消息

    1. @Test
    2. public void testDirectExchange(){
    3. String directExchange = "hmall.direct";
    4. String message = "hello,red";
    5. rabbitTemplate.convertAndSend(directExchange,"red",message);
    6. }
    1. @RabbitListener(queues = "direct.queueRed")
    2. public void listenDirectQueue1(String msg) {
    3. System.out.println("消费者Red接收到direct.queueRed的消息:【" + msg + "】");
    4. }

    Topic - 通配符匹配

    通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

    只要我们向topic发送了消息,那么所有绑定了topic交换机的且RoutingKey匹配成功的队列才会接收消息

    假如说有两个队列 一个RoutingKey是 china. 一个RoutingKey是 .news

    那么此时我发送一条消息

    1. @Test
    2. public void testSendTopicExchange() {
    3. // 交换机名称
    4. String exchangeName = "hmall.topic";
    5. // 消息
    6. String message = "hello china.news";
    7. // 发送消息
    8. rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    9. }

    那么两个队列都可以接收到

    但是如果我发送一条 

    1. @Test
    2. public void testSendTopicExchange() {
    3. // 交换机名称
    4. String exchangeName = "hmall.topic";
    5. // 消息
    6. String message = "hello china.weather";
    7. // 发送消息
    8. rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
    9. }

    那么就只有china.这个队列可以接受到了 

    利用Java代码生成交换机/队列并且完成绑定

    首先我们要创建一个在Config包里面创一个类,这个类的作用是用来生成交换机/队列的,同时要加上@Configuartion注解,在生成交换机或者队列或者绑定的方法上一定要加@Bean注解

     这里就直接贴上代码了

    Fanout

    1. @Configuration
    2. public class DirectConfiguration {
    3. @Bean
    4. public DirectExchange directExchange(){
    5. return new DirectExchange("hmall.direct");
    6. }
    7. @Bean
    8. public Queue directQueue1(){
    9. return new Queue("direct.queue1");
    10. }
    11. @Bean
    12. public Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){
    13. return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    14. }
    15. @Bean
    16. public Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){
    17. return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    18. }
    19. @Bean
    20. public Queue directQueue2(){
    21. return new Queue("direct.queue2");
    22. }
    23. @Bean
    24. public Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){
    25. return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    26. }
    27. @Bean
    28. public Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){
    29. return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    30. }
    31. }

    Direct

    1. @Configuration
    2. public class DirectConfiguration {
    3. @Bean
    4. public DirectExchange directExchange(){
    5. return new DirectExchange("hmall.direct");
    6. }
    7. @Bean
    8. public Queue directQueue1(){
    9. return new Queue("direct.queue1");
    10. }
    11. @Bean
    12. public Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){
    13. return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    14. }
    15. @Bean
    16. public Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){
    17. return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    18. }
    19. @Bean
    20. public Queue directQueue2(){
    21. return new Queue("direct.queue2");
    22. }
    23. @Bean
    24. public Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){
    25. return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    26. }
    27. @Bean
    28. public Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){
    29. return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    30. }
    31. }

    Topic

    1. @Configuration
    2. public class TopicConfiguration {
    3. //创建Topic交换机
    4. @Bean
    5. public TopicExchange topicExchange(){
    6. return new TopicExchange("hmall.topic2");
    7. }
    8. //创建topic队列
    9. @Bean
    10. public Queue topicQueue3(){
    11. return new Queue("topic.queue3");
    12. }
    13. //绑定
    14. @Bean
    15. public Binding bindingQueue1(Queue topicQueue3, TopicExchange topicExchange){
    16. return BindingBuilder.bind(topicQueue3).to(topicExchange).with("china.");
    17. }
    18. //创建topic队列
    19. @Bean
    20. public Queue topicQueue4(){
    21. return new Queue("topic.queue4");
    22. }
    23. @Bean
    24. public Binding topicBinding(Queue topicQueue4,TopicExchange topicExchange){
    25. return BindingBuilder.bind(topicQueue4).to(topicExchange).with(".news");
    26. }
    27. }

    处理JSON数据类型

    如果我们想要处理JSON数据的话就要在启动类上添加

    1. @Bean
    2. public MessageConverter jacksonMessageConvertor(){
    3. return new Jackson2JsonMessageConverter();
    4. }

    不管是生产者还是消费者都需要在启动类上添加这么一条代码 

  • 相关阅读:
    【jmeter】接口测试流程
    Pandas Excel数据处理指南
    数据结构初阶 · 链式二叉树的部分问题
    师爷,翻译翻译什么叫AOP
    Linux定时任务调度
    QCC51XX-QCC30XX系列开发教程(实战篇) 之 12.4-空间音频手机侧和耳机侧接口设计时序图
    【Nacos无压力源码领读】(二) 集成 LoadBalancer 与 OpenFeign
    Docker网络模型(五)使用 overlay 网络
    基于某钉探索针对CEF框架的一些逆向思路
    报错:npm ERR code EPERM
  • 原文地址:https://blog.csdn.net/Superkom666/article/details/133847864