Kafka网络模型

  Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。Kafka的网络通信层模型,主要采用了 1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程) 。

线程数 线程名 线程具体说明
1 kafka-socket-acceptor Acceptor线程,负责监听Client端发起的请求
N kafka-network-thread Processor线程,负责对Socket进行读写
M kafka-request-handler Worker线程,处理具体的业务逻辑并生成Response返回

Kafka网络通信层的完整框架图如下图所示:
在这里插入图片描述
Kafka网络通信模型中的几个重要概念:

(1)Accept Thread:负责与客户端建立连接链路,然后把Socket轮转交给Process Thread

(2)Process Thread:负责接收请求和响应数据,Process Thread每次基于Selector事件循环,首先从Response Queue读取响应数据,向客户端回复响应,然后接收到客户端请求后,读取数据放入Request Queue

(3)KafkaRequestHandler :M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理,M的大小由 “num.io.threads” 决定

(4)RequestChannel :其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列 requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方

(5)NetworkClient :其底层是对 Java NIO 进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送

(6)SocketServer :其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离

(7)KafkaServer :代表了一个Kafka Broker的实例;其startup方法为实例启动的入口

(8)KafkaApis :Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如 “发送消息”、 “获取消息偏移量—offset” 和 “处理心跳请求” 等

Kafka网络通信层的设计与具体实现

在这里插入图片描述
Kafka的通信流程:

  • Client向Server发送请求时,Acceptor负责接收TCP请求,连接成功后传递给Processor线程
  • Processor线程接收到新的连接后,将其注册到自身的Selector中,并监听READ事件
  • 当Client在当前连接对象上写入数据时,会触发READ事件,根据TCP协议调用Handler进行处理
  • Handler处理完成后,可能会有返回值给Client,并将Handler返回的结果绑定Response端进行发送

  网络通信层包括几个重要元素:SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。

SocketServer
  SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个 Acceptor和N个Processor线程(每个EndPoint都会初始化,一般来说一个Server只会设置一个端口)

Acceptor
  Acceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。

在该线程类中主要有两个重要的变量:
(1)nioSelector :通过NSelector.open()方法创建的变量,封装了JAVA NIO Selector的相关操作
(2)serverChannel :用于监听端口的服务端Socket套接字对象

  Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT 事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。这里,Processor是通过 round robin 方法选择的,这样可以保证后面多个Processor线程的负载基本均匀。

Acceptor的accept()方法的作用主要如下:

(1)通过SelectionKey取得与之对应的serverSocketChannel实例,并调用它的accept()方法与客户端建立连接

(2)调用connectionQuotas.inc()方法增加连接统计计数;并同时设置第(1)步中创建返回的socketChannel属性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)

(3)将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中,然后唤醒Processor线程从队列中获取socketChannel并处理。其中,newConnections会被Acceptor线程和Processor线程并发访问操作,所以newConnections是ConcurrentLinkedQueue队列(一个基于链接节点的无界线程安全队列)

Processor
  Processor同Acceptor一样,也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中有以下几个重要的变量:
(1)newConnections :在上面的 Acceptor 一节中已经提到过,它是一种ConcurrentLinkedQueue[SocketChannel]类型的队列,用于保存新连接交由Processor处理的socketChannel

(2)inflightResponses :是一个Map[String, RequestChannel.Response]类型的集合,用于记录尚未发送的响应

(3)selector :是一个类型为KSelector变量,用于管理网络连接

Processor处理器线程run方法执行的流程如图所示:
在这里插入图片描述
上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作:

(1)处理newConnections队列中的socketChannel 。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件

(2)处理RequestChannel中与当前Processor对应响应队列中的Response 。在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的

(3)调用selector.poll()方法进行处理 。该方法底层即为调用nioSelector.select()方法进行处理

(4)处理已接受完成的数据包队列—completedReceives 。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件

(5)处理已发送完的队列—completedSends 。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件

(6)处理断开连接的队列 。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1

RequestChannel
  在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;KafkaRequestHandler线程从请求队列中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端

KafkaRequestHandler
  KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干个KafkaRequestHandler线程),这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,读取后再交由KafkaApis来具体处理

用这样以下几点优势:

  • 能够单独指定Handler的线程数,便于调优和管理
  • 防止一个过大的请求阻塞一个Processor线程
  • Request、Handler、Response之间都是通过队列来进行连接的,这样它们彼此之间不存在耦合现象,对提升Kafka系统的性能很有帮助

KafkaApis
  KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务

猜你喜欢

转载自blog.csdn.net/ThreeAspects/article/details/105938148