• 【RabbitMQ 实战】09 客户端连接集群生产和消费消息


    一、部署一个三节点集群

    下面的链接是最快最简单的一种集群部署方法
    3分钟部署一个RabbitMQ集群
    上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。
    每个容器应用都映射了宿主机的端口,分别是5602,5612,5622
    docker compse文件如下

    version: '3'
    
    services:
      stats:
        image: bitnami/rabbitmq
        environment:
          - RABBITMQ_NODE_TYPE=stats
          - RABBITMQ_NODE_NAME=rabbit@stats
          - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
        ports:
          - '15672:15672'
          - '5602:5672'
        volumes:
          - 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'
      queue-disc1:
        image: bitnami/rabbitmq
        environment:
          - RABBITMQ_NODE_TYPE=queue-disc
          - RABBITMQ_NODE_NAME=rabbit@queue-disc1
          - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats
          - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
        ports:
          - '5612:5672'
        volumes:
          - 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'
      queue-ram1:
        image: bitnami/rabbitmq
        environment:
          - RABBITMQ_NODE_TYPE=queue-ram
          - RABBITMQ_NODE_NAME=rabbit@queue-ram1
          - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats
          - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
        ports:
          - '5622:5672'
        volumes:
          - 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'
    
    volumes:
      rabbitmqstats_data:
        driver: local
      rabbitmqdisc1_data:
        driver: local
      rabbitmqram1_data:
        driver: local
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    通过docker-compose up命令,就可以启动三个集群的容器了

    [root@localhost mycompose]# docker-compose up
    
    • 1

    二、配置文件

    原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

    server:
      port: 8080
    spring:
      application:
        name: rabbitmq-demo
      #配置rabbitMq 服务器
      rabbitmq:
    #单节点直接可以写host和port
    #    host: 192.168.56.201
    #    port: 5672
        #集群连接写ip和端口
        addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622
        username: user
        password: bitnami
        #虚拟host
        virtual-host: virtual01
        template:
          mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
        publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功
        publisher-returns: true #是否开启生产者returns
        listener:
          simple:
            acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法
            prefetch: 10 #每个消费者可拉取的,还未ack的消息数量
            concurrency: 3 #消费端(每个Listener)的最小线程数
            max-concurrency: 10 #消费端(每个Listener)的最大线程数
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    三、代码

    生产者

    和单节点的发送和消费代码一致,没有变化

    @Slf4j
    @RestController
    @RequestMapping("/rabbit")
    public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String ROUTING_KEY = "my_routing";
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 正常发送并被broker接收
         * @return
         */
        @RequestMapping("send")
        public String send() {
            for (int i = 0; i < 10; i++) {
                OrderInfo orderInfo = new OrderInfo();
                orderInfo.setAddress("成都市高新区");
                orderInfo.setOrderId(String.valueOf(i));
                orderInfo.setProductName("华为P60:" + i);
    
                //设置回调关联的一个id
                String messageId = UUID.randomUUID().toString();
                log.info("开始发送消息,当前消息关联id为:{}", messageId);
                CorrelationData correlationData = new CorrelationData(messageId);
    
                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                        .andProperties(messageProperties).build();
                //设置ack回调
                rabbitTemplate.setConfirmCallback(this);
                //退回消息的回调
                rabbitTemplate.setReturnCallback(this);
                rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
            }
            return "ok";
        }
    
        /**
         * 设置一个非法的路由键,模拟消息被broker退回的情况,前提是
         * spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
         * <p>
         * spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功
         *
         * @return
         */
        @RequestMapping("send-return")
        public String sendAndReturn() {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setAddress("成都市高新区");
            orderInfo.setOrderId("111");
            orderInfo.setProductName("小米13");
    
            //设置回调关联的一个id
            String messageId = UUID.randomUUID().toString();
            log.info("开始发送消息,当前消息关联id为:{}", messageId);
            CorrelationData correlationData = new CorrelationData(messageId);
    
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                    .andProperties(messageProperties).build();
            //设置ack回调
            rabbitTemplate.setConfirmCallback(this);
            //退回消息的回调
            rabbitTemplate.setReturnCallback(this);
            //下面这个RoutingKey是没有绑定的,所以发不出去
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);
            return "ok";
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (correlationData == null) {
                return;
            }
            String messageId = correlationData.getId();
            if (ack) {
                log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);
            } else {
                log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);
            }
        }
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",
                    new String(message.getBody()), replyCode, replyText, exchange, routingKey);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    消费者

    @Slf4j
    @Component
    public class RabbitOrderConsumer {
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String QUEUE_NAME = "my_queue";
        private static final String ROUTING_KEY = "my_routing";
    
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),
                exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})
        public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            //上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建
            log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);
            channel.basicAck(tag, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

    开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773
    开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561
    开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0
    开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270
    开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c
    开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de
    开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4
    接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
    接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1
    开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
    开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab
    开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969
    【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773
    【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561
    接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2
    接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1
    接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3
    接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2
    接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4
    接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
    接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3
    接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3
    【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0
    【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270
    【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c
    【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de
    【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4
    【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
    【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab
    【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    上述代码仓库:https://gitee.com/syk1234/mqdmo

    四、后台管理

    登录管理后台页面:http://192.168.56.202:15672/
    在这里插入图片描述

    共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

    查看连接情况,发现是连接的是节点rabbit@stats节点在这里插入图片描述
    查看队列的情况,队列是在rabbit@stats节点上
    在这里插入图片描述

  • 相关阅读:
    Linux小白零基础必备基础操作【上】
    喜报 | 热烈祝贺思腾合力成功挂牌天津OTC专精特新板
    架设好传奇登录器显示无法连接服务器,完美登录器使用常见问题解决办法
    《动手学深度学习》d2l 李沐B站视频配套学习笔记
    什么是智慧校园?
    Redis基础运维及问题排查
    微信小程序导航组件 navigator使用
    Linux C/C++实现SSL的应用层VPN (MiniVPN)
    开发网页版加密访问文档
    蓝桥杯之模拟与枚举day1
  • 原文地址:https://blog.csdn.net/suyuaidan/article/details/133632329