反序列化是相对于序列化而言的,对于kafka而言我们举一个例子
- 序列化过程:kafka生产者将User对象序列化为JSON格式,进而转成二进制byte[]用于网络传输
- 反序列化过程:kafka消费者得到byte[],反序列化为JSON,进而通过JSON得到User对象。
所以要学习本节的内容,我们有必要回顾一下本专栏的《生产者客户端-自定义序列化器》再开始下面的学习
一、消费者反序列化接口
消费者反序列化接口如下,如果我们需要实现自定义数据格式的反序列化,需要定义一个类实现该接口。
package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
public interface Deserializer<T> extends Closeable {
/**
* 参数configs会传入消费者配置参数,
* 反序列化器实现类可以根据消费者参数配置影响序列化逻辑
* isKey布尔型,表示当前反序列化的对象是不是消息的key,如果不是key就是value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
}
//核心反序列化函数,将二进制数组转成T类对象
T deserialize(String topic, byte[] var2);
default T deserialize(String topic, Headers headers, byte[] data) {
return this.deserialize(topic, data);
}
default void close() {
}
}
二、使用Jackson反序列化对象
在《生产者客户端-自定义序列化器》中我们使用jackson序列化User对象为二进制数组,这里我们将其反序列化回来,即:将二进制数组转换成User对象。
public class MyConsumerDeserializer implements Deserializer<User> {
@Override
public User deserialize(String s, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(bytes,User.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
三、指定消费者反序列化器
使用消费者参数设置反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyConsumerDeserializer.class.getName());
完整的消费者代码如下:
public class JsonConsumer{
private final Properties props;
JsonConsumer(){
props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.111:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "JsonConsumerGroup"); //消费者组名称有意义即可,随便起名
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyConsumerDeserializer.class.getName());
}
public void pollData(){
//1.创建消费者
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
//2.订阅Topic
consumer.subscribe(Collections.singletonList("json-test"));
try {
while (true) {
//循环拉取数据,
//Duration超时时间,如果有数据可消费,立即返回数据
// 如果没有数据可消费,超过Duration超时时间也会返回,但是返回结果数据量为0
ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, User> record : records) {
dealRecord(record);
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,
// 网络连接和socket也会随之关闭,并立即触发一次再均衡(再均衡概念后续章节介绍)
consumer.close();
}
}
//针对单条数据进行处理
private void dealRecord(ConsumerRecord<String, User> record) {
System.out.println(record.value().toString());
}
public static void main(String[] args) {
JsonConsumer myConsumer = new JsonConsumer();
myConsumer.pollData();
}
}