kafka学习(三)——consumer的简单实例

前言

前两篇博客简单介绍了kafka的producer和admin的api操作,这一篇继续总结一下kafka的consumer的api操作,依旧参照kafka的官网

consumer简单实例

offset简介

在熟悉consumer实例之前,需要对offset有一个了解。与RabbitMQ不同的是,kafka中的consumer是通过pull的方式去kafka拉取消息进行消费。之前的博客中总结过,kafka的消息存放于partition中。而所谓的offset就是表示消费者读取下一条待消费消息的偏移量,在consumer消费完该offset指定的消息之后,会有一个提交的操作,告知kafka自己已经消费该条消息。

一个简单的示意图如下。

在这里插入图片描述

最简单的实例

根据官网中,有一个最简单的实例,依旧是注入Porperties对象。这是一个最简单的consumer的实例,设定配置属性enable.auto.commit为true,表示自动提交告知kafka已经消费完该消息。

/**
 * 简单的helloworld,但是这里没有考虑消费失败的情况,如果消费失败,offset并不会重置
 */
private static void helloworld(){
    
    
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", LOCAL_KAFKA_ADDRESS);//地址
    props.setProperty("group.id", "test");//设置consumer的组
    props.setProperty("enable.auto.commit", "true");//设置为自动提交
    props.setProperty("auto.commit.interval.ms", "1000");//自动提交间隔时间
    //key value的序列化
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String,String> consumer = new KafkaConsumer(props);
    // 消费订阅哪一个Topic或者几个Topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
    
    
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));//kafka实质是以pull的方式去kafka拉取消息消费
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(),record.offset(), record.key(), record.value());
    }
}

手动提交

自动提交很简单,简单实例中的代码中,实例化consumer的时候指定了一个时间间隔。consumer在指定的时间间隔之后,会重新去kafka读取下一条待消费的消息。如果在这个间隔之内,客户端依旧没有完成消息的处理,则consumer依旧会自动提交,将该消息标记为处理。这在生产中一般不采用这种方式,而是采用手动提交。

代码上改动不多,只需要将上述的while循环改成如下所示

props.setProperty("enable.auto.commit", "false");//将自动提交设置为false
while (true) {
    
    
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<String, String> record : records) {
    
    
        // 消费数据,成功就成功,不成功...也不影响,指导消费完所有数据,再手动提交
        // TODO 这里模拟消息消费的操作,用sout替代。
        System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }

    // 如果成功,手动通知offset提交
    consumer.commitAsync();
}

关于consumer的分组

消费者可以构建组的,但是只要group名称一样,就是一个组的。

单个partition的消息只能由group中的某一个consumer消费。如果partition的个数小于consumer的个数,则会存在consumer闲置。简而言之不能多个consumer消费同一个partition。

如下图所示,下图中标红的消费方式,是不被允许的。

consumer从partition中消费信息是顺序消费的,默认是从头开始消费。

扫描二维码关注公众号,回复: 13238509 查看本文章

单个consumer的分组,会消费所有partition中的数据。比如:有三个consumer,订阅了一个topic的消息,则会默认去消费这个topic下所有的partition的消息。

consumer以partition为粒度进行提交

之前的两个消费实例都是以consumer中topic为单位进行的消息提交,如果想单独以partition为单位进行消息确认提交,可以参考如下代码

实质也没什么修改,只是增加了一层内嵌的遍历partition的循环操作。最后的提交操作放到了内充循环代码中,同时在提交的时候指定了偏移量offset,需要注意的是kafka服务端的offset保留的是下一次消费消息的起始位置,因此我们提交确认消息的时候,是需要+1的。

/**
 * 以partition为单位进行消息确认提交
 */
public static void commitOffsetInPartition(){
    
    
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", LOCAL_KAFKA_ADDRESS);
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer(props);

    //消费订阅一个topic或者多个topic,这里任然是订阅的topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
    
    
        //取出这个topic下的数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        // 针对每个partition单独处理
        for(TopicPartition topicPartition:records.partitions()){
    
    
            //records这个方法可以传递Topic,也可以传递partition
            List<ConsumerRecord<String, String>> partitionRecord = records.records(topicPartition);
            for(ConsumerRecord<String,String> record:partitionRecord){
    
    
                System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = partitionRecord.get(partitionRecord.size()-1).offset();

            //单个partition中的offset,并且提交
            Map<TopicPartition,OffsetAndMetadata> offsetAndMetadata = new HashMap<>();

            //这里需要注意下,kafka服务器记录的offset是我们下次消费的起点,因此要加一
            offsetAndMetadata.put(topicPartition,new OffsetAndMetadata(lastOffset+1));
            //提交offset
            consumer.commitSync(offsetAndMetadata);
            System.out.println("=============partition - "+ topicPartition +" end================");
        }
    }
}

consumer以partition为粒度订阅信息

以上事例,consumer均是以topic为单位订阅信息,如果想只订阅某个partition,可以做出如下修改

之前的订阅方式

consumer.subscribe(Arrays.asList(TOPIC_NAME));//直接以topic为单位进行订阅

只是订阅指定的partition

KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
TopicPartition topicPartitionZero = new TopicPartition(TOPIC_NAME,0);
TopicPartition topicPartitionOne = new TopicPartition(TOPIC_NAME,1);

//消费订阅一个topic中的一个或者多个partition,这里只定义了一个partition——topicPartitionOne
consumer.assign(Arrays.asList(topicPartitionOne));

基本的consumer的使用,上述操作是没问题了。

consumer对offset的手动控制

在某些应用场景中,如果我们确实需要重复消费某些数据,则可以通过手动控制offset的方式来实现,kafka与其他消息中间件不同,消息并不会丢失,而是会一直存在。如果想要重复消费,则只需要手动将offset的位置指向特定的位置即可

/**
 * 手动控制offset
 */
public static void controlOffsetPosition(){
    
    
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", LOCAL_KAFKA_ADDRESS);
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
      TopicPartition topicPartitionZero = new TopicPartition(TOPIC_NAME,0);
    TopicPartition topicPartitionOne = new TopicPartition(TOPIC_NAME,1);

      //consumer.subscribe(Arrays.asList(TOPIC_NAME));
    //消费订阅一个topic中的一个或者多个partition,这里只定义了一个partition
    consumer.assign(Arrays.asList(topicPartitionOne));
    while(true){
    
    
        /**
		 * 手动指定offset的位置	通常用与有些数据需要重复消费的场景。
         * 实际操作中一般是:
         * 1、第一次从0开始消费消息
         * 2、比如一次消费了100条,将offset置为101,将该offset数据存入Redis
         * 3、每次poll之前,从Redis中获取最新的offset的位置
         * 4、每次从这个位置开始消费
         * 主要有两种场景:1、人为控制offset起始位置,2、如果出现程序错误,需要重复消费,则手动控制offset位置重复消费一次
         */

        //手动指定offset的起始位置。//关于这个seek方法还有很多重载的操作,可以直接将offset指向首位,也可以直接将offset指向末位,都是可以的详情参考kafka官网即可。
        consumer.seek(topicPartitionOne,100);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        // 针对每个partition单独处理
        for(TopicPartition topicPartition:records.partitions()){
    
    
            List<ConsumerRecord<String, String>> partitionRecord = records.records(topicPartition);
            for(ConsumerRecord<String,String> record:partitionRecord){
    
    
                System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            long lastOffset = partitionRecord.get(partitionRecord.size()-1).offset();

            //单个partition中的offset,并且提交
            Map<TopicPartition,OffsetAndMetadata> offsetAndMetadata = new HashMap<>();

            //这里需要注意下,服务器记录的offset是我们下次消费的起点,因此要加一
            offsetAndMetadata.put(topicPartition,new OffsetAndMetadata(lastOffset+1));
            //提交offset
            consumer.commitSync(offsetAndMetadata);
            System.out.println("=============partition - "+ topicPartition +" end================");
        }
    }
}

consumer 多线程并发处理

consumer并不是线程安全的,如果在多线程环境下使用consumer,需要自行解决consumer的线程安全问题。通常针对consumer的多线程并发处理,有两种方式

第一种,直接上图吧(源自某课网的介绍)

之前提起过,同一个partition不能被多个consumer消费,如果consumer的个数大于partition,则多余的consumer会闲置。这里的多线程其实就是多个consumer的意思,尽量的将consumer个数设置的与partition一致即可。

在这里插入图片描述

实例代码

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * autor:liman
 * createtime:2021/3/13
 * comment:consumer 多线程简单实例
 * kafka的consumer不是线程安全的,需要自己解决线程安全的问题
 * 因此我们每个线程中,都实例化了一个KafaConsumer
 */
@Slf4j
public class ConsumerThreadSample {
    
    

    public final static String TOPIC_NAME = "client_hello_topic";
    public final static String LOCAL_KAFKA_ADDRESS = "127.0.0.1:9092";


    /**
     * 这种类型是经典模式,是每一个线程单独创建一个KafkaConsumer,用于保证线程安全
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
    
    
		//即使这个实例中开启了多个consumer的线程,最终我们只订阅了两个partition的数据。最多只会有两个consumer在工作
        for(int i=0;i<5;i++) {
    
    
            KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner();
            Thread threadOne = new Thread(kafkaConsumerRunner,"SelfThread-"+i);
            threadOne.start();
            Thread.sleep(1000);

            kafkaConsumerRunner.shutdown();
        }

    }

    /**
     * consumer的线程
     */
    public static class KafkaConsumerRunner implements Runnable{
    
    

        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;

        public KafkaConsumerRunner() {
    
    
            Properties props = new Properties();
            props.put("bootstrap.servers", LOCAL_KAFKA_ADDRESS);
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(Arrays.asList(p0,p1));
        }

        @Override
        public void run() {
    
    
            try{
    
    
                while(!closed.get()){
    
    
                    //处理消息
                    ConsumerRecords<String,String> consumerRecord = consumer.poll(Duration.ofMillis(10000));
                    String threadName = Thread.currentThread().getName();
                    for(TopicPartition topicPartition:consumerRecord.partitions()){
    
    
                        List<ConsumerRecord<String, String>> partitionRecord = consumerRecord.records(topicPartition);
                        //处理每一个分区的数据
                        for(ConsumerRecord<String,String> record:partitionRecord){
    
    
                            System.out.printf("threadName = %s,patition = %d , offset = %d, key = %s, value = %s%n",threadName,
                                    record.partition(),record.offset(), record.key(), record.value());
                        }
                        // 返回去告诉kafka新的offset
                        long lastOffset = partitionRecord.get(partitionRecord.size() - 1).offset();
                        // 注意加1
                        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            }catch (WakeupException e){
    
    
                if(!closed.get()){
    
    
                    throw e;
                }
            }finally {
    
    
                consumer.close();
            }
        }

        public void shutdown() {
    
    
            closed.set(true);
            consumer.wakeup();
        }
    }
}

第二种,consumer只是去获取数据,将获取到的数据交给其他线程去处理(这一点和netty很像)。
在这里插入图片描述

实例代码

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * autor:liman
 * createtime:2021/3/13
 * comment: 类似于netty的那种多线程方式,对partition的管控力度没有那么强,但是这种方式比较适用于流处理
 *
 */
@Slf4j
public class ConsumerRecordThreadSample {
    
    

    public final static String TOPIC_NAME = "client_hello_topic";
    public final static String LOCAL_KAFKA_ADDRESS = "127.0.0.1:9092";

    public static void main(String[] args) throws InterruptedException {
    
    
        String brokerList = LOCAL_KAFKA_ADDRESS;
        String groupId = "test";
        int workNum = 5;

        ConsumerExecutor consumerExecutor = new ConsumerExecutor(brokerList,groupId,TOPIC_NAME);
        consumerExecutor.execute(workNum);

        Thread.sleep(1000000);
        consumerExecutor.shutdown();
    }

    public static class ConsumerExecutor{
    
    
        private final KafkaConsumer<String,String> consumer;
        private ExecutorService executors;

        //consumer的一些初始化操作
        public ConsumerExecutor(String brokerList,String groupId,String topic){
    
    
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum){
    
    
            //这里初始化线程池
            executors = new ThreadPoolExecutor(workerNum,workerNum,0L, TimeUnit.MICROSECONDS
                    ,new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());

            while(true){
    
    
				//这里只是利用consumer去拉取数据,将数据交给线程池中的线程去处理
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(20000));
                for (final ConsumerRecord record : records) {
    
    
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
    
    
            if (consumer != null) {
    
    
                consumer.close();
            }
            if (executors != null) {
    
    
                executors.shutdown();
            }
            try {
    
    
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
    
    
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
    
    
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }

	/**
		真正处理数据的线程
	*/
    public static class ConsumerRecordWorker implements Runnable{
    
    

        private ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord record) {
    
    
            this.record = record;
        }

        @Override
        public void run() {
    
    
            // 假如说数据入库操作
            String threadName="Thread - "+ Thread.currentThread().getName();
            log.error("thread-name:{}",threadName);
            System.out.println();
            System.err.printf("ThreadName = %s ,patition = %d , offset = %d, key = %s, value = %s%n",threadName,
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }

}

关于consumer限流的方式

关于consumer,其实还有一个限流的场景需要说明一下,前面已经说过,consumer可以针对特定的partition进行消费。可以引入令牌的方式进行限流,consumer如果想要正常消费数据,必须先从令牌桶中获取令牌,如果成功获取令牌,则开始消费数据,没有成功则限制其消费,达到限流的目的。

简单实例

TopicPartition topicPartitionZero = new TopicPartition(TOPIC_NAME,0);
TopicPartition topicPartitionOne = new TopicPartition(TOPIC_NAME,1);

//消费订阅一个topic中的一个或者多个partition,这里只定义了一个partition
consumer.assign(Arrays.asList(topicPartitionZero,topicPartitionOne));
long totalNum =40l;//阈值,超过这个进行限流
while(true){
    
    

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    // 针对每个partition单独处理
    for(TopicPartition topicPartition:records.partitions()){
    
    
        long num = 0l;
        List<ConsumerRecord<String, String>> partitionRecord = records.records(topicPartition);
        for(ConsumerRecord<String,String> record:partitionRecord){
    
    
            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());

            /**
             * 实际开发中的操作
             * 1、接收到record信息以后,去令牌桶中拿取令牌
             * 2、如果获取到令牌,则继续业务处理
             * 3、如果获取不到令牌, 则pause等待令牌
             * 4、当令牌桶中的令牌足够, 则将consumer置为resume状态
             */

            num++;
            if(record.partition() == 0){
    
    //如果partition0中的消费个数超过阈值,则停止partition0,只保留partition1的数据消费
                if(num >= totalNum){
    
    
                    log.info("暂停partition 0");
                    consumer.pause(Arrays.asList(topicPartitionZero));
                }
            }

            if(record.partition() == 1){
    
    //如果partition1中的消费个数超过阈值,则唤醒之前停止的partition0
                if(num >= totalNum){
    
    
                    log.info("唤醒partition 0");
                    consumer.resume(Arrays.asList(topicPartitionZero));
                }
            }

        }
        long lastOffset = partitionRecord.get(partitionRecord.size()-1).offset();

        //单个partition中的offset,并且提交
        Map<TopicPartition,OffsetAndMetadata> offsetAndMetadata = new HashMap<>();

        //这里需要注意下,服务器记录的offset是我们下次消费的起点,因此要加一
        offsetAndMetadata.put(topicPartition,new OffsetAndMetadata(lastOffset+1));
        //提交offset
        consumer.commitSync(offsetAndMetadata);
        num--;
        System.out.println("=============partition - "+ topicPartition +" end================");
    }
}

总结

简单的总结了一下consumer的操作。重点需要关注consumer的多线程模式以及限流的操作,这个应该是面试中的高频问点。

猜你喜欢

转载自blog.csdn.net/liman65727/article/details/115267565