Background: the previous article explained the advantages of the Confluent Schema Registry and how to set it up. This article shows how to consume messages with multiple different table structures (schemas) from a single topic using the Confluent Schema Registry.
Previous article: kafka Confluent Schema Registry 简单实践_温柔的小才的博客-CSDN博客
Step 1
Note: this builds on the previous article. First, register multiple schemas under the existing topic (two are registered here as a demonstration).
#Run from the Kafka directory: start Kafka
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
#Note: Kafka must be started on every node in the cluster
#Run from the Confluent directory: start the Confluent Schema Registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
#Register the first schema under test-topic5 (note: every Avro field must use the "name" key)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"time\", \"type\": \"int\"}]}"}' \
http://hadoop01:8081/subjects/test-topic5-value/versions
#Register the second schema under test-topic5
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id_num\", \"type\": \"int\"}, {\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"number\", \"type\": \"int\"}]}"}' \
http://hadoop01:8081/subjects/test-topic5-value/versions
This registers two schemas under http://hadoop01:8081/subjects/test-topic5-value/versions (the URL used in the commands above; replace hadoop01 with your own node's hostname or IP). Each schema can be viewed by its version number:
View version 1: http://hadoop01:8081/subjects/test-topic5-value/versions/1
View version 2: http://hadoop01:8081/subjects/test-topic5-value/versions/2
This confirms that multiple schemas can be registered under one subject.
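The reason one topic can carry records with different schemas is the Confluent wire format: every Avro message is prefixed with a magic byte (0) and the 4-byte big-endian ID of its schema, so the deserializer can look up the right schema per record. A minimal JDK-only sketch of that framing (the payload bytes below are placeholders, not real Avro data):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent framing: 1 magic byte (0) + 4-byte big-endian schema id + Avro body
    static byte[] frame(int schemaId, byte[] avroBody) {
        ByteBuffer buf = ByteBuffer.allocate(5 + avroBody.length);
        buf.put((byte) 0);     // magic byte
        buf.putInt(schemaId);  // schema id assigned by the registry
        buf.put(avroBody);     // Avro-encoded record bytes
        return buf.array();
    }

    static int schemaId(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != 0) throw new IllegalArgumentException("unknown magic byte");
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] msg = frame(2, new byte[]{1, 2, 3}); // pretend this record uses schema id 2
        System.out.println("schema id = " + schemaId(msg)); // prints: schema id = 2
    }
}
```

Because the id travels with each record, two records in the same topic can reference two entirely different schemas.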
Step 2
One configuration change is needed: set compatibility to NONE. (The PUT below changes the registry's global default; a per-subject setting is also possible via PUT /config/<subject>.)
#Set compatibility to NONE
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://hadoop01:8081/config
Why: when a new schema is registered, the Schema Registry can enforce compatibility rules. The following rules are supported:
Backward (default): a new schema is backward compatible if it can be used to read data written with all previous schemas. Backward compatibility is useful for loading data into systems such as Hadoop, since all versions of the data can always be queried with the latest schema.
Forward: a new schema is forward compatible if data written with it can be read using all previous schemas. Forward compatibility is useful for consumer applications that can only handle data in a particular (not necessarily the latest) version.
Full: a new schema is fully compatible if it is both backward and forward compatible.
None: the new schema can be anything, as long as it is valid Avro.
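To see why the second schema above fails the default Backward check: it renames id to id_num and changes time from int to string, with no defaults or aliases, so a reader using the new schema cannot decode records written with the old one. A toy illustration of that rule (a deliberately simplified stand-in for Avro's real schema-resolution logic, not the registry's actual checker), representing each schema as a field-name-to-type map:

```java
import java.util.Map;

public class BackwardCheck {
    // Toy rule: the new (reader) schema can read old (writer) data only if every
    // reader field either exists in the writer schema with the same type, or has
    // a default value. Real Avro resolution also handles aliases, promotions, etc.
    static boolean backwardCompatible(Map<String, String> writer,
                                      Map<String, String> reader,
                                      Map<String, String> readerDefaults) {
        for (Map.Entry<String, String> f : reader.entrySet()) {
            boolean matches = f.getValue().equals(writer.get(f.getKey()));
            if (!matches && !readerDefaults.containsKey(f.getKey())) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> v1 = Map.of("id", "int", "time", "int");
        Map<String, String> v2 = Map.of("id_num", "int", "time", "string", "number", "int");
        // v2 renames id and changes time's type with no defaults -> not backward compatible
        System.out.println(backwardCompatible(v1, v2, Map.of())); // prints: false
    }
}
```

Since neither schema can read the other's data, only compatibility NONE lets both live under one subject.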
Step 3
Modify the code from the previous article as follows. Note that all four programs use the same topic (the key changes are marked with trailing comments in the code).
Producer #1
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;

public class product_kafka {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use Confluent's KafkaAvroSerializer for the value
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Schema Registry address, used to register/fetch schemas
        props.put("schema.registry.url", "http://hadoop01:8081");
        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            // ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6", user); // before
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6", "table_name1", user); // after: the key marks the table
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
Producer #2
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;

public class product_kafka2 {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id_num\", \"type\": \"int\"}, " +
            "{\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"number\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use Confluent's KafkaAvroSerializer for the value
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Schema Registry address, used to register/fetch schemas
        props.put("schema.registry.url", "http://hadoop01:8081");
        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String time = "2022" + id;
            int number = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id_num", id);
            user.put("time", time);
            user.put("number", number);
            // ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6", user); // before
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("test-topic6", "table_name2", user); // after: the key marks the table
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
Consumer #1 (consumes the records for schema version 1, i.e. the table keyed table_name1 with fields id, name, age)
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class consumer_kafka {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        // The two consumers must be in different groups (test1/test2): consumers in
        // the same group split the partitions, so each would see only part of the data
        props.put("group.id", "test1");
        // Disable auto-commit; together with auto.offset.reset=earliest each run
        // re-reads from the beginning, which is convenient for testing
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer for the value
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Schema Registry address, used to fetch schemas by id
        props.put("schema.registry.url", "http://hadoop01:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic6"));
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    if ("table_name1".equals(record.key())) { // filter by table, matching the producer's key
                        System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                                + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                                + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
Consumer #2 (consumes the records for schema version 2, i.e. the table keyed table_name2 with fields id_num, time, number)
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class consumer_kafka2 {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        // Different group from consumer 1 (see the note there)
        props.put("group.id", "test2");
        // Disable auto-commit; together with auto.offset.reset=earliest each run
        // re-reads from the beginning, which is convenient for testing
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer for the value
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Schema Registry address, used to fetch schemas by id
        props.put("schema.registry.url", "http://hadoop01:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic6"));
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    if ("table_name2".equals(record.key())) { // filter by table, matching the producer's key
                        System.out.println("value = [user.id_num = " + user.get("id_num") + ", " + "user.time = "
                                + user.get("time") + ", " + "user.number = " + user.get("number") + "], "
                                + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
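As an alternative to running two group-separated consumers, a single consumer could dispatch each record by its key to a per-table handler. A JDK-only sketch of that routing idea (the Kafka polling loop is omitted; in the real consumer, handleRecord would be called with record.key() and record.value() for each polled record):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class TableDispatcher {
    // Routing table: message key (table name) -> handler for that table's records
    private final Map<String, Consumer<Object>> handlers = new HashMap<>();

    void register(String tableName, Consumer<Object> handler) {
        handlers.put(tableName, handler);
    }

    // Returns true if a handler consumed the record, false if the key is unknown
    boolean handleRecord(String key, Object value) {
        Consumer<Object> h = handlers.get(key);
        if (h == null) return false;
        h.accept(value);
        return true;
    }

    public static void main(String[] args) {
        TableDispatcher d = new TableDispatcher();
        d.register("table_name1", v -> System.out.println("table 1 record: " + v));
        d.register("table_name2", v -> System.out.println("table 2 record: " + v));
        d.handleRecord("table_name1", "{id=1, name=name1, age=20}");
        d.handleRecord("unknown", "ignored"); // no handler registered -> skipped
    }
}
```

With this pattern, adding a third table schema only requires registering one more handler instead of writing another consumer program.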
Results after startup:
Consumer #1: (console output screenshot omitted)
Consumer #2: (console output screenshot omitted)
Errors and fixes
1. Schema being registered is incompatible with an earlier schema
Cause: the registered schemas are incompatible with each other.
Fix: run Step 2 of this article to set compatibility to NONE.
#Set compatibility to NONE
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://hadoop01:8081/config
2. org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Cause: the Confluent Schema Registry is not running.
Fix: run this command from Step 1:
#Run from the Confluent directory: start the Confluent Schema Registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
3. Other cases: check that the Kafka cluster is running and that the node IPs are configured correctly.