前言
前两篇博客简单介绍了kafka的producer和admin的api操作,这一篇继续总结一下kafka的consumer的api操作,依旧参照kafka的官网
consumer简单实例
offset简介
在熟悉consumer实例之前,需要对offset有一个了解。与RabbitMQ不同的是,kafka中的consumer是通过pull的方式去kafka拉取消息进行消费。之前的博客中总结过,kafka的消息存放于partition中。而所谓的offset就是表示消费者读取下一条待消费消息的偏移量,在consumer消费完该offset指定的消息之后,会有一个提交的操作,告知kafka自己已经消费该条消息。
一个简单的示意图如下。
最简单的实例
根据官网中,有一个最简单的实例,依旧是注入Porperties对象。这是一个最简单的consumer的实例,设定配置属性enable.auto.commit
为true,表示自动提交告知kafka已经消费完该消息。
/**
* 简单的helloworld,但是这里没有考虑消费失败的情况,如果消费失败,offset并不会重置
*/
private static void helloworld(){
Properties props = new Properties();
props.setProperty("bootstrap.servers", LOCAL_KAFKA_ADDRESS);//地址
props.setProperty("group.id", "test");//设置consumer的组
props.setProperty("enable.auto.commit", "true");//设置为自动提交
props.setProperty("auto.commit.interval.ms", "1000");//自动提交间隔时间
//key value的序列化
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer(props);
// 消费订阅哪一个Topic或者几个Topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));//kafka实质是以pull的方式去kafka拉取消息消费
for (ConsumerRecord<String, String> record : records)
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
}
手动提交
自动提交很简单,简单实例中的代码中,实例化consumer的时候指定了一个时间间隔。consumer在指定的时间间隔之后,会重新去kafka读取下一条待消费的消息。如果在这个间隔之内,客户端依旧没有完成消息的处理,则consumer依旧会自动提交,将该消息标记为处理。这在生产中一般不采用这种方式,而是采用手动提交。
代码上改动不多,只需要将上述的while循环改成如下所示
props.setProperty("enable.auto.commit", "false");//将自动提交设置为false
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// 消费数据,成功就成功,不成功...也不影响,指导消费完所有数据,再手动提交
// TODO 这里模拟消息消费的操作,用sout替代。
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
// 如果成功,手动通知offset提交
consumer.commitAsync();
}
关于consumer的分组
消费者可以构建组的,但是只要group名称一样,就是一个组的。
单个partition的消息只能由group中的某一个consumer消费。如果partition的个数小于consumer的个数,则会存在consumer闲置。简而言之不能多个consumer消费同一个partition。
如下图所示,下图中标红的消费方式,是不被允许的。
consumer从partition中消费信息是顺序消费的,默认是从头开始消费。
单个consumer的分组,会消费所有partition中的数据。比如:有三个consumer,订阅了一个topic的消息,则会默认去消费这个topic下所有的partition的消息。
consumer以partition为粒度进行提交
之前的两个消费实例都是以consumer中topic为单位进行的消息提交,如果想单独以partition为单位进行消息确认提交,可以参考如下代码
实质也没什么修改,只是增加了一层内嵌的遍历partition的循环操作。最后的提交操作放到了内充循环代码中,同时在提交的时候指定了偏移量offset,需要注意的是kafka服务端的offset保留的是下一次消费消息的起始位置,因此我们提交确认消息的时候,是需要+1的。
/**
* 以partition为单位进行消息确认提交
*/
public static void commitOffsetInPartition(){
Properties props = new Properties();
props.setProperty("bootstrap.servers", LOCAL_KAFKA_ADDRESS);
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
//消费订阅一个topic或者多个topic,这里任然是订阅的topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
//取出这个topic下的数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
// 针对每个partition单独处理
for(TopicPartition topicPartition:records.partitions()){
//records这个方法可以传递Topic,也可以传递partition
List<ConsumerRecord<String, String>> partitionRecord = records.records(topicPartition);
for(ConsumerRecord<String,String> record:partitionRecord){
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
long lastOffset = partitionRecord.get(partitionRecord.size()-1).offset();
//单个partition中的offset,并且提交
Map<TopicPartition,OffsetAndMetadata> offsetAndMetadata = new HashMap<>();
//这里需要注意下,kafka服务器记录的offset是我们下次消费的起点,因此要加一
offsetAndMetadata.put(topicPartition,new OffsetAndMetadata(lastOffset+1));
//提交offset
consumer.commitSync(offsetAndMetadata);
System.out.println("=============partition - "+ topicPartition +" end================");
}
}
}
consumer以partition为粒度订阅信息
以上事例,consumer均是以topic为单位订阅信息,如果想只订阅某个partition,可以做出如下修改
之前的订阅方式
consumer.subscribe(Arrays.asList(TOPIC_NAME));//直接以topic为单位进行订阅
只是订阅指定的partition
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
TopicPartition topicPartitionZero = new TopicPartition(TOPIC_NAME,0);
TopicPartition topicPartitionOne = new TopicPartition(TOPIC_NAME,1);
//消费订阅一个topic中的一个或者多个partition,这里只定义了一个partition——topicPartitionOne
consumer.assign(Arrays.asList(topicPartitionOne));
基本的consumer的使用,上述操作是没问题了。
consumer对offset的手动控制
在某些应用场景中,如果我们确实需要重复消费某些数据,则可以通过手动控制offset的方式来实现,kafka与其他消息中间件不同,消息并不会丢失,而是会一直存在。如果想要重复消费,则只需要手动将offset的位置指向特定的位置即可
/**
* 手动控制offset
*/
public static void controlOffsetPosition(){
Properties props = new Properties();
props.setProperty("bootstrap.servers", LOCAL_KAFKA_ADDRESS);
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
TopicPartition topicPartitionZero = new TopicPartition(TOPIC_NAME,0);
TopicPartition topicPartitionOne = new TopicPartition(TOPIC_NAME,1);
//consumer.subscribe(Arrays.asList(TOPIC_NAME));
//消费订阅一个topic中的一个或者多个partition,这里只定义了一个partition
consumer.assign(Arrays.asList(topicPartitionOne));
while(true){
/**
* 手动指定offset的位置 通常用与有些数据需要重复消费的场景。
* 实际操作中一般是:
* 1、第一次从0开始消费消息
* 2、比如一次消费了100条,将offset置为101,将该offset数据存入Redis
* 3、每次poll之前,从Redis中获取最新的offset的位置
* 4、每次从这个位置开始消费
* 主要有两种场景:1、人为控制offset起始位置,2、如果出现程序错误,需要重复消费,则手动控制offset位置重复消费一次
*/
//手动指定offset的起始位置。//关于这个seek方法还有很多重载的操作,可以直接将offset指向首位,也可以直接将offset指向末位,都是可以的详情参考kafka官网即可。
consumer.seek(topicPartitionOne,100);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
// 针对每个partition单独处理
for(TopicPartition topicPartition:records.partitions()){
List<ConsumerRecord<String, String>> partitionRecord = records.records(topicPartition);
for(ConsumerRecord<String,String> record:partitionRecord){
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
long lastOffset = partitionRecord.get(partitionRecord.size()-1).offset();
//单个partition中的offset,并且提交
Map<TopicPartition,OffsetAndMetadata> offsetAndMetadata = new HashMap<>();
//这里需要注意下,服务器记录的offset是我们下次消费的起点,因此要加一
offsetAndMetadata.put(topicPartition,new OffsetAndMetadata(lastOffset+1));
//提交offset
consumer.commitSync(offsetAndMetadata);
System.out.println("=============partition - "+ topicPartition +" end================");
}
}
}
consumer 多线程并发处理
consumer并不是线程安全的,如果在多线程环境下使用consumer,需要自行解决consumer的线程安全问题。通常针对consumer的多线程并发处理,有两种方式
第一种,直接上图吧(源自某课网的介绍)
之前提起过,同一个partition不能被多个consumer消费,如果consumer的个数大于partition,则多余的consumer会闲置。这里的多线程其实就是多个consumer的意思,尽量的将consumer个数设置的与partition一致即可。
实例代码
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* autor:liman
* createtime:2021/3/13
* comment:consumer 多线程简单实例
* kafka的consumer不是线程安全的,需要自己解决线程安全的问题
* 因此我们每个线程中,都实例化了一个KafaConsumer
*/
@Slf4j
public class ConsumerThreadSample {
public final static String TOPIC_NAME = "client_hello_topic";
public final static String LOCAL_KAFKA_ADDRESS = "127.0.0.1:9092";
/**
* 这种类型是经典模式,是每一个线程单独创建一个KafkaConsumer,用于保证线程安全
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
//即使这个实例中开启了多个consumer的线程,最终我们只订阅了两个partition的数据。最多只会有两个consumer在工作
for(int i=0;i<5;i++) {
KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner();
Thread threadOne = new Thread(kafkaConsumerRunner,"SelfThread-"+i);
threadOne.start();
Thread.sleep(1000);
kafkaConsumerRunner.shutdown();
}
}
/**
* consumer的线程
*/
public static class KafkaConsumerRunner implements Runnable{
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner() {
Properties props = new Properties();
props.put("bootstrap.servers", LOCAL_KAFKA_ADDRESS);
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
consumer.assign(Arrays.asList(p0,p1));
}
@Override
public void run() {
try{
while(!closed.get()){
//处理消息
ConsumerRecords<String,String> consumerRecord = consumer.poll(Duration.ofMillis(10000));
String threadName = Thread.currentThread().getName();
for(TopicPartition topicPartition:consumerRecord.partitions()){
List<ConsumerRecord<String, String>> partitionRecord = consumerRecord.records(topicPartition);
//处理每一个分区的数据
for(ConsumerRecord<String,String> record:partitionRecord){
System.out.printf("threadName = %s,patition = %d , offset = %d, key = %s, value = %s%n",threadName,
record.partition(),record.offset(), record.key(), record.value());
}
// 返回去告诉kafka新的offset
long lastOffset = partitionRecord.get(partitionRecord.size() - 1).offset();
// 注意加1
consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}catch (WakeupException e){
if(!closed.get()){
throw e;
}
}finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
}
第二种,consumer只是去获取数据,将获取到的数据交给其他线程去处理(这一点和netty很像)。
实例代码
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* autor:liman
* createtime:2021/3/13
* comment: 类似于netty的那种多线程方式,对partition的管控力度没有那么强,但是这种方式比较适用于流处理
*
*/
@Slf4j
public class ConsumerRecordThreadSample {
public final static String TOPIC_NAME = "client_hello_topic";
public final static String LOCAL_KAFKA_ADDRESS = "127.0.0.1:9092";
public static void main(String[] args) throws InterruptedException {
String brokerList = LOCAL_KAFKA_ADDRESS;
String groupId = "test";
int workNum = 5;
ConsumerExecutor consumerExecutor = new ConsumerExecutor(brokerList,groupId,TOPIC_NAME);
consumerExecutor.execute(workNum);
Thread.sleep(1000000);
consumerExecutor.shutdown();
}
public static class ConsumerExecutor{
private final KafkaConsumer<String,String> consumer;
private ExecutorService executors;
//consumer的一些初始化操作
public ConsumerExecutor(String brokerList,String groupId,String topic){
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
}
public void execute(int workerNum){
//这里初始化线程池
executors = new ThreadPoolExecutor(workerNum,workerNum,0L, TimeUnit.MICROSECONDS
,new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
while(true){
//这里只是利用consumer去拉取数据,将数据交给线程池中的线程去处理
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(20000));
for (final ConsumerRecord record : records) {
executors.submit(new ConsumerRecordWorker(record));
}
}
}
public void shutdown() {
if (consumer != null) {
consumer.close();
}
if (executors != null) {
executors.shutdown();
}
try {
if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("Timeout.... Ignore for this case");
}
} catch (InterruptedException ignored) {
System.out.println("Other thread interrupted this shutdown, ignore for this case.");
Thread.currentThread().interrupt();
}
}
}
/**
真正处理数据的线程
*/
public static class ConsumerRecordWorker implements Runnable{
private ConsumerRecord<String, String> record;
public ConsumerRecordWorker(ConsumerRecord record) {
this.record = record;
}
@Override
public void run() {
// 假如说数据入库操作
String threadName="Thread - "+ Thread.currentThread().getName();
log.error("thread-name:{}",threadName);
System.out.println();
System.err.printf("ThreadName = %s ,patition = %d , offset = %d, key = %s, value = %s%n",threadName,
record.partition(), record.offset(), record.key(), record.value());
}
}
}
关于consumer限流的方式
关于consumer,其实还有一个限流的场景需要说明一下,前面已经说过,consumer可以针对特定的partition进行消费。可以引入令牌的方式进行限流,consumer如果想要正常消费数据,必须先从令牌桶中获取令牌,如果成功获取令牌,则开始消费数据,没有成功则限制其消费,达到限流的目的。
简单实例
TopicPartition topicPartitionZero = new TopicPartition(TOPIC_NAME,0);
TopicPartition topicPartitionOne = new TopicPartition(TOPIC_NAME,1);
//消费订阅一个topic中的一个或者多个partition,这里只定义了一个partition
consumer.assign(Arrays.asList(topicPartitionZero,topicPartitionOne));
long totalNum =40l;//阈值,超过这个进行限流
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
// 针对每个partition单独处理
for(TopicPartition topicPartition:records.partitions()){
long num = 0l;
List<ConsumerRecord<String, String>> partitionRecord = records.records(topicPartition);
for(ConsumerRecord<String,String> record:partitionRecord){
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
/**
* 实际开发中的操作
* 1、接收到record信息以后,去令牌桶中拿取令牌
* 2、如果获取到令牌,则继续业务处理
* 3、如果获取不到令牌, 则pause等待令牌
* 4、当令牌桶中的令牌足够, 则将consumer置为resume状态
*/
num++;
if(record.partition() == 0){
//如果partition0中的消费个数超过阈值,则停止partition0,只保留partition1的数据消费
if(num >= totalNum){
log.info("暂停partition 0");
consumer.pause(Arrays.asList(topicPartitionZero));
}
}
if(record.partition() == 1){
//如果partition1中的消费个数超过阈值,则唤醒之前停止的partition0
if(num >= totalNum){
log.info("唤醒partition 0");
consumer.resume(Arrays.asList(topicPartitionZero));
}
}
}
long lastOffset = partitionRecord.get(partitionRecord.size()-1).offset();
//单个partition中的offset,并且提交
Map<TopicPartition,OffsetAndMetadata> offsetAndMetadata = new HashMap<>();
//这里需要注意下,服务器记录的offset是我们下次消费的起点,因此要加一
offsetAndMetadata.put(topicPartition,new OffsetAndMetadata(lastOffset+1));
//提交offset
consumer.commitSync(offsetAndMetadata);
num--;
System.out.println("=============partition - "+ topicPartition +" end================");
}
}
总结
简单的总结了一下consumer的操作。重点需要关注consumer的多线程模式以及限流的操作,这个应该是面试中的高频问点。