• springboot整合消息队列——RabbitMQ


    RabbitMQ常用的三种Exchange Type:fanout、direct、topic。

    1. fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
    2. direct:把消息投递到那些binding key与routing key完全匹配的队列中。
    3. topic:将消息路由到binding key与routing key模式匹配的队列中。

    这里基于springboot整合​ ​消息队列​ ​,测试这三种Exchange。

    • 启动RabbitMQ

    双击运行rabbitmq-server.bat

    • SpringBoot整合RabbitMQ——Direct模式(默认模式)

    创建springboot web项目——发送者 springboot-sender

    追加测试和rabbitmq所需的依赖

    
    
      org.springframework.boot
      spring-boot-starter-amqp
    
    
    
      org.springframework.boot
      spring-boot-test
    
    
      junit
      junit
    
    
      org.springframework
      spring-test
      5.0.9.RELEASE
    

    修改配置文件 application.yml 或  application.properties:

    server:
      port: 7001
    spring:
      application:
        name: spirngboot-sender
        rabbitmq:
          host: 127.0.0.1
          port: 5672
          username: guest
          password: guest

    发送的信息可以是 基本数据类型 也可以是 对象 ,这里创建一个用户对象

    public class User implements Serializable{
        private String username;
        private String password;
     
        public String getUsername() {
            return username;
        }
     
        public void setUsername(String username) {
            this.username = username;
        }
     
        public String getPassword() {
            return password;
        }
     
        public void setPassword(String password) {
            this.password = password;
        }
    }

    创建一个配置类: SenderConfiguration.java

    一个名为 queue1 的​ ​队列​ ​

    @Configuration
    public class SenderConfiguration {
        @Bean
        public Queue directQueue() {
            return new Queue("queue1");
        }
    }

    创建一个发送信息类: SenderService.java

    发送 user 对象给 queue1 队列

    @Component
    public class SenderService {
        @Autowired
        private AmqpTemplate template;
     
        public void sendUser() {
            User user=new User();
            user.setUsername("张三");
            user.setPassword("123456");
            template.convertAndSend("queue1",user);
        }
    }

    创建一个测试类: TestRabbitMQ.java

    @SpringBootTest(classes=SpringbootSenderApplication.class)
    @RunWith(SpringJUnit4ClassRunner.class)
    public class TestRabbitMQ {
        @Autowired
        private SenderService senderService;
     
        @Test
        public void testRabbit() {
            senderService.sendUser();
        }
    }

    运行 testRabbit 方法:

    创建springboot web项目——接收者 springboot-receiver

    修改配置文件 application.yml 或  application.properties:

    server:
      port: 7002
    spring:
      application:
        name: spirngboot-receiver
        rabbitmq:
          host: 127.0.0.1
          port: 5672
          username: guest
          password: guest

    添加接收类: ReceiverService.java

    @Component
    public class ReceiverService {
        @RabbitListener(queues="queue1")
        public void receiveUser(User user) {
            System.out.println("username:"+user.getUsername()+" password:"+user.getPassword());
        }
    }

    运行启动类:SpringbootApplication.java,结果:

    信息成功被接收。

    SpringBoot整合RabbitMQ——Topic模式(模糊匹配)

    步骤与Direct差不多。

    发送者:

    修改配置类SenderConfiguration.java:

    创建两个队列 topic1,topic2,创建一个topic交换器,绑定交换机和队列以及绑定规则

    @Test
    public void testRabbit() {
        senderService.sendUser();
    }@Bean(name="topic1")
    public Queue topicQueue1() {
        return new Queue("topic1");
    }
    @Bean(name="topic2")
    public Queue topicQueue2() {
        return new Queue("topic2");
    }
     
    @Bean
    public TopicExchange exchange() {
        //创建一个topic交换器
        return new TopicExchange("topicExchange");
    }
    @Bean
    Binding bindingExchangeTopic1(@Qualifier("topic1") Queue queueMessage, TopicExchange exchange) {
        //设置topic1绑定规则
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.queue");
    }
    @Bean
    Binding bindingExchangeTopic2(@Qualifier("topic2") Queue queueMessages, TopicExchange exchange) { 
        //设置topic2绑定规则 *表示一个词,#表示零个或多个词
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    修改发送类 SenderService.java :

    User user=new User();
    user.setUsername("张三");
    user.setPassword("123456");
    //发送给topicExchange的交换机
    template.convertAndSend("topicExchange","topic.queue",user);
    template.convertAndSend("topicExchange","topic.anyting",user);

    运行 testRabbit 方法:

    成功广播出去两条信息

    接收者:

    修改接收类 ReceiverService.java :

    @RabbitListener(queues="fanout1")
    public void receiveFanout1(User user) {
        System.out.println("队列:fanout1 username:"+user.getUsername()+" password:"+user.getPassword());
    }
    @RabbitListener(queues="fanout2")
    public void receiveFanout2(User user) {
        System.out.println("队列:fanout2 username:"+user.getUsername()+" password:"+user.getPassword());
    }

    运行启动类,结果:

    消息成功被发送接收

  • 相关阅读:
    《嵌入式虚拟化技术与应用》:深入浅出阐述嵌入式虚拟机原理,实现“小而能”嵌入式虚拟机!
    【无标题】
    EEG-fNIRS跨模态迁移学习优化BCI系统分类精度
    探索精彩世界,畅享短视频直播平台
    【21天学习挑战赛】顺序查找
    LeetCode 0813. 最大平均值和的分组
    docker-compose快速部署nginx
    Linux本地部署开源AI的PDF工具—Stirling PDF并实现公网随时访问
    IDEA启动报错Failed to create JVM. JVM path的解决办法
    面试官:说说SSO单点登录的实现原理?
  • 原文地址:https://blog.csdn.net/Candyz7/article/details/126481018