KafkaProducer简介
KafkaProducer是线程安全的,是一个不可变对象。负责将数据发送的kafka集群。生产者由一个缓冲空间池和一个后台I / O线程组成,该缓冲池保存尚未传输到服务器的记录,该I / O线程负责将这些记录转换为请求并将它们传输到集群。 使用后如果无法关闭生产者,则会泄漏这些资源。生产者发送数据时是异步的,将数据放入缓冲区使得数据可以进行批处理,以提高发送的效率。生产者为每个分区维护未发送记录的缓冲区。 这些缓冲区的大小由 batch.size 配置指定。 增大它可以进行更多的批处理,但是需要更多的内存。
KafkaProducer构造器
- 使用Map作为参数
public KafkaProducer(Map<String, Object> configs) {
this(new ProducerConfig(configs), null, null);
}
- 使用Properties作为参数(官方推荐)
public KafkaProducer(Properties properties) {
this(new ProducerConfig(properties), null, null);
}
传入的参数会新建一个ProducerConfig对象,ProducerConfig类为 Kafka Producer的配置类。还有其他构造器需指定key/value的序列化方式,在此不展开。
KafkaProducer发送数据的流程
KafkaProducer发送数据的send方法,支持回调,也可以直接发送数据。
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
send方法会调用doSend方法真正发送数据。在doSend方法执行前,如果用户指定了拦截器,record首先会先经过拦截器,对数据进行处理。
/**
* 实现异步发送数据到topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// 首先确保该topic的元数据信息可用
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
//对key/value进行序列化
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
//对record进行分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
//将数据添加到缓存
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
//当缓冲区满时,唤醒sender线程发送数据到集群
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
生产者发送数据步骤:
1. 主线程对元数据进行检查,确保元数据可用
2. 使用序列化器,主线程对key/value进行序列化
3. 分区器对record进行分区
4. 将数据添加到缓存
5. 唤醒I/O线程,将数据发送到集群
KafkaProducer的重要配置参数
1.acks
acks确认机制,表示必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的。
分区副本: 分区副本指kafka分区复制的副本,分为领导者副本(Leader Replica)和追随者副本(Follower Replica),由replication.factor参数指定。领导者副本负责对外提供服务,追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
同步副本(In-sync replicas):ISR是一个列表,存储的是已经与Leader同步的副本,没有与Leader同步的副本不在此列表中。
min.insync.replicas:最小同步副本数。该参数控制的是消息至少被写入到多少个副本才算是"真正写入",该值默认值为1,生产环境设定为一个大于1的值可以提升消息的持久性. 因为如果同步副本的数量低于该配置值,则生产者会收到错误响应,从而确保消息不丢失。
了解了这些概念之后,我们来看以下acks的值对生产者发送消息的影响。
- acks = 0,表示生产者成功写入消息之前不会等待broker发出的任何响应,生产者不管服务器有没有收到消息,一旦服务器出现问题,生产者无从得知消息有没有发送成功,那么消息就可能会丢失。该配置因为不需要等待服务器的响应,所以可以以网络支持的最大速度发送数据,可以达到很高的吞吐量。
- acks = 1,表示只要Leader分区副本收到消息,就会向生产者发送一个成功的ack,此时生产者就认为消息是发送成功的,一旦无法写入该Leader副本,生产者会收到一个错误相应,为了避免数据丢失,生产者会重新发送数据。但如果此时Leader节点发生了变化,如果新的Leader节点没有收到之前生产者发送的数据,由于Leader节点提供服务,则就会造成之前发送的数据丢失。
- acks = all,表示只有ISR所有同步副本都收到消息时,生产者才认为该消息是写入成功的,而如果min.insync.replicas = 1,最小同步副本数为1的话。则会造成跟acks = 1同样的情况,所以在生产环境中,如果想确保生产者发送的数据是exectly-once的话,除了指定acks = all之外,还要确保min.insync.replicas > 1。
2.batch.size和linger.ms
batch.size是producer批量发送的基本单位,指定各分区的缓冲区大小。
lingger.ms是sender线程在检查batch是否ready时候,判断有没有过期的参数,默认大小是0ms。即生产者发送数据的出发条件为batch.size和lingger.ms满足任意一个即出发sender线程发送数据。
3.enable.idempotence
enable.idempotence指生产者是否开启幂等性,保证消息发送到分区的精确一次性语义。当enable.idempotence = true时,retries将默认为Integer.MAX_VALUE,acks默认为all。注意,幂等性只能保证单分区和单个session中的幂等性,无法保证多分区的幂等性。要想实现多分区的幂等性,可以通过开启事务来实现。
3.enable.idempotence
要使用事务生成器和附带的API,必须设置 transactional.id配置属性。当设置了transactional.id时,则会自动启用等幂性以及幂等性依赖的生产者配置。 此外,交易中包含的主题应配置为具有持久性。 特别是, replication.factor至少应为3 ,并且这些主题的min.insync.replicas 应设置为2。 最后,为了实现端到端的交易保证,必须将消费者配置为仅读取已提交的消息。 transactional.id 的目的是在单个生产者实例的多个会话之间实现事务恢复。 需确保每个生产者由唯一的 transactional.id 。 这样,它对于分区应用程序中运行的每个生产者实例都应该是唯一的。