一、Kafka 生产者工作流程
在发送消息的过程中,有两个线程协同工作 —— Main 线程和 Sender 线程。Main 线程负责处理数据,指定发送的位置,然后暂存起来,Sender 线程负责把暂存的数据传输给 Kafka Broker。
图自:尚硅谷
1. Main 线程详解
1)创建 RecordAccumulator
Main 线程会创建一个容器,我们暂且称之为记录累加器(RecordAccumulator),其默认大小为 32m,这是一个缓冲区,缓冲区内有一个容器ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
,用于存放要被发送的数据,其中 Key 为指向分区,Value 存放将要被发送的数据,ProducerBatch
是一个双端队列。
2)处理数据
数据在存入缓冲区前,需要经过拦截器、序列化器和分区器的处理:
- 拦截器在生产端用的较少,我们可以自定义拦截规则拦截数据
- 序列化器很好理解,我们可以通过指定
key.serializer
和value.serializer
参数,来指定如何序列化数据 - 分区器通过解析参数,确定数据需要发送的分区,发往对应的 ProducerBatch 中
Main 线程工作流程图,来自我寄几:
2. Sender 线程详解
Sender 线程会拉去 ProduceBatch 中的数据,再通过 Http 请求的方式,向 Kafka Broker 发送数据。那么 Sender 线程就会涉及几个问题:
- 何时拉取数据?
- 如何确认消息发送成功?
- 可以同时进行多少个请求?
1)何时拉取 ProducerBatch 中的数据?
在生产者参数中,有一个 batch.size
项,默认是 16k,这个配置项就是控制 ProducerBatch 这个双端队列的大小,当数据累计到配置的值时,Sender 线程就会将里面的数据拉走。
但如果数据一直达不到配置的大小呢?总不能一直不拉取数据吧,这样在使用者看起来,消费者迟迟收不到生产的数据,这是不合理的,因此有另一个配置项 linger.ms
,当数据迟迟达不到 batch.size
时,Sender 线程等待了超过 linger.ms
设置的时间,也会拉取数据,linger.ms
的默认值是 0ms,也就是说有数据就会被立即拉走。
2)如何确认消息发送成功?
在生产环境下,消息的发送往往都不是一帆风顺,如网络波动、Kafka Broker 挂掉,等情况都有可能导致消息持久化失败,这就涉及一个问题,在什么情况下 Producer 会认为消息已经发送成功了呢?这里引入一个参数 acks
,它有三个可配置的值:
acks=0
:生产者将不会等待来自服务器的任何确认,该记录将立即添加到缓冲区并视为已发送acks=1
(默认值):Leader 会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果 Leader 在确认记录后立即失败,则记录将会丢失acks=all
:相当于acks=-1
,Leader 将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证
如果害怕消息发送失败,还可以通过配置
retries
参数来激活重试机制,发送失败 Sender 线程会自动重试
3)可以同时进行多少个请求?
生产端的 Sender 线程会缓存一个请求队列,默认每个 Broker 最多可以缓存 5 个请求,可以通过配置 max.in.flight.requests.per.connection
值来改变。
由于在 Kafka 1.X 以后,Kafka 服务端可以缓存生产者发来的最近的五个请求元数据,所以在五个请求内,都能保证数据的顺序。
Sender 线程工作流程图,来自我寄几:
二、生产者常用参数
这个小节列举一些生产者常用的配置项,有印象、了解即可
参数名 | 作用 |
---|---|
key.serializer 和 value.serializer |
指定发送消息的 key 和 value 的序列化类型,一定要写类的全限定名 |
buffer.memory |
缓冲区 RecordAccumulator 总大小,默认 32m |
batch.size |
缓冲区内的批次队列 ProducerBatch 大小,默认 16k |
linger.ms |
如果数据迟迟未达到 batch.size,Sender 等待 linger.time 之后就会发送数据。默认值是 0ms,表示没 有延迟。生产环境一般设置为 50ms |
acks |
0:生产者发送过来的数据,不需要等数据落盘应答 1:生产者发送过来的数据,Leader 收到数据后应答 -1(默认值):生产者发送过来的数据,Leader 和 ISR 队列里面的所有节点收齐数据后应答 |
max.in.flight.requests.per.connection |
Sender 线程缓存的请求数,也是就是允许没有 ack 的请求次数,默认为 5 |
retries |
消息发送失败后重试的次数,如果需要保证数据的顺序性 应该把 Sender 线程缓存的请求数设置为1,否则其他消息可能先发送成功 |
retry.backoff.ms |
两次重试之间的时间间隔,默认是 100ms |
compression.type |
生者者发送数据的时候是否压缩,默认是 none,支持 gzip、snappy、lz4 和 zstd,生产环境一般使用 snappy |
三、示例:使用 API 向 Kafka 发送消息
版本:
- Kafka 3.2
- kafka-client 3.2
引入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
1. 同步发送
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class BaseProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//配置生产者
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 往指定的 Topic 里面发送 数据 hello,同步发送
RecordMetadata hello = producer.send(new ProducerRecord<>("topic-test", "hello")).get();
System.out.println(hello);
producer.close();
}
}
2. 异步发送,带会回调函数
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class BaseProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//配置生产者
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 往指定的 Topic 里面发送 数据 hello,同步发送
producer.send(new ProducerRecord<>("topic-test", "hello"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("消息发送失败,原因:" + exception.getMessage());
}
System.out.println("消息发送成功," + metadata.offset() + metadata.hasOffset() + metadata.topic() + metadata.hasTimestamp() + metadata.timestamp() + metadata.partition());
}
});
producer.close();
}
}
四、生产者分区策略
**Kafka 为什么要分区?**分区有两个好处:
- 便于合理使用存储资源:一个主题可以有多个分区,分区可以分布在不同的 Broker 上,海量的数据被分成一块一块存储在不同的服务器,合理控制分区的任务,可以实现负载均衡的效果
- 提高并行度:生产者可以指定分区发送数据,消费者可以指定分区进行消费,达到了类似多线程的效果
1. 默认分区器
默认分区器 DefaultPartitioner
描述了默认的分区策略,在源码中可以找到它的注释。
我来翻译翻译:
- 有 Partition:直接将数据存入对应的分区
- 没有 Partition 有 Key:将通过
Key的Hash值 % 主题的分区数
来得到一个 Partition 值 - 没有 Partition 没有 Key:采用 Sticky Partition(粘性分区器),会随机选择一个分区,并尽可能的一直使用这个分区,待该分区的 ProducerBatch 满了或者已完成,再随机选择其他的分区(不会重复使用上一次的分区)
2. 自定义分区器
首先我们有创建一个分区器,通过实现 org.apache.kafka.clients.producer.Partitioner
接口,可以得到一个自定义分区器,通过实现方法,就可以自定义分区规则:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partition = 0;
if (key.equals("1")) {
partition = 1;
}
return partition;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
在创建生产者的时候,我们可以将自定义分区器配置到参数中即可:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);