【kafka专栏】集成apache kafka-clients实现数据消费者

现在我们通过apache kafka的api方式来一步一步的实现kafka消费者,首先前提还是在我们的项目里面已经引入了下面的maven 坐标。

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.6.0</version>
</dependency>

一、初始化消费者配置

在kafka消费者的基础配置中,下面几个信息是必须给出的。kafka集群连接信息、该消费者属于哪个消费者组,以及消息数据key、value反序列化的格式。反序列化的格式要与序列化的格式一一对应,不能生产者序列化格式是JSON,消费者反序列化的格式是String,二者要对应统一。

public class MyConsumer {
    
    
    private final Properties props;

    MyConsumer(){
    
    
        props = new Properties();
        //kafka集群信息
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.10:9092");
        //消费者组名称
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zimug_group");
        //key的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //value的反序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    }
}

二、拉取数据并实现数据处理逻辑

消费者数据消费代码的通用模板,下文中

  • consumer.subscribe决定了消费者订阅哪个主题下的数据进行消费
  • 循环不断拉取kafka broker消息代理服务端的数据
  • 下文的dealRecord方法,是针对单条数据进行处理,这里我们只是简单的打印出来。如果你是真是应用,应该有具体的数据处理业务。
public void pollData(){
    
    
    //1.创建消费者
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    //2.订阅Topic
    consumer.subscribe(Collections.singletonList("producer_test"));
    //支持正则表达式,订阅所有与test相关的Topic
    //consumer.subscribe("test.*");
    
    try {
    
    
        while (true) {
    
    
            //循环拉取数据,
            //Duration超时时间,如果有数据可消费,立即返回数据
            // 如果没有数据可消费,超过Duration超时时间也会返回,但是返回结果数据量为0
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> record : records) {
    
    
                dealRecord(record);
            }
        }
    } finally {
    
    
        //退出应用程序前使用close方法关闭消费者,
        // 网络连接和socket也会随之关闭,并立即触发一次再均衡(再均衡概念后续章节介绍)
        consumer.close();
    }
}

//针对单条数据进行处理,此方法中应该做好异常处理,避免外围的while循环因为异常中断。
private void dealRecord(ConsumerRecord<String, String> record) {
    
    
    System.out.println("topic:" + record.topic()
            + ",partition:" + record.partition()
            + ",offset:" + record.offset() 
            + ",key:" + record.key() 
            + ",value" + record.value());
}

通过下面的代码,可以启动消费者进行循环数据消费。

public static void main(String[] args) {
    
    
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

猜你喜欢

转载自blog.csdn.net/hanxiaotongtong/article/details/125588706