Avro is a programming-language-independent serialization format. Its main features:
Rich data structures
A compact, fast, binary data format
A container file format for persisting data
Remote procedure call (RPC)
Simple integration with dynamic languages: code generation is not required to read or write data files, nor to implement RPC protocols
Avro relies on schemas: data is always written and read against a schema.
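As a quick illustration of the last feature above, a record can be built and inspected purely from a schema parsed at runtime, with no generated classes. A minimal sketch (the schema here is a trimmed-down version of the People schema defined in step 2):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericExample {
    public static void main(String[] args) {
        // Parse the schema at runtime -- no code generation involved
        Schema schema = new Schema.Parser().parse(
                "{\"namespace\": \"com.nq\", \"type\": \"record\", \"name\": \"People\", "
              + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}, "
              + "{\"name\": \"age\", \"type\": \"int\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "Alice");
        record.put("age", 30);
        System.out.println(record); // prints {"name": "Alice", "age": 30}
    }
}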
1. Add the pom.xml dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <phase>generate-resources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
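Note that because the schema goal is bound to the generate-resources phase, which runs before compile in Maven's default lifecycle, the classes are also regenerated automatically by a normal build:

mvn clean compile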
2. Define the schema file
2.1 People.avsc
{
    "namespace": "com.nq",
    "type": "record",
    "name": "People",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "hasHouse", "type": "boolean"},
        {"name": "children", "type": "string"}
    ]
}
2.2 Run mvn avro:schema
The generated Java class is placed at /target/generated-sources/avro/com/nq/People.java in the project. (When the goal is invoked directly from the command line like this, the execution-level configuration in the pom.xml does not apply, so the plugin writes to its default output directory.)
2.3 Copy the generated class to wherever you need it
Alternatively, you can download avro-tools and use it to generate the entity class.
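For example (a sketch; adjust the avro-tools jar version and the paths to your own layout):

java -jar avro-tools-1.11.0.jar compile schema src/main/avro/People.avsc src/main/java/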
3. Kafka producer
import com.nq.People;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class AvroProducer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        String topic = "test-vip";
        // Change to your own broker address
        props.put("bootstrap.servers", "kafka-node01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values go over the wire as raw bytes; Avro does the encoding below
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        // The writer and encoder are created once and reused for every record
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SpecificDatumWriter<People> datumWriter = new SpecificDatumWriter<>(People.class);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

        for (int i = 0; i < 1000; i++) {
            out.reset(); // Clear the buffer before encoding the next record
            People people = new People();
            people.setName("达拉崩吧---" + i);
            people.setAge(i);
            people.setChildren("children===" + i);
            people.setHasHouse(i % 2 == 0);
            datumWriter.write(people, encoder);
            encoder.flush(); // Make sure all bytes reach the stream before copying them
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, "vip-" + i, out.toByteArray());
            producer.send(record);
        }
        out.close();
        producer.close();
    }
}
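Note that producer.send(record) as used above is fire-and-forget: the application never sees delivery failures. If errors matter, the standard kafka-clients callback overload of send() can surface them; a sketch, as a drop-in replacement for the send line inside the loop:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed; real code should handle or retry rather than just log
        exception.printStackTrace();
    }
});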
4. Kafka consumer
import com.nq.People;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        String topic = "test-vip";
        // Change to your own broker address
        props.put("bootstrap.servers", "kafka-node01:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("group.id", "avro-test");
        props.put("auto.offset.reset", "latest");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        // The reader is created once, using the schema compiled into People
        SpecificDatumReader<People> datumReader = new SpecificDatumReader<>(People.getClassSchema());
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Decode the raw bytes back into a People instance
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    People people = null;
                    try {
                        people = datumReader.read(null, decoder);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("key: " + record.key() + "\t" + people);
                }
            }
        } finally {
            consumer.close();
        }
    }
}
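One caveat with this raw-bytes approach: the consumer must decode with a schema compatible with the one the producer wrote with. If the writer's schema has evolved since, Avro can resolve the differences via SpecificDatumReader's two-argument constructor (writer schema first, reader schema second). A sketch, assuming the old writer schema was saved as a hypothetical People-v1.avsc (requires org.apache.avro.Schema and java.io.File imports, plus handling of the IOException from parse):

// People-v1.avsc is a hypothetical copy of the schema the producer wrote with
Schema oldWriterSchema = new Schema.Parser().parse(new File("People-v1.avsc"));
// Avro resolves the old writer schema against the current reader schema
SpecificDatumReader<People> evolvingReader =
        new SpecificDatumReader<>(oldWriterSchema, People.getClassSchema());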