For Kafka, network-related operations are handled by the SocketServer class, which processes new connections, requests, and responses between brokers and their clients. Kafka supports two types of request planes, each with its own thread model:
data-plane: handles requests from clients and other brokers in the cluster.
control-plane: handles requests from the controller. It is enabled via the "control.plane.listener.name" parameter; if "control.plane.listener.name" is not configured, the data-plane handles requests from the controller by default.
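The data-plane/control-plane split described above can be expressed in server.properties. A hypothetical fragment (the listener names here are examples, not defaults):

```properties
# hypothetical server.properties fragment: a dedicated control-plane listener
listeners=PLAINTEXT://:9092,CONTROLLER://:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
# route controller requests onto their own acceptor/processor threads
control.plane.listener.name=CONTROLLER
```

Without the last line, controller requests simply share the data-plane listener's threads.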
There are two core concepts here: the Acceptor thread, which accepts new connections, and the Processor threads, which perform network I/O on the established connections.
We start the broker with the script kafka-server-start.sh, which ultimately runs this class:
kafka.Kafka#main
main first executes buildServer and then server.startup().
The server here is either KafkaServer or KafkaRaftServer: if config.requiresZookeeper is true it is KafkaServer, otherwise it is KafkaRaftServer.
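The buildServer decision can be sketched as follows. This is an illustrative Java stand-in for the Scala code, not Kafka's actual implementation; the class names just mirror the prose:

```java
// Minimal sketch of the buildServer decision in kafka.Kafka#main:
// pick the ZooKeeper-backed server or the KRaft server based on config.
public class BuildServerSketch {
    interface Server { String name(); }

    static class KafkaServer implements Server {
        public String name() { return "KafkaServer"; }
    }

    static class KafkaRaftServer implements Server {
        public String name() { return "KafkaRaftServer"; }
    }

    // stands in for: if (config.requiresZookeeper) new KafkaServer(...)
    //                else new KafkaRaftServer(...)
    static Server buildServer(boolean requiresZookeeper) {
        return requiresZookeeper ? new KafkaServer() : new KafkaRaftServer();
    }

    public static void main(String[] args) {
        System.out.println(buildServer(true).name());   // ZooKeeper mode
        System.out.println(buildServer(false).name());  // KRaft mode
    }
}
```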
kafka.server.KafkaServer#startup
The startup method of KafkaServer creates the SocketServer:
- socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
- socketServer.startup(startProcessingRequests = false)
The most important thing SocketServer's startup method does is execute createDataPlaneAcceptorsAndProcessors:
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
The concrete logic of createDataPlaneAcceptorsAndProcessors:
- private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = {
-   endpoints.foreach { endpoint =>
-     connectionQuotas.addListener(config, endpoint.listenerName)
-     val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
-     addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
-     dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
-     info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
-   }
- }
dataProcessorsPerListener is config.numNetworkThreads, which defaults to 3, so dataProcessorsPerListener = 3 here. The endpoints come from the configured listeners: if the user did not define listeners but did define a host or port, those are used for backward compatibility; if neither is defined, the default PLAINTEXT://:9092 is used.
An endpoint's main attributes are host, port, listenerName, and securityProtocol, parsed from a string of the form "PLAINTEXT://" + hostName + ":" + port.
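Parsing a listener string of that form can be sketched like this. This is a hypothetical simplification, not Kafka's actual EndPoint code; the class and field names are illustrative:

```java
// Hypothetical sketch of parsing "securityProtocol://host:port"
// listener strings such as "PLAINTEXT://myhost:9092".
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EndPointSketch {
    // securityProtocol://host:port ; host may be empty (bind all interfaces)
    private static final Pattern LISTENER =
        Pattern.compile("^([A-Za-z0-9_]+)://([^:]*):(\\d+)$");

    final String securityProtocol;
    final String host;   // empty string means "bind to all interfaces"
    final int port;

    EndPointSketch(String securityProtocol, String host, int port) {
        this.securityProtocol = securityProtocol;
        this.host = host;
        this.port = port;
    }

    static EndPointSketch parse(String listener) {
        Matcher m = LISTENER.matcher(listener);
        if (!m.matches())
            throw new IllegalArgumentException("bad listener: " + listener);
        return new EndPointSketch(m.group(1), m.group(2), Integer.parseInt(m.group(3)));
    }

    public static void main(String[] args) {
        EndPointSketch ep = EndPointSketch.parse("PLAINTEXT://myhost:9092");
        System.out.println(ep.securityProtocol + " " + ep.host + " " + ep.port);
    }
}
```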
The addDataPlaneProcessors method:
- private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
-   val listenerName = endpoint.listenerName
-   val securityProtocol = endpoint.securityProtocol
-   val listenerProcessors = new ArrayBuffer[Processor]()
-   val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName
-
-   for (_ <- 0 until newProcessorsPerListener) {
-     val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
-       listenerName, securityProtocol, memoryPool, isPrivilegedListener)
-     listenerProcessors += processor
-     dataPlaneRequestChannel.addProcessor(processor)
-     nextProcessorId += 1
-   }
-   listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
-   acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
- }
listenerProcessors is declared as:
val listenerProcessors = new ArrayBuffer[Processor]()
One Processor is created for each of the newProcessorsPerListener slots and registered via dataPlaneRequestChannel.addProcessor; each is also added to the dataPlaneProcessors map. Finally the processors are handed to the acceptor:
acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
which executes startProcessors:
- private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
-   processors.foreach { processor =>
-     KafkaThread.nonDaemon(
-       s"${processorThreadPrefix}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
-       processor
-     ).start()
-   }
- }
startProcessors starts each Processor on its own named non-daemon thread; what actually executes is the Processor's run method.
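The thread-start pattern can be sketched in Java as below. This is an assumed simplification (plain Thread instead of KafkaThread, a shortened name scheme), not Kafka's actual classes:

```java
// Minimal sketch of starting Runnable "processors" on named non-daemon
// threads, as startProcessors does via KafkaThread.nonDaemon(...).start().
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ProcessorStartSketch {
    static List<Thread> startProcessors(String prefix, List<Runnable> processors) {
        List<Thread> threads = new ArrayList<>();
        int id = 0;
        for (Runnable p : processors) {
            // name mirrors the "<prefix>-kafka-network-thread-...-<id>" scheme
            Thread t = new Thread(p, prefix + "-kafka-network-thread-" + id++);
            t.setDaemon(false); // network threads are non-daemon in Kafka
            t.start();
            threads.add(t);
        }
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(3);
        List<Runnable> ps = List.of(done::countDown, done::countDown, done::countDown);
        for (Thread t : startProcessors("data-plane", ps)) t.join();
        System.out.println("remaining=" + done.getCount());
    }
}
```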
The startup method of KafkaServer also creates dataPlaneRequestProcessor:
- dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
- autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
- fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
As you can see, dataPlaneRequestProcessor is an instance of KafkaApis.
The startup method of KafkaServer also creates a KafkaRequestHandlerPool object:
- dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
- config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
config.numIoThreads here defaults to 8, so createHandler is called to create 8 KafkaRequestHandler instances:
- for (i <- 0 until numThreads) {
-   createHandler(i)
- }
The createHandler method:
- def createHandler(id: Int): Unit = synchronized {
-   runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
-   KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
- }
Each KafkaRequestHandler is constructed with apis, i.e. the KafkaApis instance. This starts 8 KafkaRequestHandler daemon threads.
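The handler pool can be sketched as N daemon threads looping on a shared request queue. Names, the shutdown sentinel, and the no-op "requests" below are illustrative stand-ins, not Kafka's actual KafkaRequestHandlerPool:

```java
// Hypothetical sketch of a KafkaRequestHandlerPool-like construct:
// N daemon threads take requests from a shared queue and "handle" them.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class HandlerPoolSketch {
    private static final Runnable SHUTDOWN = () -> {};

    final BlockingQueue<Runnable> requestQueue = new LinkedBlockingQueue<>();
    private final CountDownLatch stopped;
    private final int numThreads;

    HandlerPoolSketch(int numThreads, String prefix) {
        this.numThreads = numThreads;
        this.stopped = new CountDownLatch(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    try {
                        Runnable req = requestQueue.take();
                        if (req == SHUTDOWN) { stopped.countDown(); return; }
                        req.run(); // stands in for apis.handle(request)
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, prefix + "-kafka-request-handler-" + i);
            t.setDaemon(true); // request handler threads are daemons in Kafka
            t.start();
        }
    }

    void shutdown() throws InterruptedException {
        for (int i = 0; i < numThreads; i++) requestQueue.put(SHUTDOWN);
        stopped.await();
    }

    // Runs numRequests counting requests through the pool; returns how many ran.
    static int demo(int numThreads, int numRequests) {
        try {
            HandlerPoolSketch pool = new HandlerPoolSketch(numThreads, "data-plane");
            AtomicInteger handled = new AtomicInteger();
            for (int i = 0; i < numRequests; i++) pool.requestQueue.put(handled::incrementAndGet);
            pool.shutdown();
            return handled.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // 8 mirrors the num.io.threads default mentioned above
        System.out.println("handled=" + demo(8, 100));
    }
}
```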
So far we know: the Acceptor accepts new connections, the Processor threads perform the network I/O, and the KafkaRequestHandler threads call into KafkaApis to process requests.
After all of the above has executed, the following is called:
- socketServer.startProcessingRequests(authorizerFutures)
Its definition:
- def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
-   info("Starting socket server acceptors and processors")
-   this.synchronized {
-     if (!startedProcessingRequests) {
-       startControlPlaneProcessorAndAcceptor(authorizerFutures)
-       startDataPlaneProcessorsAndAcceptors(authorizerFutures)
-       startedProcessingRequests = true
-     } else {
-       info("Socket server acceptors and processors already started")
-     }
-   }
-   info("Started socket server acceptors and processors")
- }
startDataPlaneProcessorsAndAcceptors starts the processors of all the data-plane acceptors and all the acceptors of this server. For each endpoint it calls:
- startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
- private def startAcceptorAndProcessors(threadPrefix: String,
-                                        endpoint: EndPoint,
-                                        acceptor: Acceptor,
-                                        authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
-   debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
-   waitForAuthorizerFuture(acceptor, authorizerFutures)
-   debug(s"Start processors on listener ${endpoint.listenerName}")
-   acceptor.startProcessors(threadPrefix)
-   debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
-   if (!acceptor.isStarted()) {
-     KafkaThread.nonDaemon(
-       s"${threadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
-       acceptor
-     ).start()
-     acceptor.awaitStartup()
-   }
-   info(s"Started $threadPrefix acceptor and processor(s) for endpoint : ${endpoint.listenerName}")
- }
The Acceptor registers the event it cares about, new incoming connections, with its NIO selector:
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
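The NIO setup behind that line can be sketched as below. This is an assumed simplification of the Acceptor's setup, shown with plain java.nio calls:

```java
// Sketch of the Acceptor's NIO setup: open a ServerSocketChannel, bind it,
// and register interest in OP_ACCEPT (new connections) with a selector.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class AcceptorSetupSketch {
    static SelectionKey registerAccept(Selector nioSelector, int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false); // selectors require non-blocking channels
        serverChannel.bind(new InetSocketAddress(port));
        // tell the selector we only care about new incoming connections here
        return serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
    }

    // Returns true if the registered key's interest set is exactly OP_ACCEPT.
    static boolean demo() {
        try {
            Selector selector = Selector.open();
            SelectionKey key = registerAccept(selector, 0); // port 0: any free port
            boolean ok = key.interestOps() == SelectionKey.OP_ACCEPT;
            key.channel().close();
            selector.close();
            return ok;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("acceptOps=" + demo());
    }
}
```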
Pseudocode for handling new connections:
- while (isRunning) {
-   acceptNewConnections()
- }
Each new connection is finally enqueued onto a Processor's queue:
newConnections.put(socketChannel)
The Processor here is one of
kafka.network.SocketServer#dataPlaneProcessors
which SocketServer defines as:
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
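The Acceptor-to-Processor handoff can be sketched with a bounded blocking queue. The queue capacity, class name, and demo below are illustrative assumptions, not Kafka's exact code:

```java
// Sketch of the Acceptor→Processor handoff: the Acceptor enqueues accepted
// SocketChannels on a per-Processor newConnections queue; the Processor
// drains that queue at the top of its own run loop.
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConnectionHandoffSketch {
    // Kafka bounds this queue; the capacity of 20 here is illustrative
    private final BlockingQueue<SocketChannel> newConnections = new ArrayBlockingQueue<>(20);

    // called from the Acceptor thread; blocks if this Processor is backed up
    void accept(SocketChannel channel) throws InterruptedException {
        newConnections.put(channel);
    }

    // called at the top of the Processor's run loop (configureNewConnections)
    int configureNewConnections() {
        int configured = 0;
        SocketChannel channel;
        while ((channel = newConnections.poll()) != null) {
            // the real Processor would now do: selector.register(connectionId, channel)
            configured++;
        }
        return configured;
    }

    // Hands off n fresh channels and returns how many the processor configured.
    static int demo(int n) {
        try {
            ConnectionHandoffSketch processor = new ConnectionHandoffSketch();
            for (int i = 0; i < n; i++) processor.accept(SocketChannel.open());
            return processor.configureNewConnections();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("configured=" + demo(2));
    }
}
```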
What we know so far: the Acceptor thread accepts new connections and hands each one to a Processor through its newConnections queue. The Processor's run method then loops through the following steps:
- // setup any new connections that have been queued up
- configureNewConnections()
- // register any new responses for writing
- processNewResponses()
- poll()
- processCompletedReceives()
- processCompletedSends()
- processDisconnected()
- closeExcessConnections()
configureNewConnections() retrieves the new connections previously enqueued by the Acceptor thread:
- val channel = newConnections.poll()
- selector.register(connectionId(channel.socket), channel)
processCompletedReceives calls requestChannel.sendRequest(req); this is the line to focus on.
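Conceptually, sendRequest hands the parsed request from a Processor thread to the handler threads through a bounded queue. The sketch below uses String as a stand-in request type; only the queueing behavior is the point:

```java
// Sketch of the RequestChannel hand-off that requestChannel.sendRequest(req)
// performs: Processors enqueue requests, KafkaRequestHandler threads dequeue
// with a timeout, mirroring requestChannel.receiveRequest(300).
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RequestChannelSketch {
    // queued.max.requests bounds this queue in Kafka; 500 is that default
    private final BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(500);

    // called by Processor threads in processCompletedReceives
    void sendRequest(String request) throws InterruptedException {
        requestQueue.put(request);
    }

    // called by KafkaRequestHandler threads; returns null on timeout
    String receiveRequest(long timeoutMs) throws InterruptedException {
        return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Sends one request, receives it back, and verifies the timeout path.
    static boolean demo() {
        try {
            RequestChannelSketch channel = new RequestChannelSketch();
            channel.sendRequest("ProduceRequest");
            return "ProduceRequest".equals(channel.receiveRequest(300))
                && channel.receiveRequest(10) == null;
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ok=" + demo());
    }
}
```

The bounded queue is what gives back-pressure: when handlers fall behind, Processors block in sendRequest instead of buffering without limit.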
The Handler threads here are the KafkaRequestHandler threads. Their processing loop in pseudocode:
- while (!stopped) {
-   val req = requestChannel.receiveRequest(300)
-   req match {
-     case RequestChannel.ShutdownRequest =>
-       shutdownComplete.countDown()
-       return
-     case request: RequestChannel.Request =>
-       apis.handle(request)
-     case null => // continue
-   }
- }
The apis.handle(request) here is kafka.server.KafkaApis#handle, which dispatches each request to the appropriate handler based on its API key.
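That dispatch can be sketched as a lookup from API key to handler. The keys and handler bodies below are illustrative stand-ins, not Kafka's full ApiKeys enum or KafkaApis logic:

```java
// Conceptual sketch of KafkaApis#handle: route each request to a concrete
// handler based on its API key.
import java.util.Map;
import java.util.function.Function;

public class ApisDispatchSketch {
    private static final Map<String, Function<String, String>> HANDLERS = Map.of(
        "PRODUCE",  body -> "appended:" + body,
        "FETCH",    body -> "records-for:" + body,
        "METADATA", body -> "cluster-metadata"
    );

    static String handle(String apiKey, String body) {
        Function<String, String> handler = HANDLERS.get(apiKey);
        if (handler == null) // the real code answers with an error response instead
            throw new IllegalArgumentException("Unknown api key " + apiKey);
        return handler.apply(body);
    }

    public static void main(String[] args) {
        System.out.println(handle("PRODUCE", "topicA"));
        System.out.println(handle("FETCH", "topicA"));
    }
}
```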