《Kafka权威指南》学习笔记
Kafka生产者一一向Kafka写入数据
生产者概览
尽管生产者API使用起来很简单,但消息的发送过程还是有点复杂的:
- 1.我们从创建一个ProducerRecord对象开始,ProducerRecord对象需要包含
目标主题和要发送的内容
。我们还可以指定键或分区
。 - 2.在发送ProducerRecord对象时,生产者要先把键和值对象
序列化成字节数组
,这样它们才能够在网络上传输。 - 3.数据被传给分区器。
- 3.a 如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。
- 3.b 如果没有指定分区,那么分区器会根据ProducerRecord对象的
键
来选择一个分区。
- 4.选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个
记录批次
里,这个批次里的所有消息会被发送到相同的主题和分区上。 - 5.服务器在收到这些消息时会返回一个响应。
- 5.a 如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录
在分区里的偏移量
。 - 5.b 如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
- 5.a 如果消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录
创建Kafka生产者
必选属性
要往Kafka写入消息,首先要创建一个生产者对象,井设置一些属性 。 Kafka 生产者有 3个必选的属性。
bootstrap.server
:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他broker的信息。不过建议至少要提供两个broker的信息,一且其中一个若机,生产者仍然能够连接到集群上。key.serializer
:broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把Java对象作为键和值发送给broker。
key.serializer
必须被设置为一个实现了。org.apache.kafka.common.serialization.Serializer
接口的类,生产者会使用这个类把键对象序列化成字节数组。value.serializer
:会将值序列化。
API创建生产者
Properties kafkaProps=new Properties();
kafkaProps.put("bootstrap.servers","s159:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer=new KafkaProducer<String,String>(kafkaProps);
发送消息到Kafka
发送消息主要有以下3种方式。
发送并忘记(fire-and-forget)
:我们把消息发送给服务器,但井不关心它是否正常到达。使用这种方式有时候也会丢失一些消息。
同步发送
:我们使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,就可以知道悄息是否发送成功。异步发送
:我们调用send()方法,并指定一个回调函数,服务器在返回响应时调用该callback函数。
API发送消息
producer.send(record);
producer.send(record).get();
producer.send(record,Callbackcallback);
生产者的配置
-
acks
:acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。acks=0
:生产者在成功写入悄息之前不会等待任何来自服务器的响应。吞吐量最高
acks=1
:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。acks=all
:有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。最安全
-
buffer.memory
:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。 -
compression.type
:默认情况下,消息发送时不会被压缩。该参数可以设置为snappy、gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算也进行压缩。 -
retries
:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。
默认情况下,生产者会在每次重试之间等待lOOms,不过可以通过retry.backoff.ms
参数来改变这个时间间隔。 -
batch.size
:当有多个消息需要被发送到同一个分区时,生产者会把它们放在罔一个批次里。该参数指定了一个批次可以使用的内存大小
,按照字节数计算(而不是消息个数
)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。 -
linger.ms
:该参数指定了生产者在发送批次之前等待更多消息加入批次
的时间。 -
client.id
:该参数可以是任意的字符串,服务器会用它来识别消息的来橱 -
max.in.flight.requests.per.connection
:生产者在收到服务器晌应之前可以发送多少个消息。 -
timeout.ms
:指定了broker等待同步副本返回消息确认的时间 -
request.timeout.ms
:指定了生产者在发送数据时等待服务器返回响应的时间, -
metadata.fetch.timeout.ms
:指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。 -
max.block.ms
:该参数指定了在调用send()方法
或使用partitionFor()
方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞。
-
max.request.size
:可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。(一个batch中所有消息的总和)
-
receive.buffer.bytes和send.buffer.bytes
:这两个参数分别指定了TCPsocket接收和发送数据包的缓冲区大小。如果它们被设为-1,就使用操作系统的默认值。
消息&分区
ProducerRecord对象包含了目标主题、键和值。
public ProducerRecord(String topic,Integer partition,K key,V value){};
public ProducerRecord(String topic,K key,V value){};
public ProducerRecord(String topic,V value){};
Kafka的消息是一个键值对
,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null。
键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的悄息将被写到同一个分区
。
如果键值为null,井且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(RoundRobin)算法将消息均衡地分布到各个分区上。