- ProducerInterceptor对消息进行拦截
- Serializer对消息的key和value进行序列化
- Partitioner为消息选择合适的Partition
- RecordAccumulator收集消息,实现批量发送
- sender从RecordAccumulator获取消息
- 构造一个ClientRequest,这里是KafkaClient
- 将ClientRequest交给NetworkClient,准备发送
- NetworkClient将请求放入KafkaChannel的缓存
- 执行网络IO,发送请求
- 收到相应,调用ClientRequest的回调函数
- 调用RecordBatch的回调函数,最终调用每个消息上注册的回调函数。
消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
KafkaProducer实现了Producer接口,在Producer接口中定义KafkaProducer对外提供的API
- send()方法,发送消息,实际是将消息放入RecordAccumulator暂存,等待发送
- flush()方法:刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程
- partitionsFor()方法:在KafkaProducer中维护了一个Metadata对象用于存储kafka集群的元数据,Metadata中的元数据会定期更新,partitionsFor()方法负责从Metadata中获取指定Topic中的分区信息
- close()方法:关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清空,关闭sender线程
- metrics()方法:用于记录统计信息,与消息发送的流程无关
更多请关注微信公众号