Kafka Producer Client Source Code Analysis
To analyze the Kafka producer source code, we first look at an example of a producer sending a message. By tracing the source code from this example, we can study what operations the producer performs throughout the send process, as well as how it is architected.
Message Sending Example
Constructing the Properties Object
The properties object holds the configuration the Kafka producer requires.
//Create the properties object
Properties properties = new Properties();
//Broker addresses; multiple can be given so that other brokers remain usable when some go down. Required
properties.put("bootstrap.servers", "172.23.16.84:9092");
//Every message sent to the broker must be a byte array, so serialization is required. This sets the key
//serializer; besides the built-in serializers, a custom one can be created by implementing the Serializer interface
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//Serializer for the value part
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//Number of acknowledgements the producer requires before considering a request complete. 0: do not wait for any
//server response and treat the send as successful. 1: wait only for the leader to write the message to its local
//log; follower replication is not considered, so if the leader dies before the followers catch up, the message
//is lost. all or -1: the leader waits for all in-sync replicas to acknowledge before confirming the send
properties.put("acks", "-1");
//Number of retries on send failure
properties.put("retries", 3);
//Batch size
properties.put("batch.size", 323840);
//Linger time
properties.put("linger.ms", 10);
//Buffer size
properties.put("buffer.memory", 33554432);
//Maximum blocking time
properties.put("max.block.ms", 3000);
//Compression algorithm; no compression by default
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
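The comments above mention that a custom serializer can be created by implementing the Serializer interface. As a minimal sketch (the class name UpperCaseStringSerializer is hypothetical, not part of Kafka), such a serializer only needs the serialize method, since configure and close have default implementations:
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class UpperCaseStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        // Messages reach the broker as byte arrays, so the conversion happens here
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}
Its fully qualified class name would then be set as the key.serializer or value.serializer value.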
Constructing the Producer
Create the producer from the properties object built in the previous step.
//Build the producer
Producer<String, String> producer = new KafkaProducer<>(properties);
Sending a Message Synchronously
Call the producer's send method to send the message; calling get() on the returned Future blocks until the send completes, making it synchronous.
// Send a message synchronously
try {
    producer.send(new ProducerRecord<>("my-topic",
            Integer.toBinaryString(1), Integer.toString(1))).get();
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
}
Closing the Producer
After the messages have been sent, close the producer to release its resources if it is no longer needed.
//Close the producer
producer.close();
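The no-argument close() waits indefinitely for in-flight sends to complete. If a bound on how long it may block is needed, KafkaProducer also offers an overload taking a Duration; a brief sketch:
//Wait at most 5 seconds for buffered records to be delivered before closing
producer.close(Duration.ofSeconds(5));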
Source Code Analysis of Message Sending
The above shows a typical producer send flow. Let's now study the Kafka producer's send-side source code against that flow. The first step, building the configuration properties object, is nothing more than setting keys and their values and warrants no analysis, so we start from the second step.
Constructing the Producer from the Properties Object
The KafkaProducer(properties) constructor initializes the producer. Internally it calls the overloaded constructor KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) to complete initialization; keySerializer and valueSerializer specify key/value serializer instances and, when supplied, override the key/value serializer classes set in the properties object.
public KafkaProducer(Properties properties) {
this(properties, null, null);
}
public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}
Next, the overloaded constructor KafkaProducer(Map<String, Object> configs, Serializer keySerializer, Serializer valueSerializer) is called to continue construction. Before it is invoked, propsToMap(Properties properties) converts the Properties configuration into a Map and performs the necessary validation. The Properties class, from the java.util package, is in fact a Map structure that extends Hashtable<Object,Object>. The propsToMap method iterates over the properties object and checks that each key is a String; if one is not, it throws a ConfigException.
public static Map<String, Object> propsToMap(Properties properties) {
Map<String, Object> map = new HashMap<>(properties.size());
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
if (entry.getKey() instanceof String) {
String k = (String) entry.getKey();
map.put(k, properties.get(k));
} else {
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
}
}
return map;
}
Next, the overloaded constructor KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) is called to complete the producer's creation. Before this constructor is invoked, the configs, keySerializer, and valueSerializer arguments are processed as follows:
- First, ProducerConfig.appendSerializerToConfig(Map<String, Object> configs, Serializer<?> keySerializer, Serializer<?> valueSerializer) writes keySerializer and valueSerializer into configs, overriding any configured values, provided they are non-null (a sketch of this follows the list)
- Then the ProducerConfig(Map<String, Object> props) constructor is invoked with the configs processed in the previous step to create the ProducerConfig
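appendSerializerToConfig is not shown above; its effect is roughly the following (a sketch of the override logic, not the exact Kafka source):
// Only non-null serializer instances override the classes configured in the map
static Map<String, Object> appendSerializerToConfig(Map<String, Object> configs,
                                                    Serializer<?> keySerializer,
                                                    Serializer<?> valueSerializer) {
    Map<String, Object> newConfigs = new HashMap<>(configs);
    if (keySerializer != null)
        newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
    if (valueSerializer != null)
        newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
    return newConfigs;
}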
The ProducerConfig Construction Process
The second step above invokes the ProducerConfig(Map<String, Object> props) constructor to build the ProducerConfig. Internally it calls the parent-class constructor AbstractConfig(ConfigDef definition, Map<?, ?> originals), passing in CONFIG; this field is final and is initialized in a static block:
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
Type.STRING,
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
in(ClientDnsLookup.DEFAULT.toString(),
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
in("all", "-1", "0", "1"),
Importance.HIGH,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(MAX_REQUEST_SIZE_CONFIG,
Type.INT,
1024 * 1024,
atLeast(0),
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RECONNECT_BACKOFF_MAX_MS_CONFIG, Type.LONG, 1000L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(MAX_BLOCK_MS_CONFIG,
Type.LONG,
60 * 1000,
atLeast(0),
Importance.MEDIUM,
MAX_BLOCK_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
30 * 1000,
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
.define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
.define(METADATA_MAX_IDLE_CONFIG,
Type.LONG,
5 * 60 * 1000,
atLeast(5000),
Importance.LOW,
METADATA_MAX_IDLE_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
.define(METRICS_RECORDING_LEVEL_CONFIG,
Type.STRING,
Sensor.RecordingLevel.INFO.toString(),
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
Importance.LOW,
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
Type.INT,
5,
atLeast(1),
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
Type.LONG,
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,
Importance.MEDIUM,
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG,
Type.CLASS,
DefaultPartitioner.class,
Importance.MEDIUM, PARTITIONER_CLASS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
Type.STRING,
null,
Importance.LOW,
SECURITY_PROVIDERS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
.define(ENABLE_IDEMPOTENCE_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
ENABLE_IDEMPOTENCE_DOC)
.define(TRANSACTION_TIMEOUT_CONFIG,
Type.INT,
60000,
Importance.LOW,
TRANSACTION_TIMEOUT_DOC)
.define(TRANSACTIONAL_ID_CONFIG,
Type.STRING,
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC)
.defineInternal(AUTO_DOWNGRADE_TXN_COMMIT,
Type.BOOLEAN,
false,
Importance.LOW);
}
The initialization of CONFIG looks complex, but it simply defines each configuration's default value, type, validator, and so on, preparing for the type conversion and default handling applied to user-supplied values. AbstractConfig(ConfigDef definition, Map<?, ?> originals) in turn calls its overloaded constructor AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog), whose source is:
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
/* check that all the keys are really strings */
for (Map.Entry<?, ?> entry : originals.entrySet())
if (!(entry.getKey() instanceof String))
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
this.values = definition.parse(this.originals);
this.used = Collections.synchronizedSet(new HashSet<>());
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
this.values.put(update.getKey(), update.getValue());
}
definition.parse(this.values);
this.definition = definition;
if (doLog)
logAll();
}
It mainly performs the following operations:
- Check that every user-supplied key is a String; if not, throw a ConfigException
- Call resolveConfigVariables to support the ConfigProvider mechanism: user-configured providers are invoked to fetch configuration and update the user's parameter values, and the result is assigned to the originals field
- Then parse the processed originals by calling ConfigDef's parse method with originals; definition holds the legal parameters, their types, and related information. Let's look at the parse method next
public Map<String, Object> parse(Map<?, ?> props) {
    // Check all configurations are defined
    List<String> undefinedConfigKeys = undefinedDependentConfigs();
    if (!undefinedConfigKeys.isEmpty()) {
        String joined = Utils.join(undefinedConfigKeys, ",");
        throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined);
    }
    // parse all known keys
    Map<String, Object> values = new HashMap<>();
    for (ConfigKey key : configKeys.values())
        values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name)));
    return values;
}
- Collect configurations that are referenced but not defined; in effect, check configKeys for dependents values that do not themselves appear as keys of configKeys. configKeys is initialized in the static block
- If any such configuration exists, throw an exception
- Loop over the values of configKeys and, for each, take the corresponding value from props and put it into values. Note that values is a Map whose keys are the name fields of the ConfigKey objects, i.e., the supported parameters, and whose values come from parseValue: its first argument is the ConfigKey, the second is the value taken from the user's props for that parameter, and the third is true if the user configured it. Inside parseValue the logic is: if the user set the parameter, call parseType to convert the configured value to the declared type; if not, use the default value when one exists, otherwise throw an exception; and if Kafka requires validation for this parameter, invoke the validator
- Initialize a thread-safe Set and assign it to the used field
- postProcessParsedConfig adjusts parameter defaults: for example, if the user did not set reconnect.backoff.max.ms but did set reconnect.backoff.ms, then reconnect.backoff.max.ms defaults to the reconnect.backoff.ms value; and if the user set transactional.id but not enable.idempotence, enable.idempotence defaults to true
- Loop over the defaults obtained in the previous step and put them into the configuration Map
- Call parse again
- Assign the processed definition to the object's definition field
- Log all the parameters
Summary: the steps above complete the creation of the ProducerConfig.
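To see the type conversion and default handling concretely, here is a small self-contained example using ConfigDef directly (the parameter name "retries" mirrors the producer config; the doc string is made up):
// Uses org.apache.kafka.common.config.ConfigDef and java.util.Collections
ConfigDef def = new ConfigDef()
        .define("retries", ConfigDef.Type.INT, Integer.MAX_VALUE,
                ConfigDef.Range.between(0, Integer.MAX_VALUE), ConfigDef.Importance.HIGH, "retry count");
// The user supplies a String; parse() converts it to the declared INT type and runs the validator
Map<String, Object> parsed = def.parse(Collections.singletonMap("retries", "3"));
System.out.println(parsed.get("retries"));                          // Integer 3
// With no user value, parse() falls back to the default
System.out.println(def.parse(Collections.emptyMap()).get("retries")); // Integer.MAX_VALUE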
The source of the resolveConfigVariables method is as follows:
private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
Map<String, String> providerConfigString;
Map<String, ?> configProperties;
Map<String, Object> resolvedOriginals = new HashMap<>();
Map<String, String> indirectVariables = extractPotentialVariables(originals);
resolvedOriginals.putAll(originals);
if (configProviderProps == null || configProviderProps.isEmpty()) {
providerConfigString = indirectVariables;
configProperties = originals;
} else {
providerConfigString = extractPotentialVariables(configProviderProps);
configProperties = configProviderProps;
}
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
return new ResolvingMap<>(resolvedOriginals, originals);
}
It performs the following operations:
- extractPotentialVariables collects the user-supplied parameters whose values are of type String and assigns them to indirectVariables
- The local variable resolvedOriginals receives a copy of the user's parameters
- If configProviderProps is empty, indirectVariables is assigned to the local variable providerConfigString and the user's parameters to configProperties
- instantiateConfigProviders instantiates the configuration providers. Providers are mainly used to fetch secrets, for example reading credentials from environment variables; Docker and Kubernetes typically pass secrets through environment variables
- With the providers obtained in the previous step, if the provider map is not empty, a ConfigTransformer is created and its transform method is called to fetch configuration values through the providers and substitute the variable parts
- resolvedOriginals is updated with the substituted parameters
- The providers are closed in a loop
- resolvedOriginals and originals are used to build the return object, a ResolvingMap
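Putting this together, a typical use of the provider mechanism looks like the following properties (FileConfigProvider ships with Kafka; the file path and key name here are made-up examples):
# Register a provider named "file" and point it at the built-in FileConfigProvider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
# A value of the form ${providerName:path:key} is then resolved through that provider
ssl.keystore.password=${file:/etc/kafka/secrets.properties:keystore-password}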
The source of instantiateConfigProviders is as follows:
private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) {
final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
if (configProviders == null || configProviders.isEmpty()) {
return Collections.emptyMap();
}
Map<String, String> providerMap = new HashMap<>();
for (String provider: configProviders.split(",")) {
String providerClass = CONFIG_PROVIDERS_CONFIG + "." + provider + ".class";
if (indirectConfigs.containsKey(providerClass))
providerMap.put(provider, indirectConfigs.get(providerClass));
}
// Instantiate Config Providers
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
} catch (ClassNotFoundException e) {
log.error("ClassNotFoundException exception occurred: " + entry.getValue());
throw new ConfigException("Invalid config:" + entry.getValue() + " ClassNotFoundException exception occurred", e);
}
}
return configProviderInstances;
}
It performs the following operations:
- From the passed-in map of string-valued parameters, read the user's config.providers setting
- If this parameter is not set, return an empty map
- If it is set, split its value on commas and, for each provider name, build the providerClass key in the form config.providers.<provider name>.class; if indirectConfigs contains that key, put the provider name and the corresponding fully qualified provider class name into the providerMap
- Loop over providerMap, building the prefix config.providers.<provider name>.param. and calling configProviderProperties(prefix, providerConfigProperties) to extract the user parameters whose keys start with that prefix, strip the prefix from each key, and put the stripped keys and values into a new map assigned to configProperties
- Call Utils.newInstance(entry.getValue(), ConfigProvider.class) to instantiate the provider from its fully qualified class name
- Configure the provider with configProperties
- Put each provider name and its instance into a map and return it
The source of ConfigTransformer's transform method is as follows:
public ConfigTransformerResult transform(Map<String, String> configs) {
Map<String, Map<String, Set<String>>> keysByProvider = new HashMap<>();
Map<String, Map<String, Map<String, String>>> lookupsByProvider = new HashMap<>();
// Collect the variables from the given configs that need transformation
for (Map.Entry<String, String> config : configs.entrySet()) {
if (config.getValue() != null) {
List<ConfigVariable> vars = getVars(config.getValue(), DEFAULT_PATTERN);
for (ConfigVariable var : vars) {
Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>());
Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
keys.add(var.variable);
}
}
}
// Retrieve requested variables from the ConfigProviders
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
String providerName = entry.getKey();
ConfigProvider provider = configProviders.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
if (provider != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
String path = pathWithKeys.getKey();
Set<String> keys = new HashSet<>(pathWithKeys.getValue());
ConfigData configData = provider.get(path, keys);
Map<String, String> data = configData.data();
Long ttl = configData.ttl();
if (ttl != null && ttl >= 0) {
ttls.put(path, ttl);
}
Map<String, Map<String, String>> keyValuesByPath =
lookupsByProvider.computeIfAbsent(providerName, k -> new HashMap<>());
keyValuesByPath.put(path, data);
}
}
}
// Perform the transformations by performing variable replacements
Map<String, String> data = new HashMap<>(configs);
for (Map.Entry<String, String> config : configs.entrySet()) {
data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN));
}
return new ConfigTransformerResult(data, ttls);
}
It performs the following operations:
- Loop over the passed-in user configuration map, whose values are of type String. For each non-null value, look for matches of the pattern \$\{([^}]*?):(([^}]*?):)?([^}]*?)\}, i.e., values of the form ${mycustom:/path/pass/to/get/method:password}. The first part, mycustom, is parsed as the providerName; the second part, /path/pass/to/get/method, as the path (which may be empty); and the third part, password, as the variable. Together these build a ConfigVariable
- Loop over the ConfigVariables parsed in the previous step and collect them into keysByProvider, a Map<String, Map<String, Set<String>>> whose outer key is the providerName, whose inner key is the path, and whose inner value is the set of variables
- Loop over keysByProvider: the outer key is the provider name, used to look up the provider instance in configProviders, and the outer value, the path-to-variables mapping, is assigned to keysByPath. If both are non-null, loop over keysByPath, taking each path and its variable set as keys, and call the provider instance's get method with path and keys to obtain the resolved ConfigData object. A provider instance is an object of a class implementing the ConfigProvider interface; we can implement ConfigProvider ourselves to fetch configuration any way we need, for example from environment variables (a sketch of such a provider follows this list). Here I use the official FileConfigProvider as the example to explain the implementation of get(String path, Set keys):
public ConfigData get(String path, Set<String> keys) {
    Map<String, String> data = new HashMap<>();
    if (path == null || path.isEmpty()) {
        return new ConfigData(data);
    }
    try (Reader reader = reader(path)) {
        Properties properties = new Properties();
        properties.load(reader);
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                data.put(key, value);
            }
        }
        return new ConfigData(data);
    } catch (IOException e) {
        throw new ConfigException("Could not read properties from file " + path);
    }
}
- First check whether path is empty; if so, return a ConfigData instance whose data field is an empty Map. The path is not mandatory: if we were fetching configuration from environment variables, whether a path is configured makes no difference, but FileConfigProvider loads configuration from the path, which is why it returns an empty result here
- If path is non-empty, construct a Reader from the file path and load its contents directly into a Properties object; then loop over keys, fetch each key's value from the properties, and put it into data, a Map whose keys are the looped keys and whose values are the configured values. Finally construct a ConfigData from data and return it
- Read the ConfigData's data and ttl fields; if ttl is non-null and non-negative, store the ttl keyed by path in a Map (the ttl is the data's time to live, in seconds). Initialize lookupsByProvider and store, per provider name, the mapping from each path to its resolved parameters
- Next, substitute the parts of the values matching the regular expression \$\{([^}]*?):(([^}]*?):)?([^}]*?)\} using the data resolved above. The substitution is done by replace(Map<String, Map<String, Map<String, String>>> lookupsByProvider, String value, Pattern pattern):
private static String replace(Map<String, Map<String, Map<String, String>>> lookupsByProvider,
                              String value,
                              Pattern pattern) {
    if (value == null) {
        return null;
    }
    Matcher matcher = pattern.matcher(value);
    StringBuilder builder = new StringBuilder();
    int i = 0;
    while (matcher.find()) {
        ConfigVariable configVar = new ConfigVariable(matcher);
        Map<String, Map<String, String>> lookupsByPath = lookupsByProvider.get(configVar.providerName);
        if (lookupsByPath != null) {
            Map<String, String> keyValues = lookupsByPath.get(configVar.path);
            String replacement = keyValues.get(configVar.variable);
            builder.append(value, i, matcher.start());
            if (replacement == null) {
                // No replacements will be performed; just return the original value
                builder.append(matcher.group(0));
            } else {
                builder.append(replacement);
            }
            i = matcher.end();
        }
    }
    builder.append(value, i, value.length());
    return builder.toString();
}
- First the parts matching the regular expression are parsed out, then each match is replaced with the configuration value previously resolved through the providers, producing the substituted string that replaces the old value
- The substituted user configuration and the ttls resolved above are used to construct the return object, a ConfigTransformerResult
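As mentioned above, we can implement ConfigProvider ourselves, for example to resolve variables from environment variables. A minimal sketch (the class name EnvVarConfigProvider is hypothetical here; newer Kafka versions ship a similar built-in provider):
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

public class EnvVarConfigProvider implements ConfigProvider {
    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch
    }

    @Override
    public ConfigData get(String path) {
        return get(path, System.getenv().keySet());
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        // Environment variables are flat, so the path part is ignored here
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = System.getenv(key);
            if (value != null)
                data.put(key, value);
        }
        return new ConfigData(data);
    }

    @Override
    public void close() {
    }
}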
Completing the Producer's Creation
Once the ProducerConfig has been created in the steps above, the actual producer-creation flow begins.
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
try {
this.producerConfig = config;
this.time = time;
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
this.transactionManager = configureTransactionState(config, logContext);
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
- Set the fields of the new KafkaProducer: assign the passed-in config to producerConfig, the current time source to time, and the client.id value to clientId
- Create a MetricConfig and set its properties, including the number of samples, the time window, the recording level, and the tags (here the tag is the client id). Create a JmxReporter and a MetricsContext, and use these metrics-related objects to create the Metrics object assigned to the metrics field
- Obtain the partitioner and assign it to the partitioner field
- If the keySerializer argument is null, instantiate the configured key.serializer and assign it to the keySerializer field, then call its configure method to apply the encoding-related settings; if it is non-null, assign the argument directly to the keySerializer field
- Set the valueSerializer field in the same way
- If the interceptors argument is non-null, use it to initialize the interceptors field; otherwise take the interceptors configured in config and use them to create the ProducerInterceptors
- configureClusterResourceListeners builds the cluster resource listeners object; this merely constructs a ClusterResourceListeners instance and sets its fields. Users can implement the ClusterResourceListener interface, and their callback will be invoked on every cluster metadata update (see the sketch after this list)
- Set the various other parameter fields
- configureTransactionState creates the transaction manager
private TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext) {
    TransactionManager transactionManager = null;
    final boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
    final boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
    if (userConfiguredTransactions && !userConfiguredIdempotence)
        log.info("Overriding the default {} to true since {} is specified.",
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, ProducerConfig.TRANSACTIONAL_ID_CONFIG);
    if (config.idempotenceEnabled()) {
        final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
        final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        final boolean autoDowngradeTxnCommit = config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
        transactionManager = new TransactionManager(
            logContext,
            transactionalId,
            transactionTimeoutMs,
            retryBackoffMs,
            apiVersions,
            autoDowngradeTxnCommit);
        if (transactionManager.isTransactional())
            log.info("Instantiated a transactional producer.");
        else
            log.info("Instantiated an idempotent producer.");
    }
    return transactionManager;
}
- Read from the configuration whether idempotence is enabled and whether a transactional id is set; if a transactional id is set but idempotence was not explicitly configured, log a message noting that the default is overridden
- If idempotence is enabled (which is implied when a transactional id is set), create the TransactionManager, initializing it from the configured values
- Create the record accumulator (RecordAccumulator)
- The static method ClientUtils.parseAndValidateAddresses parses and validates the addresses; it is passed the configured bootstrap.servers broker addresses and the client.dns.lookup value, which defaults to use_all_dns_ips
public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
    List<InetSocketAddress> addresses = new ArrayList<>();
    for (String url : urls) {
        if (url != null && !url.isEmpty()) {
            try {
                String host = getHost(url);
                Integer port = getPort(url);
                if (host == null || port == null)
                    throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
                    InetAddress[] inetAddresses = InetAddress.getAllByName(host);
                    for (InetAddress inetAddress : inetAddresses) {
                        String resolvedCanonicalName = inetAddress.getCanonicalHostName();
                        InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
                        if (address.isUnresolved()) {
                            log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}",
                                    url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
                        } else {
                            addresses.add(address);
                        }
                    }
                } else {
                    InetSocketAddress address = new InetSocketAddress(host, port);
                    if (address.isUnresolved()) {
                        log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}",
                                url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
                    } else {
                        addresses.add(address);
                    }
                }
            } catch (IllegalArgumentException e) {
                throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
            } catch (UnknownHostException e) {
                throw new ConfigException("Unknown host in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
            }
        }
    }
    if (addresses.isEmpty())
        throw new ConfigException("No resolvable bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
    return addresses;
}
- Loop over the bootstrap.servers address list, parsing each address into a host and a port
- Check the client.dns.lookup setting: if it is resolve_canonical_bootstrap_servers_only, resolve the host into its list of IP addresses, inetAddresses (if the host is itself an IP address, this only validates it), then loop over inetAddresses, take each canonical host name, and create an InetSocketAddress from it and the port; if it is not resolve_canonical_bootstrap_servers_only, create the InetSocketAddress directly from the host and port. Return the resulting list of InetSocketAddress objects
- Create the producer-side metadata object and initialize it from the user's metadata-related configuration
- newSender creates the sender
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    int maxInflightRequests = configureInflightRequests(producerConfig);
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
            new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            metadata,
            clientId,
            maxInflightRequests,
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
            ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);
    short acks = configureAcks(producerConfig, log);
    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}
- configureInflightRequests reads the configured max.in.flight.requests.per.connection value; with idempotence enabled this parameter must not exceed 5. It controls the maximum number of unacknowledged requests allowed on a single connection before blocking; values greater than 1 carry a risk of reordering
- Read the request timeout
- The static method ClientUtils.createChannelBuilder creates the channel builder from the user's configuration
- Create the producer metrics registry
- Create the produce throttle-time sensor
- Create the NetworkClient, which carries the various connection-related parameters, such as buffer sizes, backoff times, and connection timeouts
- configureAcks reads and validates the acks setting; for transactional (idempotent) producers, acks must be set to -1 (all)
- Create the Sender object, storing the various resolved settings in its fields; the Sender class implements the Runnable interface
- Create the KafkaThread I/O thread; KafkaThread extends Thread and is initialized with the sender object
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
- Its constructor calls the parent Thread constructor, creating the thread from the sender
- The thread is marked as a daemon thread, and a handler for uncaught exceptions is set
- Start the I/O thread
- AppInfoParser.registerAppInfo records application information
- At this point the KafkaProducer has been fully created
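For the ClusterResourceListener callback mentioned in the list above, a serializer, interceptor, or metrics reporter can implement the interface; a minimal sketch (the class name ClusterAwareStringSerializer is hypothetical):
import java.util.Map;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClusterAwareStringSerializer implements Serializer<String>, ClusterResourceListener {
    private final StringSerializer delegate = new StringSerializer();

    @Override
    public void onUpdate(ClusterResource clusterResource) {
        // Invoked by the client whenever cluster metadata is updated
        System.out.println("Cluster id: " + clusterResource.clusterId());
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return delegate.serialize(topic, data);
    }
}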
ClientUtils.createChannelBuilder: Creating the Channel Builder
This static method creates a new channel builder from the supplied configuration.
public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
clientSaslMechanism, time, true, logContext);
}
- First read the security protocol from the configuration; the default is PLAINTEXT
- Read the SASL mechanism used for client connections; the default is GSSAPI
- Call the static method clientChannelBuilder of ChannelBuilders to create the channel builder
public static ChannelBuilder clientChannelBuilder(
        SecurityProtocol securityProtocol,
        JaasContext.Type contextType,
        AbstractConfig config,
        ListenerName listenerName,
        String clientSaslMechanism,
        Time time,
        boolean saslHandshakeRequestEnable,
        LogContext logContext) {
    if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
        if (contextType == null)
            throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
        if (clientSaslMechanism == null)
            throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
    }
    return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
            saslHandshakeRequestEnable, null, null, time, logContext, null);
}
- First validate the arguments: if the security protocol is SASL_PLAINTEXT or SASL_SSL, contextType and clientSaslMechanism must be non-null
- Call the create method to finish the creation
create: Building the ChannelBuilder
The create method completes the ChannelBuilder creation:
private static ChannelBuilder create(SecurityProtocol securityProtocol,
Mode mode,
JaasContext.Type contextType,
AbstractConfig config,
ListenerName listenerName,
boolean isInterBrokerListener,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable,
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
ChannelBuilder channelBuilder;
switch (securityProtocol) {
case SSL:
requireNonNullMode(mode, securityProtocol);
channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener, logContext);
break;
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
Map<String, JaasContext> jaasContexts;
String sslClientAuthOverride = null;
if (mode == Mode.SERVER) {
@SuppressWarnings("unchecked")
List<String> enabledMechanisms = (List<String>) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
jaasContexts = new HashMap<>(enabledMechanisms.size());
for (String mechanism : enabledMechanisms)
jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs));
// SSL client authentication is enabled in brokers for SASL_SSL only if listener-prefixed config is specified.
if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
String configuredClientAuth = (String) configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
String listenerClientAuth = (String) config.originalsWithPrefix(listenerName.configPrefix(), true)
.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
// If `ssl.client.auth` is configured at the listener-level, we don't set an override and SslFactory
// uses the value from `configs`. If not, we propagate `sslClientAuthOverride=NONE` to SslFactory and
// it applies the override to the latest configs when it is configured or reconfigured. `Note that
// ssl.client.auth` cannot be dynamically altered.
if (listenerClientAuth == null) {
sslClientAuthOverride = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);
if (configuredClientAuth != null && !configuredClientAuth.equalsIgnoreCase(SslClientAuth.NONE.name())) {
log.warn("Broker configuration '{}' is applied only to SSL listeners. Listener-prefixed configuration can be used" +
" to enable SSL client authentication for SASL_SSL listeners. In future releases, broker-wide option without" +
" listener prefix may be applied to SASL_SSL listeners as well. All configuration options intended for specific" +
" listeners should be listener-prefixed.", BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
}
}
}
} else {
// Use server context for inter-broker client connections and client context for other clients
JaasContext jaasContext = contextType == JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) :
JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
}
channelBuilder = new SaslChannelBuilder(mode,
jaasContexts,
securityProtocol,
listenerName,
isInterBrokerListener,
clientSaslMechanism,
saslHandshakeRequestEnable,
credentialCache,
tokenCache,
sslClientAuthOverride,
time,
logContext,
apiVersionSupplier);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);
break;
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}
channelBuilder.configure(configs);
return channelBuilder;
}
- channelBuilderConfigs mainly performs parameter processing
- Create the channel builder matching the configured security protocol; by default, for example, a PlaintextChannelBuilder is created
- Pass the configs returned in the first step to the channel builder's configure method
Sending Messages
Once the producer has been created, it can send messages, but the payload must first be wrapped in the object type the producer expects.
Constructing the ProducerRecord to send
Before sending, the message must be wrapped into a ProducerRecord, created directly through its constructors. Besides the message key and value, we must set the destination topic; we may also set the destination partition and other fields.
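For instance, two common ways of constructing a record (topic/key/value only, or with an explicit partition, timestamp, and header; the values here are illustrative, and RecordHeaders comes from the internal package org.apache.kafka.common.header.internals):
// Partition left to the partitioner
ProducerRecord<String, String> r1 = new ProducerRecord<>("my-topic", "key-1", "value-1");

// Explicit partition 0, explicit timestamp, plus a custom header
ProducerRecord<String, String> r2 = new ProducerRecord<>("my-topic", 0, System.currentTimeMillis(),
        "key-1", "value-1",
        new RecordHeaders().add("trace-id", "abc".getBytes(StandardCharsets.UTF_8)));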
Sending through the producer's send method
KafkaProducer's send method performs the message send. Sending is asynchronous and returns a Future object. A send may specify a completion callback or omit it; below we look at the send path without a callback.
Sending without a callback
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
- ProducerInterceptors' onSend method loops over all interceptors and calls the onSend method of each user-registered interceptor; this runs before serialization, and each interceptor's output becomes the next interceptor's input (a sketch of such an interceptor follows this list)
- doSend then performs the actual send
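A minimal sketch of such an interceptor (the class name and the prefixing behavior are made up for illustration):
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before serialization; the returned record feeds the next interceptor
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "audited:" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Invoked when the send is acknowledged or fails
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
It is registered through the interceptor.classes configuration (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG).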
The doSend method performs the actual send
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
- throwIfProducerClosed checks whether the sender field is null or already closed, and if so throws a "Cannot perform operation after producer has been closed" exception
- waitOnMetadata fetches the cluster metadata
- The time taken before waitOnMetadata plus the time spent inside it becomes the local variable nowMs
- The maximum blocking time minus the time waitOnMetadata spent fetching metadata becomes remainingWaitMs, the remaining wait time
- keySerializer.serialize serializes the key
- valueSerializer.serialize serializes the value
- The partition method computes the target partition for the message
- setReadOnly: if the message headers are a RecordHeaders object, its read-only flag isReadOnly is set to true
- AbstractRecords.estimateSizeInBytesUpperBound estimates an upper bound on the message size, without accounting for compression
- ensureValidRecordSize checks whether the message exceeds the size limits, comparing mainly against maxRequestSize and totalMemorySize
- If the record's timestamp is null, nowMs from above is used as the timestamp
- new InterceptorCallback<> creates the callback wrapper
- RecordAccumulator's append method appends the record to the in-memory buffer pool of messages to send
- this.sender.wakeup() wakes the sender thread to transmit the messages
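As a usage note on the callback path above: when a Callback is passed to send, doSend wraps it together with the interceptors into the InterceptorCallback, so both are invoked on completion. A brief sketch of an asynchronous send with a callback:
producer.send(new ProducerRecord<>("my-topic", "key-1", "value-1"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // API exceptions also surface through the returned Future
    } else {
        System.out.printf("Sent to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});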
waitOnMetadata: Fetching Cluster Metadata
To be completed
The partition Method: Computing the Target Partition
To be completed
RecordAccumulator's append Method
To be completed
Waking the Sender Thread to Transmit Messages
To be completed