一个应用程序在很多情况下需要往Kafka写入消息:记录用户的活动(用于审计和分析),记录度量指标,保存日志消息,记录智能家电的信息,与其他应用程序进行异步通信,缓冲即将写入到数据库的数据,等等。
多样的使用场景意味着多样的需求:是否每个消息都很重要?是否允许丢失一小部分消息?偶尔出现重复消息是否可以接受?是否有严格的延迟和吞吐量要求?
不同的使用场景对生产者API的使用和配置会有直接的影响。
消息发送过程
首先创建一个ProducerRecord对象。
ProducerRecord对象包含目标主题和要发送的内容。
可以指定键或分区。
如果有键的话,将键序列化成字节数组,以便在网络上传输。同样地,要发送的内容,即值,也需要序列化成字节数组。
接下来,数据被传给分区器。
如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。
如果没有指定分区,那么分区器会根据ProducerRecord对象的键来选择一个分区。
选好分区后,生产者就知道该往哪个主题和分区发送记录了。
紧接着,这条记录被添加到了一个记录批次里。
这个批次里的所有消息都会被发送到相同的主题和分区上。
有一个独立的线程负责把这些记录批次发送到相应的broker上。
服务器在收到这些消息时会返回一个相应。
如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。
如果写入失败,则会返回一个错误。
生产者在接收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
Kafka生产者参数
bootstrap.servers
该属性指定broker的地址清单,地址的格式为host:port。
清单不需要包含所有的broker地址,生产者会从给定的broker里查找到其他broker的信息。
建议至少要提供两个broker的信息,一旦其中一个单机,生产者仍然能够连接到集群上。
该参数为必选参数。
key.serializer
broker希望接收到的消息的键和值都是字节数组。
生产者接口允许使用参数化类型,因此可以把Java对象作为键和值发送给broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些Java对象转化成字节数组。
key。serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。
Kafka客户端默认提供了ByteArraySerializer,StringSerializer和IntegerSerializer。
该参数必须设置,即使只发送值。
value.serializer
用指定的类将值序列化。
该参数必须设置。
发送消息的方式
生产者可以使用单个消费者单个线程,也可以使用单个消费者多个线程。
或者增加消费者。
发送并忘记
我们把消息发送给服务器,但并不关心它是否正常到达。
大多数情况下,消息会正常到达, 因为Kafka是高可用的,而且生产者会自动尝试重发。
不过这种方式有时候也会丢失一些消息。
同步发送
我们使用send()发送消息,它会返回一个Future对象,调用get()方法进行等待。就可以知道消息是否发送成功。
异步发送
我们调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数。