• rabbitmq 使用SAC队列实现顺序消息


    rabbitmq 使用SAC队列实现顺序消息

    前提

    SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

    目的

    实现消息顺序消费,操作:

    • 创建4个SAC队列,
    • 消息的路由key 取队列个数模,这里是4
    • 发送消息到每个队列,保证每个队列只有一个消费者!!

    实现

    定义消息 SeqMessage
    @Data
    @AllArgsConstructor
    public class SeqMessage implements Serializable {
    
        //消息id
        private String requestNo;
        //消息中顺序,1,2,3,4
        private int order;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    创建 队列 绑定
    @Configuration
    public class OrderQueueConfiguration {
    
        public static final String EXCHANGE = "order-ex";
        public static final String RK_PREFIX = "rk-";
        public static final String ONE_QUEUE = "one-queue";
        public static final String TWO_QUEUE = "two-queue";
        public static final String THREE_QUEUE = "three-queue";
        public static final String FOUR_QUEUE = "four-queue";
    
        @Bean
        public DirectExchange exchange() { // 使用直连的模式
            return new DirectExchange(EXCHANGE, true, false);
        }
    
        @Bean
        public Binding oneBinding() {
            return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);
        }
        @Bean
        public Binding twoBinding() {
            return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);
        }
        @Bean
        public Binding threeBinding() {
            return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);
        }
        @Bean
        public Binding fourBinding() {
            return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 4);
        }
    
    
        @Bean
        public Queue oneQueue() {
            return createSacQueue(ONE_QUEUE);
        }
    
        @Bean
        public Queue twoQueue() {
            return createSacQueue(TWO_QUEUE);
        }
    
        @Bean
        public Queue threeQueue() {
            return createSacQueue(THREE_QUEUE);
        }
    
        @Bean
        public Queue fourQueue() {
            return createSacQueue(FOUR_QUEUE);
        }
    
        private static Queue createSacQueue(String queueName) {
            Map<String, Object> arguments = new HashMap<>(2);
            arguments.put("x-single-active-consumer", true);
            return new Queue(queueName, true, false, false, arguments);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

    在这里插入图片描述

    创建 消费者

    为每个队列创建一个监听消费者

    @Slf4j
    @Component
    public class OrderListener {
    
    
        @RabbitListener(bindings = @QueueBinding(
                        exchange = @Exchange(value = EXCHANGE,declare = "false"),
                        value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))
        public void onMessage1(Message message, @Headers Channel channel) {
            String messageStr = "";
            try {
                messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("{} recv: {}", ONE_QUEUE, messageStr);
            } catch (Exception e) {
                log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
            }
        }
    
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(value = EXCHANGE,declare = "false"),
                value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))
        public void onMessage2(Message message, @Headers Channel channel) {
            String messageStr = "";
            try {
                messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("{} recv: {}", TWO_QUEUE, messageStr);
            } catch (Exception e) {
                log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
            }
        }
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(value = EXCHANGE,declare = "false"),
                value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))
        public void onMessage3(Message message, @Headers Channel channel) {
            String messageStr = "";
            try {
                messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("{} recv: {}", THREE_QUEUE, messageStr);
            } catch (Exception e) {
                log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
            }
        }
    
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange(value = EXCHANGE,declare = "false"),
                value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))
        public void onMessage4(Message message, @Headers Channel channel) {
            String messageStr = "";
            try {
                messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("{} recv: {}", FOUR_QUEUE, messageStr);
            } catch (Exception e) {
                log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    生产者发送消息
    @GetMapping("/send/seq/messqge")
     public String sendSeqMessage() throws JsonProcessingException {
         int cnt = 100;
         int mod = 4;
         int seqSize = 6;
         for (int i = 0; i < cnt; i++) {
             for (int j = 0; j < seqSize; j++) {
                 int rk = i % mod + 1;
                 SeqMessage seqMessage = new SeqMessage("seq-" + i, j);
                 String s = objectMapper.writeValueAsString(seqMessage);
                 log.info("routeKey: {}, send msg: {}", rk, s);
                 rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);
             }
         }
         return "success";
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    运行结果:

    two-queue recv: {"requestNo":"seq-1","order":0}
    two-queue recv: {"requestNo":"seq-1","order":1}
    two-queue recv: {"requestNo":"seq-1","order":2}
    two-queue recv: {"requestNo":"seq-1","order":3}
    two-queue recv: {"requestNo":"seq-1","order":4}
    two-queue recv: {"requestNo":"seq-1","order":5}
    two-queue recv: {"requestNo":"seq-5","order":0}
    two-queue recv: {"requestNo":"seq-5","order":1}
    two-queue recv: {"requestNo":"seq-5","order":2}
    two-queue recv: {"requestNo":"seq-5","order":3}
    two-queue recv: {"requestNo":"seq-5","order":4}
    two-queue recv: {"requestNo":"seq-5","order":5}
    
    three-queue recv: {"requestNo":"seq-2","order":0}
    three-queue recv: {"requestNo":"seq-2","order":1}
    three-queue recv: {"requestNo":"seq-2","order":2}
    three-queue recv: {"requestNo":"seq-2","order":3}
    three-queue recv: {"requestNo":"seq-2","order":4}
    three-queue recv: {"requestNo":"seq-2","order":5}
    three-queue recv: {"requestNo":"seq-6","order":0}
    three-queue recv: {"requestNo":"seq-6","order":1}
    three-queue recv: {"requestNo":"seq-6","order":2}
    three-queue recv: {"requestNo":"seq-6","order":3}
    three-queue recv: {"requestNo":"seq-6","order":4}
    three-queue recv: {"requestNo":"seq-6","order":5}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    可以发现,消息消费是顺序的

    good luck!

  • 相关阅读:
    java毕业设计企业员工业绩考核系统mybatis+源码+调试部署+系统+数据库+lw
    SA8155P Flat Build QFIL刷机
    (二十九)大数据实战——kafka集群节点服役与退役案例实战
    mysql 综合练习
    小程序中自定义组件
    【黄啊码】MySQL入门—9、什么?都2022了,还不知道什么是事务?
    蓝牙SDK状态机与车载音频HSM状态机比较
    Linux删除文件后没有释放空间解决办法
    excel中怎么用乘法、加法来替代AND和OR函数
    逆向-beginners之指针变量
  • 原文地址:https://blog.csdn.net/u013887008/article/details/137955920