码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • 【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack


    前景回顾

    【mq】从零开始实现 mq-01-生产者、消费者启动

    【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

    【mq】从零开始实现 mq-03-引入 broker 中间人

    【mq】从零开始实现 mq-04-启动检测与实现优化

    【mq】从零开始实现 mq-05-实现优雅停机

    【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat

    【mq】从零开始实现 mq-07-负载均衡 load balance

    【mq】从零开始实现 mq-08-配置优化 fluent

    【mq】从零开始实现 mq-09-消费者拉取消息 pull message

    【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack

    状态回执

    大家好,我是老马。

    上一节我们只实现了拉取消息的实现,但是缺少了消费状态回执。

    这一节我们一起来学习下如何实现状态回执。

    10

    代码实现

    回执状态的设计

    我们规定如下几种回执状态:

    package com.github.houbb.mq.common.constant;
    
    /**
     * @author binbin.hou
     * @since 0.0.3
     */
    public final class MessageStatusConst {
    
        private MessageStatusConst(){}
    
        /**
         * 待消费
         * ps: 生产者推送到 broker 的初始化状态
         */
        public static final String WAIT_CONSUMER = "W";
    
        /**
         * 推送给消费端处理中
         * ps: broker 准备推送时,首先将状态更新为 P,等待推送结果
         * @since 0.1.0
         */
        public static final String TO_CONSUMER_PROCESS = "TCP";
    
        /**
         * 推送给消费端成功
         * @since 0.1.0
         */
        public static final String TO_CONSUMER_SUCCESS = "TCS";
    
        /**
         * 推送给消费端失败
         * @since 0.1.0
         */
        public static final String TO_CONSUMER_FAILED = "TCF";
    
        /**
         * 消费完成
         */
        public static final String CONSUMER_SUCCESS = "CS";
    
        /**
         * 消费失败
         */
        public static final String CONSUMER_FAILED = "CF";
    
        /**
         * 稍后消费
         * @since 0.1.0
         */
        public static final String CONSUMER_LATER = "CL";
    
    }
    

    消费者状态回执

    我们在消费之后,添加状态回执:

    for(MqMessage mqMessage : mqMessageList) {
        IMqConsumerListenerContext context = new MqConsumerListenerContext();
        final String messageId = mqMessage.getTraceId();
        ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
        log.info("消息:{} 消费结果 {}", messageId, consumerStatus);
    
        // 状态同步更新
        MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
        log.info("消息:{} 状态回执结果 {}", messageId, JSON.toJSON(ackResp));
    }
    

    回执实现,根据 messageId 更新对应的消息消费状态。

    public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
        final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
        req.setMessageId(messageId);
        req.setMessageStatus(consumerStatus.getCode());
        final String traceId = IdHelper.uuid32();
        req.setTraceId(traceId);
        req.setMethodType(MethodType.C_CONSUMER_STATUS);
    
        // 重试
        return Retryer.<MqCommonResp>newInstance()
                .maxAttempt(consumerStatusMaxAttempt)
                .callable(new Callable<MqCommonResp>() {
                    @Override
                    public MqCommonResp call() throws Exception {
                        Channel channel = getChannel(null);
                        MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                        if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                            throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
                        }
                        return resp;
                    }
                }).retryCall();
    }
    

    Broker 回执处理

    消息分发

    // 消费者消费状态 ACK
    if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {
        MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
        final String messageId = req.getMessageId();
        final String messageStatus = req.getMessageStatus();
        return mqBrokerPersist.updateStatus(messageId, messageStatus);
    }
    

    简单实现

    这里是基于本地 map 更新状态的,性能比较差。

    后续会以 mysql 实现。

    public MqCommonResp updateStatus(String messageId, String status) {
        // 这里性能比较差,所以不可以用于生产。仅作为测试验证
        for(List<MqMessagePersistPut> list : map.values()) {
            for(MqMessagePersistPut put : list) {
                MqMessage mqMessage = put.getMqMessage();
                if(mqMessage.getTraceId().equals(messageId)) {
                    put.setMessageStatus(status);
                    break;
                }
            }
        }
    
        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }
    

    小结

    对于消息状态的细化,更加便于我们后续的管理,和问题的定位。

    希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

    我是老马,期待与你的下次重逢。

    开源地址

    The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

    拓展阅读

    rpc-从零开始实现 rpc https://github.com/houbb/rpc

  • 相关阅读:
    LLM之RAG理论(十)| RAT:一种协同CoT和RAG的 AI 提示策略,助力解决长任务推理和生成
    【软件测试】测试人的我们,咋做一个如鱼得水的测试员?
    Influxdb数据库部署文档
    如何使用virtualenv的虚拟环境
    Ant Design Vue UI框架的基础使用,及通用后台管理模板的小demo【简单】
    [附源码]Python计算机毕业设计Django社区人员信息管理系统设计与实现
    VS Code配置matlab
    python打包exe
    c++八股day3-c++什么时候生成默认拷贝构造函数
    统计教程|PASS实现单因素二元Logistic回归分析且自变量为二分类的优势比检验的样本量估计
  • 原文地址:https://www.cnblogs.com/houbbBlogs/p/16264702.html
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号