kafka 消费者
——总结自《kafka 权威指南》
kafka 消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。如果群组里的消费者超出主题的分区数量,那么就有一部分消费者会被闲置,不会接收到任何信息。
分区的所有权从一个消费者转移到另一个消费者,这种行为成为再均衡
。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过被指派为群组协调器的 broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡后才会触发再均衡,在这几秒钟内,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡。
在 0.10.1 版本里,kafka 引入了一个独立的心跳线程,可以在轮询消息的空档发送心跳。这样一来,发送心跳的频率与消息轮询的频率之间就是相互独立的。可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁。
当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为群主,群主从协调器那里获得群组的成员列表,并负责给每一个消费者分配分区。它使用了一个实现了 PartitionAssignor 接口的类来决定哪些分区应该分配给哪个消费者。
public class ConsumerTest {
public static void main(String[] args) {
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
kafkaProperties.put("group.id", "group-1");
kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);
// 订阅主题,可以在 subscribe 传入一个正则表达式,匹配多个主题,即 consumer.subscribe("test.*")
kafkaConsumer.subscribe(Collections.singleton("CustomerCountry"));
// 轮序读取数据
try {
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records){
String printStr = String.format("topic=%s, partition=%s, offset=%s, customer=%s, country=%s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println(printStr);
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
Properties
-
fetch.min.bytes:指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于该值,它就会等到有足够的可用数据时才把它返回给消费者。
-
fetch.max.wait.ms:指定 broker 的等待时间,默认是500ms,当 broker 数据量达到 fetch.min.bytes 或延时时间达到 fetch.max.wait.ms 时数据返回。
-
max.partition.fetch.bytes:指定了服务器从每个分区返回给消费者的最大字节数总量,默认值是 1MB。max.partition.fetch.bytes 的值必须要比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取消息,导致消费者一直挂起重试。
-
session.timeout.ms:指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。该属性与 hearbeat.interval.ms 紧密相关,hearbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率。hearbeat.invterval.ms 一般是 session.timeout 的1/3。
-
auto.offset.reset:指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认值是 latest,即消费者将从最新的记录开始读取数据。另一个值是 earlist,即从起始位置读取记录。
-
enable.auto.commit:指定了消费者是否自动提交偏移量,默认是 true。可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
-
partition.assignment.strategy:指定分区的分配策略。kafka 默认的分配有两个。
- range:把主题的若干个连续的分区分配给消费者。只要使用了 range 策略,而且分区数量无法被消费者数量整除,就会是上一个消费者分区数量大于下一个消费者分区数量。
- roundRobin:把主题的所有分区逐个分配给消费者,会给所有消费者分配相同数量的分区(或最多差一个分区)。
也可以自己自定义分区策略并进行相关的配置。partition.assignment.startegy 默认使用的是 oeg.apache.kafka.clients.consumer.RangeAssignor。
-
client.id:可以是任意字符串,用来表示从客户端发送过来的消息,通常用在日期、度量指标和配额里。
-
max.poll.records:用于控制单次调用 call() 方法能够返回的记录数量,可以控制在轮询里需要处理的数据量。
-
receive.buffer.bytes:指定了 TCP socket 接收数据包的缓冲大小。
-
send.buffer.bytes:指定了 TCP socket 发送数据包的缓冲大小。
偏移量
消费者提交偏移量:消费者往一个叫做 _consumer_offset 的特殊主体发送消息,消息里包含每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,为了能够进行之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续。如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息会被重复处理。
自动提交:
Properties kafkaProperties = new Properties();
// ...其他配置...
kafkaProperties.setProperty("enable.auto.commit", "true"); // 默认
kafkaProperties.setProperty("auto.commit.interval.ms", "100"); // 设置提交时间,默认是5s
提交当前偏移量:
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records){
String printStr = String.format("topic=%s, partition=%s, offset=%s, customer=%s, country=%s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println(printStr);
kafkaConsumer.commitSync(); //手动提交当前偏移量
}
}
异步提交:
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null){
System.out.println("提交失败" + e);
return;
}
System.out.println("提交偏移量 " + map + "成功");
}
});
同步和异步结合
try {
while (true){
// ......
kafkaConsumer.commitAsync();
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
try {
kafkaConsumer.commitSync();
} finally {
kafkaConsumer.close();
}
}
提交特定的偏移量:
HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
map.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata") );
kafkaConsumer.commitAsync(map, null);
从特定偏移量开始处理数据:
for (TopicPartition partition: kafkaConsumer.assignment()){
//getOffsetFromMysql:从数据库中查找分区的偏移量,并返回。
kafkaConsumer.seek(partition,getOffsetFromMysql(partition));
}
再均衡监听器
在为消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码。在调用 subscribe() 方法时传入一个 ConsumerRebalanceListerner 类就可以了。该类由两个需要实现的方法:
- onPartitionsRevoked:在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道从哪里开始读取了。
- onPartitionsAssigned:在重新分配分区之后和消费者开始读取消息之前被调用。
class HandleRebalance implements ConsumerRebalanceListener{
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
// 将记录和偏移量插入数据库
commitDBTransaction();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
for (TopicPartition partition: collection){
//getOffsetFromMysql:从数据库中查找分区的偏移量,并返回。
kafkaConsumer.seek(partition, getOffsetFromMysql(partition));
}
}
}
// 订阅主题,可以在 subscribe 传入一个正则表达式,匹配多个主题,即 consumer.subscribe("test.*")
kafkaConsumer.subscribe(Collections.singleton("CustomerCountry"), new HandleRebalance());
可以通过 kafkaConsumer.wakeup() 方法安全退出 poll(),并抛出 WakeupException 异常,或者如果调用 kafkaConsumer.wakeup() 没有等待轮询,那么异常将下下一轮调用 poll() 时抛出。我们不需要处理该异常,因为它只是跳出循环的一种方式。
反序列器
自定义反序列器
public class CustomerDeseralizer implements Deserializer<CustomerBean> {
@Override
public void configure(Map<String, ?> map, boolean b) {
// 不做任何配置
}
@Override
public CustomerBean deserialize(String s, byte[] bytes) {
int id;
int nameSize;
String name;
try {
if (bytes == null){
return null;
}
if (bytes.length < 8){
throw new SerializationException("字节数组长度过短,数据错误");
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
id = buffer.getInt();
nameSize = buffer.getInt();
byte[] nameBytes = new byte[nameSize];
buffer.get(nameBytes);
name = new String(nameBytes, "utf-8");
return new CustomerBean(id, name);
} catch (Exception e){
e.printStackTrace();
return null;
}
}
@Override
public void close() {
}
}
avro 反序列器
public class ConsumerByAvro {
public static void main(String[] args) {
Init init = new Init();
Properties props = init.kafkaConsumerInit();
String schemaStr="{"
+ "\"type\":\"record\","
+ "\"name\":\"Student\","
+ "\"fields\":["
+ " { \"name\":\"name\", \"type\":\"string\" },"
+ " { \"name\":\"age\", \"type\":\"int\" }"
+ "]}";
Producer<String, byte[]> producer = new KafkaProducer<>(props);
KafkaConsumer<String ,byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList("CustomerCountry"));
SpecificDatumReader<Student> reader = new SpecificDatumReader<>(Student.getClassSchema());
try {
while (true){
ConsumerRecords<String,byte[]> records = consumer.poll(10);
for(ConsumerRecord<String,byte[]> record : records){
Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
Student student = null;
try {
student = reader.read(null,decoder);
System.out.println(student);
} catch (IOException e) {
e.printStackTrace();
}
}
}
} finally {
consumer.close();
}
}
}
独立消费者
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
List<TopicPartition> partitioners = new ArrayList<>();
for (PartitionInfo partitionInfo:partitionInfos){
partitioners.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
// 知道需要哪些分区之后调动 assign 方法。
consumer.assign(partitioners);
while (true){
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records){
String printStr = String.format("topic=%s, partition=%s, offset=%s, customer=%s, country=%s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
System.out.println(printStr);
consumer.commitAsync();
}
}