• NO1-Kafka如何通过网络接收/发送请求


    NO1-Kafka如何通过网络接收/发送请求 - 知乎

    SocketServer类核心概念

    对于Kafka来说,网络相关操作是通过SocketServer类完成的。

    处理与broker之间的新连接、请求和响应。Kafka支持两种类型的请求:

    data-plane :处理来自集群中的客户机和其他broker的请求。

    线程模型:

    • 每个listener有1个Acceptor线程,Acceptor线程处理新连接
    • 可以在Kafka的配置文件中以逗号分割的方式指定多个listeners,这样就可以配置多data-planes。
    • Acceptor有N个Processor线程,每个Processor线程都有自己的selector,从socket读取数据。
    • M个Handler线程处理请求,发送响应给正在等待的processor线程。

    control-plane :处理来自controller的请求。可以通过参数“control.plane.listener.name”配置是否开启control-plane。如果没有配置“control.plane.listener.name” 默认data-plane处理来自controller的请求。

    线程模型:

    • 1个Acceptor线程处理新连接,
    • Acceptor有1个Processor thread,Processor thread有自己的selector,从socket读取数据。
    • 1个Handler thread用于处理请求,为等待的processor thread发送响应

    这里有两个核心概念:

    1. Processor线程
    2. Handler线程

    Kafka大概的启动流程

    我们使用脚本zookeeper-server-start.sh

    最终会运行我们的这个类

    kafka.Kafka#main

    会先执行buildServer,然后执行server.startup()。

    这里的server就是KafkaServer或者KafkaRaftServer。

    如果指定了config.requiresZookeeper为true,就是KafkaServer,否则就是KafkaRaftServer。

    kafka.server.KafkaServer#startup

    KafkaServer中的startup方法会创建SocketServer;

    1. socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
    2. socketServer.startup(startProcessingRequests = false)

    SocketServer的startup方法最重要的是执行createDataPlaneAcceptorsAndProcessors

    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)

    createDataPlaneAcceptorsAndProcessors具体逻辑

    1. private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = {
    2. endpoints.foreach { endpoint =>
    3. connectionQuotas.addListener(config, endpoint.listenerName)
    4. val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    5. addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    6. dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    7. info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
    8. }
    9. }

    dataProcessorsPerListener就是config.controlPlaneListener。如果用户没有定义,但是定义了主机或端口,那么让我们以向后兼容的方式使用主机或端口。如果主机和端口号都没定义就是用默认的PLAINTEXT://:9092。

    dataProcessorsPerListener=3

    endpoint:属性主要有host, port, listenerName,SecurityProtocol

    主要是解析"PLAINTEXT://" + hostName + ":" + port的来的。

    addDataPlaneProcessors方法

    1. private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
    2. val listenerName = endpoint.listenerName
    3. val securityProtocol = endpoint.securityProtocol
    4. val listenerProcessors = new ArrayBuffer[Processor]()
    5. val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName
    6. for (_ <- 0 until newProcessorsPerListener) {
    7. val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
    8. listenerName, securityProtocol, memoryPool, isPrivilegedListener)
    9. listenerProcessors += processor
    10. dataPlaneRequestChannel.addProcessor(processor)
    11. nextProcessorId += 1
    12. }
    13. listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
    14. acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
    15. }

    listenerProcessors类型是

    val listenerProcessors = new ArrayBuffer[Processor]()

    为每个newProcessorsPerListener都创建一个processor。然后调用dataPlaneRequestChannel.addProcessor。

    为dataPlaneProcessors添加元素。

    最后addProcessors

    acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)

    会执行startProcessors

    1. private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
    2. processors.foreach { processor =>
    3. KafkaThread.nonDaemon(
    4. s"${processorThreadPrefix}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
    5. processor
    6. ).start()
    7. }
    8. }

    startProcessors里面就是开始任务,实际执行的就是processor里面的run方法。

    回到kafka.server.KafkaServer#startup方法

    KafkaServer中的startup方法会创建dataPlaneRequestProcessor

    1. dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
    2. autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
    3. fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

    可以看出dataPlaneRequestProcessor就是KafkaApis。

    KafkaServer中的startup方法还会创建KafkaRequestHandlerPool对象。

    1. dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
    2. config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

    这里的config.numIoThreads默认是8。

    会调用createHandler方法创建8个KafkaRequestHandler。

    1. for (i <- 0 until numThreads) {
    2. createHandler(i)
    3. }

    createHandler方法

    1. def createHandler(id: Int): Unit = synchronized {
    2. runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    3. KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
    4. }

    创建KafkaRequestHandler会传入apis,apis就是KafkaApis。

    这里会开启8个KafkaRequestHandler线程。

    目前我们了解

    再次回到kafka.server.KafkaServer#startup方法

    上面都执行完之后,会执行

    1. socketServer.startProcessingRequests(authorizerFutures)
    2. -----------------------------------------------------------------
    3. def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
    4. info("Starting socket server acceptors and processors")
    5. this.synchronized {
    6. if (!startedProcessingRequests) {
    7. startControlPlaneProcessorAndAcceptor(authorizerFutures)
    8. startDataPlaneProcessorsAndAcceptors(authorizerFutures)
    9. startedProcessingRequests = true
    10. } else {
    11. info("Socket server acceptors and processors already started")
    12. }
    13. }
    14. info("Started socket server acceptors and processors")
    15. }

    startDataPlaneProcessorsAndAcceptors

    Starts processors of all the data-plane acceptors and all the acceptors of this server.
    1. startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
    2. private def startAcceptorAndProcessors(threadPrefix: String,
    3. endpoint: EndPoint,
    4. acceptor: Acceptor,
    5. authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
    6. debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
    7. waitForAuthorizerFuture(acceptor, authorizerFutures)
    8. debug(s"Start processors on listener ${endpoint.listenerName}")
    9. acceptor.startProcessors(threadPrefix)
    10. debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
    11. if (!acceptor.isStarted()) {
    12. KafkaThread.nonDaemon(
    13. s"${threadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
    14. acceptor
    15. ).start()
    16. acceptor.awaitStartup()
    17. }
    18. info(s"Started $threadPrefix acceptor and processor(s) for endpoint : ${endpoint.listenerName}")
    19. }

    Kafka接收-处理-响应请求的大概流程

    Acceptor线程

    注册关心的事件

    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

    处理新连接伪代码

    1. while (isRunning) {
    2. acceptNewConnections()
    3. }

    最终会将新连接入队

    newConnections.put(socketChannel)

    Processor线程

    这里的Processor就是

    kafka.network.SocketServer#dataPlaneProcessors

    SocketServer类中这样定义

    private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()

    到目前为止我们知道的

    kafka.network.Processor#run

    1. // setup any new connections that have been queued up
    2. configureNewConnections()
    3. // register any new responses for writing
    4. processNewResponses()
    5. poll()
    6. processCompletedReceives()
    7. processCompletedSends()
    8. processDisconnected()
    9. closeExcessConnections()

    configureNewConnections()中会获取之前Acceptor线程存入的新连接。

    1. val channel = newConnections.poll()
    2. selector.register(connectionId(channel.socket), channel)

    processCompletedReceives中会调用requestChannel.sendRequest(req)。重点关注这行代码。

    Handler线程

    这里的Handler线程就是KafkaRequestHandler线程。

    处理流程伪代码

    1. while (!stopped) {
    2. val req = requestChannel.receiveRequest(300)
    3. req match {
    4. case RequestChannel.ShutdownRequest =>
    5. shutdownComplete.countDown()
    6. return
    7. case request: RequestChannel.Request =>
    8. apis.handle(request)
    9. case null => // continue
    10. }
    11. }

    这里的apis.handle(request)就是kafka.server.KafkaApis#handle。

    总结:

    1. Kafka支持两种类型的请求:data-plane,control-plane
    2. KafkaRequestHandlerPool里面有8个KafkaRequestHandler
    3. SocketServer会开启Acceptor线程和Processor线程,Acceptor线程将新连接入队,Processor线程获取新连接,注入新连接到selector上。然后执行poll方法读取连接的数据,读取到数据之后会调用requestChannel.sendRequest(req)。Handler线程线程会执行requestChannel.receiveRequest(300),然后调用KafkaApis的handle方法对请求进行处理。
  • 相关阅读:
    【MySQL系列】- MySQL自动备份详解
    CentOS8安装RabbitMQ
    ComfyUI搭建
    生命形式问题
    淘宝分布式文件存储系统(一) -TFS
    高保真神经网络音频编码器
    C++哈希(无序容器,哈希表)
    【算法】中缀表达式转成后缀表达式(逆波兰表达式)再计算得出结果
    11月20日,每日信息差
    国产芯片频频突破!中国“芯“引发全球关注,美媒彻底慌了
  • 原文地址:https://blog.csdn.net/qq_32907195/article/details/127572878