First, a typical producer example looks like this:
final String kafkazk="localhost:9092";
String topic="testAPI";
Properties properties = new Properties() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkazk);
put(ProducerConfig.ACKS_CONFIG, "all");
put(ProducerConfig.RETRIES_CONFIG, 0);
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
put(ProducerConfig.LINGER_MS_CONFIG, 1);
put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
}};
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10000000; i++) {
    Future<RecordMetadata> send = producer.send(new ProducerRecord<>(topic, UUID.randomUUID().toString(), String.valueOf(i)));
    try {
        // get() blocks until the broker acknowledges this record
        System.out.println(send.get().partition());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
producer.close();
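As an aside, the double-brace initialization used above creates an anonymous Properties subclass that silently holds a reference to the enclosing instance; plain put calls are the more idiomatic way to build the config. A minimal sketch, using the raw string keys directly (the ProducerConfig constants resolve to these same strings):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        // Plain Properties avoids the anonymous-subclass pitfall of double-brace init.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        System.out.println(properties.getProperty("acks"));
    }
}
```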
Looking at the source, find KafkaProducer (in package org.apache.kafka.clients.producer) and locate the send method:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
send essentially delegates to the doSend method and returns a Future&lt;RecordMetadata&gt;, which carries the metadata returned for this send: the record's partition, its offset, and so on. Next, look at doSend:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
doSend takes two parameters, the record and a callback; their meanings were covered in an earlier article and are not repeated here. The first thing doSend does is obtain the cluster metadata: ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);. Since a record is about to be sent, the producer must make sure the cluster's partition information is current so the record lands in a valid partition, hence the wait. How does it guarantee the metadata is fresh?
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
metadata.add(topic);
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
// In case we already have cached metadata for the topic, but the requested partition is greater
// than expected, issue an update request only once. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
// check for timeout
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
// check authorization
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}
The first line is metadata.add(topic):
public synchronized void add(String topic) {
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
requestUpdateForNewTopics();
}
}
private synchronized void requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
requestUpdate();
}
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
If, when the topic is added, the metadata does not yet contain it (i.e. topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null), then this.needUpdate is set to true as the flag that a metadata update is needed, and the current metadata version is returned.
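The flag-and-version interplay can be sketched in isolation (a simplified stand-in, not the real Metadata class): requestUpdate only raises the flag and returns the *current* version; only a successful update bumps the version.

```java
// Simplified stand-in for Kafka's Metadata versioning (illustrative only).
public class MetadataSketch {
    private boolean needUpdate = false;
    private int version = 0;

    // Mirrors Metadata.requestUpdate(): raise the flag, return the current version.
    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.version;
    }

    // Mirrors the successful-update path in Metadata.update(): clear the flag, bump the version.
    public synchronized void update() {
        this.needUpdate = false;
        this.version += 1;
    }

    public synchronized int version() { return version; }
    public synchronized boolean updateRequested() { return needUpdate; }

    public static void main(String[] args) {
        MetadataSketch m = new MetadataSketch();
        int v = m.requestUpdate();           // v == 0, flag raised
        m.update();                          // version becomes 1, flag cleared
        System.out.println(m.version() > v); // a waiter can tell the update happened
    }
}
```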
Integer partitionsCount = cluster.partitionCountForTopic(topic);
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
First, the partition information for the topic is fetched from the cluster. If it is present (partitionsCount != null), the current metadata already covers this topic and no update is needed; otherwise sender.wakeup() wakes the Sender thread and metadata.awaitUpdate(version, remainingWaitMs) blocks until the update completes. The waiting code is as follows:
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
// the update is complete once the current version exceeds the version we started from
while (this.version <= lastVersion) {
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
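The loop above is the classic Java monitor pattern: the caller blocks in wait() until another thread bumps the version and calls notifyAll(). A minimal standalone sketch of the same idea (hypothetical class and method names, not Kafka code):

```java
// Minimal sketch of the version-based wait used by Metadata.awaitUpdate (illustrative only).
public class VersionedWait {
    private int version = 0;

    public synchronized void awaitNewerThan(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        // the while loop also guards against spurious wakeups
        while (this.version <= lastVersion) {
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new RuntimeException("Timed out after " + maxWaitMs + " ms");
            wait(maxWaitMs - elapsed);  // releases the monitor while blocked
        }
    }

    public synchronized void bumpVersion() {
        this.version += 1;
        notifyAll();  // wake every waiter, as Metadata.update() does
    }

    public static void main(String[] args) throws Exception {
        VersionedWait vw = new VersionedWait();
        Thread updater = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            vw.bumpVersion();
        });
        updater.start();
        vw.awaitNewerThan(0, 5000);  // returns once bumpVersion() runs
        updater.join();
        System.out.println("updated");
    }
}
```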
The update counts as successful only once this.version becomes greater than the version we started with; otherwise the thread keeps waiting until it times out. Next, let's look at the Metadata class itself:
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
private final long refreshBackoffMs;
private final long metadataExpireMs;
private int version;
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
private Cluster cluster;
private boolean needUpdate;
/* Topics with expiry time */
private final Map<String, Long> topics;
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean topicExpiryEnabled;
/**
* Create a metadata instance with reasonable defaults
*/
public Metadata() {
this(100L, 60 * 60 * 1000L);
}
public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
}
/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
//minimum interval between two metadata refreshes, 100 ms by default;
//prevents overly frequent update requests from congesting the network
this.refreshBackoffMs = refreshBackoffMs;
//how often to refresh; the producer's metadata.max.age.ms defaults to 300 * 1000 ms, i.e. 5 minutes
this.metadataExpireMs = metadataExpireMs;
this.topicExpiryEnabled = topicExpiryEnabled;
//timestamp of the last metadata refresh attempt
this.lastRefreshMs = 0L;
//timestamp of the last successful refresh; on success lastRefreshMs == lastSuccessfulRefreshMs,
//otherwise they differ
this.lastSuccessfulRefreshMs = 0L;
//version number of the cluster metadata; incremented by 1
//on every successful update
this.version = 0;
this.cluster = Cluster.empty();
//flag for a forced cluster update; it is the condition that triggers the Sender thread to refresh metadata
this.needUpdate = false;
//all currently known topics; the latest metadata for them is kept in cluster
this.topics = new HashMap<>();
//listeners notified of metadata updates
this.listeners = new ArrayList<>();
this.clusterResourceListeners = clusterResourceListeners;
//whether metadata for all topics is needed; normally Kafka only maintains metadata for the topics in use, a subset of all topics
this.needMetadataForAllTopics = false;
}
/**
* Get the current cluster info without blocking
*/
public synchronized Cluster fetch() {
return this.cluster;
}
/**
* Add the topic to maintain in the metadata. If topic expiry is enabled, expiry time
* will be reset on the next update.
*/
public synchronized void add(String topic) {
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
requestUpdateForNewTopics();
}
}
/**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
* current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
* is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}
/**
* Request an update of the current cluster metadata info, return the current version before the update
*/
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
/**
* Check whether an update has been explicitly requested.
* @return true if an update was requested, false otherwise
*/
public synchronized boolean updateRequested() {
return this.needUpdate;
}
/**
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
// the update is complete once the current version exceeds the version we started from
while (this.version <= lastVersion) {
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
/**
* Replace the current set of topics maintained to the one provided.
* If topic expiry is enabled, expiry time of the topics will be
* reset on the next update.
* @param topics
*/
public synchronized void setTopics(Collection<String> topics) {
if (!this.topics.keySet().containsAll(topics)) {
requestUpdateForNewTopics();
}
this.topics.clear();
for (String topic : topics)
this.topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
}
/**
* Get the list of topics we are currently maintaining metadata for
*/
public synchronized Set<String> topics() {
return new HashSet<>(this.topics.keySet());
}
/**
* Check if a topic is already in the topic set.
* @param topic topic to check
* @return true if the topic exists, false otherwise
*/
public synchronized boolean containsTopic(String topic) {
return this.topics.containsKey(topic);
}
/**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
* @param now current time in milliseconds
*/
public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(cluster, "cluster should not be null");
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Long> entry = it.next();
long expireMs = entry.getValue();
if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
entry.setValue(now + TOPIC_EXPIRY_MS);
else if (expireMs <= now) {
it.remove();
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
}
}
}
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster, unavailableTopics);
String previousClusterId = cluster.clusterResource().clusterId();
if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(cluster);
} else {
this.cluster = cluster;
}
// The bootstrap cluster is guaranteed not to have any useful information
if (!cluster.isBootstrapConfigured()) {
String clusterId = cluster.clusterResource().clusterId();
if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
clusterResourceListeners.onUpdate(cluster.clusterResource());
}
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
*/
public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
}
/**
* @return The current metadata version
*/
public synchronized int version() {
return this.version;
}
/**
* The last time metadata was successfully updated.
*/
public synchronized long lastSuccessfulUpdate() {
return this.lastSuccessfulRefreshMs;
}
/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
* @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.
*/
public synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics) {
if (needMetadataForAllTopics && !this.needMetadataForAllTopics) {
requestUpdateForNewTopics();
}
this.needMetadataForAllTopics = needMetadataForAllTopics;
}
/**
* Get whether metadata for all topics is needed or not
*/
public synchronized boolean needMetadataForAllTopics() {
return this.needMetadataForAllTopics;
}
/**
* Add a Metadata listener that gets notified of metadata updates
*/
public synchronized void addListener(Listener listener) {
this.listeners.add(listener);
}
/**
* Stop notifying the listener of metadata updates
*/
public synchronized void removeListener(Listener listener) {
this.listeners.remove(listener);
}
/**
* MetadataUpdate Listener
*/
public interface Listener {
/**
* Callback invoked on metadata update.
*
* @param cluster the cluster containing metadata for topics with valid metadata
* @param unavailableTopics topics which are non-existent or have one or more partitions whose
* leader is not known
*/
void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
}
private synchronized void requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
requestUpdate();
}
private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.emptySet();
String clusterId = null;
if (cluster != null) {
clusterId = cluster.clusterResource().clusterId();
internalTopics = cluster.internalTopics();
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics.keySet());
for (String topic : this.topics.keySet()) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
if (partitionInfoList != null) {
partitionInfos.addAll(partitionInfoList);
}
}
nodes = cluster.nodes();
}
return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics);
}
}
From the Metadata source we can see that an update is triggered in two ways: the periodic refresh (every metadataExpireMs, 5 minutes by default for the producer) and a forced update via the needUpdate flag. Every successful update increments version by 1. In the update method, the registered listeners are notified and the in-memory cluster view is replaced with the latest metadata, which the Sender thread fetches from the brokers over a MetadataRequest (the client does not talk to ZooKeeper directly).
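Both triggers can be seen directly in the timeToNextUpdate arithmetic: a forced update (needUpdate == true) zeroes the expiry component, while otherwise the next update waits out metadataExpireMs; either way the refreshBackoffMs floor applies. A standalone copy of that formula (hypothetical wrapper class, same math as the source):

```java
// Standalone copy of Metadata.timeToNextUpdate's arithmetic (illustrative wrapper).
public class NextUpdateSketch {
    static long timeToNextUpdate(boolean needUpdate, long lastSuccessfulRefreshMs,
                                 long lastRefreshMs, long metadataExpireMs,
                                 long refreshBackoffMs, long nowMs) {
        // forced update: no need to wait for expiry
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        // but the backoff between refreshes is always respected
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        // Forced update: only the 100 ms backoff stands in the way.
        System.out.println(timeToNextUpdate(true, 0, 1000, 300_000, 100, 1000));     // 100
        // No force: wait out the remaining metadata.max.age.ms window.
        System.out.println(timeToNextUpdate(false, 1000, 1000, 300_000, 100, 2000)); // 299000
    }
}
```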
To be continued...