版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_38038143/article/details/88872281
前言
继上篇《 04-Kafka生成者 – 使用Avro 序列化值写入消息(使用Confluent 注册表)》使用Avro序列化生产消息,该篇博客使用Avro 反序列化读取消息。
环境:
Kafka-2.1.1 + Kafka 集群 + Eclipse
GitHub:
https://github.com/GYT0313/Kafka-Learning
1. 注册表
上篇博客能够成功写入消息,但在写入消息时
2. 代码
ReadMessageAvroDeserializer.java
package consumer_read;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ReadMessageAvroDeserializer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
// Properties 对象
Properties props = new Properties();
props.put("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092");
props.put("group.id", "AvroTest"); // 消费者群组
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://master:8081");
// consumer 对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("AvroTest")); // 支持订阅多个主题,也支持正则
try {
// 设置分区开头读取, 0表示立立即返回,无需等待
consumer.seekToBeginning(consumer.poll(0).partitions());
while (true) {
// 0.1s 的轮询等待
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println(records.count());
for (ConsumerRecord<String, String> record : records) {
// 输出到控制台
System.out.printf("offset = %sd, value = %s\n",
record.offset(), record.value());
}
// 同步提交偏移量
consumer.commitSync();
Thread.sleep(500);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
先使用SendMessageAvro.java(点击见代码),发送了5000条消息。
然后运行,结果如下:
上面代码使用 record.value() 打印出的是全部字符串。在包含多个键值对的值中,或许我们会需要某些值,因此这样会比较麻烦。
所以,这里介绍另外一种获取值的方法(GenericRecord):
定义KafkaConsumer<String, GenericRecord>
ReadMessageAvroDeserializerGenericRecord.java(代码见GitHub)
运行结果: