• 源码分析RocketMQ之broker-处理Consumer请求


    源码分析RocketMQ之broker-处理Consumer请求
    Consumer启动后,会请求broker,broker接收请求网络
    RemotingCommand 消息传输过程中对数据内容的封装结构,主要属性
    code:请求操作码,应答方根据不同的请求码进行不同的业务处理
    language:请求方实现的语言
    version:版本
    opaque:相当于requestId,在同一个连接上的不同请求标识码,与响应消息中相对应
    flag:区分普通RPC还是oneWay RPC标识
    remark:传输自定义文本信息
    extFields:请求自定义扩展信息
    customHeader:自定义头,不进行序列化
    body:消息主体的二进制字节数据内容
    NettyRemotingServer 服务端Netty start如下处理 ServerBootstrap

    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                .addLast(defaultEventExecutorGroup,
                    encoder,
                    new NettyDecoder(),
                    //心跳
                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                    connectionManageHandler,//连接管理
                    serverHandler//真正干活的是这个线程池,有连接来的时候boss处理,channel有事件发生的时候 selecter将事件读出来,然后交给EventExecutor来处理
                );
        }
    });

    NettyRemotingClient 客户端Netty start如下处理 Bootstrap

    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if (nettyClientConfig.isUseTLS()) {
                if (null != sslContext) {
                    pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                    log.info("Prepend SSL handler");
                } else {
                    log.warn("Connections are insecure as SSLContext is null!");
                }
            }
            pipeline.addLast(
                //使用这个线程组来处理processor
                defaultEventExecutorGroup,
                new NettyEncoder(),
                new NettyDecoder(),
                //心跳
                new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                new NettyConnectManageHandler(),//链接管理
                new NettyClientHandler());//处理消息的接收
        }
    });

    NettyEncoder和NettyDecoder 是对RemotingCommand的编码解码,解析消息头,解析出消息内容,粘包拆包

    BrokerController的构造方法创建PullMessageProcessor 通过remotingServer注册PULL_MESSAGE 消息拉取请求
    processRequest负责对consumer的消费请求进行处理
    1、创建响应头RemotingCommand
    2、读取消息响应头
    3、解码拉取消息的请求头拓展字段
    4、权限、订阅关系、topic是否存在、队列有效性、消费者组是否存在、广播模式检查
    5、DefaultMessageStore.getMessage
      入参:
      group:消费者组名
      topic:消息主题
      queueId:消息队列ID
      offset:拉取的消费队列偏移量
      maxMsgNums:一次拉取消息条数,默认32条
      messageFilter:消息过滤器
      1、获取commitLog文件中的最大偏移量
      2、根据topic、queueId获取消息消费队列ConsumeQueue
      3、获取此消费队列最小偏移量和最大偏移量
      4、根据需要拉取消息的偏移量与队列最大最小偏移量对比
        1、maxOffset为0表示队列中没有消息
        2、offset < minOffset 需要的offset比队列中的最新offset小,设置下次拉取最小的offset,
        3、offset == maxOffset 表示超出一个 返回状态:OFFSET_OVERFLOW_ONE,offset 保持不变
        4、offset > maxOffset 表示超出,OFFSET_OVERFLOW_BADLY,计算下一次拉取拉取的开始偏移量
        5、offset 大于minOffset 并小于maxOffset,正常情况
      5、从consuequeue 中从当前 offset 到当前 consueque 中最大可读消息内存
      6、获取最大过滤消息字节数,max(16000, maxMsgNums * 20) 最低16000 因为有消息过滤机制,可能不满足
        指定拉取的消息数,尽量满足返回这么多条消息
      7、循环拉取消息
        1、bufferConsumeQueue消费队列中获取commitLog的偏移量、消息总长度、tag hashcode
        2、如果拉取到的消息偏移量小于下一个要拉取的物理偏移量,跳过该条消息
        3、检查offsetPy,拉取的偏移量是否在磁盘上,maxOffsetPy-offsetPy > memory 的话,memory = 物理内存 * 这个比例
           说明 offsetPy 这个偏移量的消息已经从内存中置换到磁盘中了
        4、判断本次拉取任务是否完成
        5、执行消息过滤
        6、从commitLog文件中根据偏移量和消息大小读取消息 commitLog.getMessage
        7、将读取到的消息结果添加到结果集
        8、设置下次拉取任务开始nextBeginOffset offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)
        9、如果是超过了常驻内存设置下次拉取从从服务器拉取
    6、根据 response.getCode() 分别作出不同处理
      1、SUCCESS 更新broker统计信息
        isTransferMsgByHeap:是否在heap内存汇总直接转换,间获取到的byteBuffer在heap内存汇总转换为字节数组
        普通IO进行转换,将bytebuffer转成字节数组,设置到response中去,使用堆内存来处理的
        通过channel写入fileRegion
      2、PULL_NOT_FOUND
        1、brokerAllowSuspend:构建消息拉取是的拉取标记,默认true
        2、是否支持长轮训,不支持设置1000ms作为下次拉取消息的等待时间
        3、创建pullRequest 提交给ullRequestHoldService 线程去调度,触发消息拉取
        4、设置response = null,此次调用不会向客户端输出任何字节,客户端网络请求客户端的读事件不会触发,不会触发对响应结果的处理,处于等待状态
    7、判断它这次拉取消息请求里面带没带着消费offset,如果带着的话,就找到ConsumerOffset组件,然后更新一下消费offset
       消息消费者除了定时任务5更新一下消费进度,还可以通过拉取消息的时候带着消费offset,进行消费进度的更新
     

  • 相关阅读:
    【超详细~】手把手带你推导傅里叶级数~
    Leetcode 816. 模糊坐标
    【Linux练习生】进程信号(深度理解)
    NosQL之Redis配置与优化
    小满Vue3第四十六章(Proxy跨域)
    Apollo规划代码ros移植-动态障碍物处理(一)
    查找最大元素
    Python爬虫技术系列-03requests库案例-完善
    Java虚拟机JVM简介与理解(四)
    基于chatgpt-on-wechat搭建个人知识库微信群聊机器人
  • 原文地址:https://blog.csdn.net/liangshf520/article/details/125551386