• SpringAMQP之队列和交换机


    目录

    安装配置

    基础使用

    WorkQueue

    FanoutExchange

    DirectExchange


    安装配置

    我们在使用RabbitMQ需要先在Docker中拉取RabbitMQ使用下面命令拉取

    1. systemctl start docker #启动docker
    2. docker pull rabbitmq:3-management
    3. #安装这个版本的rabbitmq

    这里我之前安装了最新的RabbitMQ发现打开不了网站,我用了这个版本的就打开了

    在拉取镜像成功后,使用创建容器的命令创建RabbitMQ容器

    1. docker run \
    2. -e RABBITMQ_DEFAULT_USER=demo \
    3. -e RABBITMQ_DEFAULT_PASS=123456 \
    4. --name rq \
    5. -p 15672:15672 \
    6. -p 5672:5672 \
    7. -d \
    8. rabbitmq:3-management

    这样就算安装完成了RabbitMQ,并且我们登陆RabbitMQ

    • channels:操作MQ的工具
    • exchange:路由消息到队列中(交换机)
    • queue:缓存消息(队列)
    • virtual host:虚拟主机,是对queue\exchange等资源的逻辑分组

     我们现在开始使用Java来发起最基本的publisher->equeue->consume,发布给队列,队列取值这种效果。

    首先先导入依赖和配置yml文件

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>
    1. spring:
    2. rabbitmq:
    3. host: 192.168.3.26
    4. port: 5672
    5. virtual-host: /
    6. username: demo
    7. password: 123456

    写一个测试发布的,在这之前我们需要,先创建一个队列(注意:这里已经使用了SpringBootTest,所以如果在同一个包下,如果有SpringbootApplicaiton的注解需要先注释掉,不然是会报错的)

    基础使用

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. public class Publisher {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. @Test
    7. public void test1(){
    8. String queuename = "simple.queue";
    9. String message = "hello,spring";
    10. rabbitTemplate.convertAndSend(queuename,message);
    11. }
    12. }

    这样之后,我们在写一个消费者,用于接收队列中的数据(这个我们需要springboot启动后,一直做监听)消费者中配置一样的yml文件和依赖

    1. @Component
    2. public class Consum {
    3. @RabbitListener(queues="simple.queue")
    4. public void listenT(String msg){
    5. System.out.println("接收到消息:"+msg);
    6. }
    7. }

    我首先没有启动消费者的springboot,所以会在队列中看到有一条数据

    然后我们启动消费者

    成功的接收到了队列中的消息,队列的消息也没有了,已经传送过来了

    WorkQueue

    下面我们将讲解workQueue

    workQueue是一个发布者,和多个消费者,但是当中会有预取机制,会将每一个传送来的值均匀分布在每一个消费者上。

    我将发布的利用循环,让其发送5次信息

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. @EnableRabbit
    4. public class Publisher {
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @Test
    8. public void test1(){
    9. String queuename = "simple.queue";
    10. String message = "hello,springmq";
    11. for (int i=0;i<5;i++)
    12. {
    13. rabbitTemplate.convertAndSend(queuename,message);
    14. }
    15. }
    16. }

    消费者中使用两个监听的来获取同一个队列

    1. @Component
    2. public class Consum {
    3. @RabbitListener(queues="simple.queue")
    4. public void listenT1(String msg){
    5. System.out.println("demo1接收到消息:"+msg);
    6. }
    7. @RabbitListener(queues="simple.queue")
    8. public void listenT2(String msg){
    9. System.out.println("demo2接收到消息:"+msg);
    10. }
    11. }

    结果如下

     这就是workQueue的作用,下面我们将讲解FanoutExchange

    FanoutExchange

    交换机在这里作用就是改变我们使用的传输数据的方式,不是通过发布到队列,然后到队列当中去取,而是通过发布者发布给交换机,又交换机交给不同的队列,再由消费者来调用。

    我们更改代码,现在使用注解的方式来进行交换机和队列的创建,并拉取他们之间连接,这里我们首先先创建交换机和队列以及彼此的连接,这里我新建了一个类,和监听的分开,更符合微服务的思想

    1. @Configuration
    2. public class Exchange {
    3. @Bean
    4. public FanoutExchange FanoutExchange1(){
    5. return new FanoutExchange("demo.list");
    6. }
    7. @Bean
    8. public Queue fanoutqueue1(){
    9. return new Queue("fanout.queue1");
    10. }
    11. @Bean
    12. public Queue fanoutqueue2(){
    13. return new Queue("fanout.queue2");
    14. }
    15. @Bean
    16. public Binding fanoutbinding(Queue fanoutqueue1, FanoutExchange FanoutExchange1){
    17. return BindingBuilder.bind(fanoutqueue1).to(FanoutExchange1);
    18. }
    19. @Bean
    20. public Binding fanoutbinding2(Queue fanoutqueue2, FanoutExchange FanoutExchange1){
    21. return BindingBuilder.bind(fanoutqueue2).to(FanoutExchange1);
    22. }
    23. }

    这里我创建两个队列和一个交换机。下面是监听类中添加对两种队列的监听。

    1. @Component
    2. public class Consum {
    3. @RabbitListener(queues="simple.queue")
    4. public void listenT1(String msg){
    5. System.out.println("demo1接收到消息:"+msg);
    6. }
    7. @RabbitListener(queues="simple.queue")
    8. public void listenT2(String msg){
    9. System.out.println("demo2接收到消息:"+msg);
    10. }
    11. @RabbitListener(queues="fanout.queue1")
    12. public void listenT3(String msg){
    13. System.out.println("demofanout1接收到消息:"+msg);
    14. }
    15. @RabbitListener(queues="fanout.queue2")
    16. public void listenT4(String msg){
    17. System.out.println("demofanou2接收到消息:"+msg);
    18. }
    19. }

    发送就改变不是向队列发送,而是向交换机发送

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. @EnableRabbit
    4. public class Publisher {
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @Test
    8. public void test1(){
    9. String exchangename = "demo.list";
    10. String message = "hello,springmq";
    11. rabbitTemplate.convertAndSend(exchangename,"",message);
    12. }
    13. }

    在交换机和发送的消息中有一个“”,这个后面给大家说清楚,下面看结果

    成功得到信息,然后讲解DirectExchange

    DirectExchange

    DirectExchange会将接受到消息根据规则路由到指定的Queue,当中重要的是使用bindingkey来控制,可以使用@RabbitListener声明Exange,Queue,Routingkey。

    这里我们讲解了新的注解,就是我们可以直接在@RabbitListener中添加一些参数这样就可以不用使用@Bean的方法去一个个的声明了。从这里大家就可以看出来DirectExchange的不同点就在于使用的bindingkey,而这个值正是""。下面直接上代码

    这边是消费者我们使用注解的形式进行开发,也是最简单的一种,注意因为上面我们使用的是finout,所以我们这里需要先删除一次交换机,再执行代码,让其自动创建新的交换机

    1. @Component
    2. public class Consum {
    3. @RabbitListener(queues="simple.queue")
    4. public void listenT1(String msg){
    5. System.out.println("demo1接收到消息:"+msg);
    6. }
    7. @RabbitListener(queues="simple.queue")
    8. public void listenT2(String msg){
    9. System.out.println("demo2接收到消息:"+msg);
    10. }
    11. @RabbitListener(queues="fanout.queue1")
    12. public void listenT3(String msg){
    13. System.out.println("demofanout1接收到消息:"+msg);
    14. }
    15. @RabbitListener(queues="fanout.queue2")
    16. public void listenT4(String msg){
    17. System.out.println("demofanou2接收到消息:"+msg);
    18. }
    19. @RabbitListener(bindings = @QueueBinding(
    20. value = @Queue(name = "direct.queue1"),
    21. exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
    22. key = {"red", "blue"}
    23. ))
    24. public void listen5(String msg) {
    25. System.out.println("demodirect1接受到消息:" + msg);
    26. }
    27. @RabbitListener(bindings = @QueueBinding(
    28. value = @Queue(name = "direct.queue2"),
    29. exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
    30. key = {"red", "yellow"}
    31. ))
    32. public void listen6(String msg) {
    33. System.out.println("demodirect2接受到消息:" + msg);
    34. }
    35. }

    发布者添加我们需要识别的bindkey就可以了

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. @EnableRabbit
    4. public class Publisher {
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @Test
    8. public void test1(){
    9. String exchangename = "demo.list";
    10. String message = "hello,springmq";
    11. rabbitTemplate.convertAndSend(exchangename,"red",message);
    12. }
    13. }

    这里我们先传red的,也就是两个都能收到

     

    我们把当中bindingkey更改为blue也就是只有1能收到

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. @EnableRabbit
    4. public class Publisher {
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. @Test
    8. public void test1(){
    9. String exchangename = "demo.list";
    10. String message = "hello,springmq";
    11. rabbitTemplate.convertAndSend(exchangename,"blue",message);
    12. }
    13. }

     

    这就是DirectExchange,后面还有一种是TopicExchange,这种的用法没有什么不同,就是增加了可以使用通配符来代替交换机的名称,只是偷偷懒,最后再讲一种:消息转换器

  • 相关阅读:
    从零手写实现 nginx-12-keepalive HTTP 持久连接或连接复用
    Flink开发语言大比拼:Java与Scala怎么选好?
    tomcat 数据库卡死 处理
    Flink+ice 实现可视化规则编排与灵活配置(Demo)
    VS编程技巧——写好枚举后自动补全switch
    Servlet的生命周期
    自动化测试的好处
    计算机图形学中的曲线问题
    高软核心问题
    文举论金:黄金原油全面走势分析策略指导。
  • 原文地址:https://blog.csdn.net/Demolist/article/details/127038574