1、最简单消费者(自动提交offset)
package tyh.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
//properties,存放消费者的配置信息
Properties properties = new Properties();
//集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
//开启自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交延时
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
//key,Value的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//指定消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "yyx");
//创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//订阅主题
//订阅多个主题
// consumer.subscribe(Arrays.asList("first", "second"));
//订阅一个主题
consumer.subscribe(Collections.singletonList("first"));
//设置一个死循环,直到手动关闭才停止消费者
while (true) {
//获取数据,传入一个时间,若没拉取到数据则阻塞传入的时间(单位为ms)
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
//遍历数据并打印到工作台
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + "-" + consumerRecord.topic() + "-" +
consumerRecord.partition() + "-" + consumerRecord.value());
}
}
}
}
思路:
①new一个KafkaConsumer消费者,发现和生产者一样需要传入properties配置信息
②new一个properties存储配置信息,key同样也用ConsumerConfig获取,value至少需要的信息为:集群信息,key、value的反序列化和消费者组和offset是否自动提交
(注:offset为消费者消费到了第几条数据)
③消费者订阅主题(注:可以订阅不存在的主题,不会报错,但会有提示
④通过consumer.poll获取数据集合,传入时间,若没拉取到数据则阻塞传入的时间
⑤遍历数据集合,并打印自己想要的数据
⑥不能通过程序关闭消费者,要手动ctrl+c或kill-9,所以要将消费者执行一个死循环进行常轮询,若拉取到数据则打印,若没拉取到数据阻塞指定的时间
2、消费者进阶一:同步自动提交offset
注:要将自动提交设置为false
package tyh.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
//手动提交offset之同步提交offset
public class CommitSyncConsumer {
public static void main(String[] args) {
//properties对象存储配置信息
Properties properties = new Properties();
//集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
//关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tyh");
//key、value的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//new Consumer对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//指定消费的主题
consumer.subscribe(Collections.singleton("first"));
while (true) {
//指定未消费到消息时阻塞的时间
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//增强for循环消费消息
for (ConsumerRecord<String, String> Record : consumerRecords) {
System.out.println(Record.topic() + "-" + Record.partition() + "-" + Record.offset() + "-" + Record.value());
}
//同步手动提交offset
//会阻塞当前线程,直至提交成功
consumer.commitSync();
}
}
}
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
因为同步提交是会阻塞当前线程的,所以同步提交可能会造成重复消费的问题,即当消费者消费完一批数据后执行offset提交,而此时当前score挂掉了,(因为同步消费效率低,而导致offset没提交成功)就可能导致下次启动consumer时会出现重复消费问题。
3、消费者进阶二:异步提交offset
package tyh.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class CommitAsyncConsumer {
public static void main(String[] args) {
//创建properties对象存储配置信息
Properties properties = new Properties();
//集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
//关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//key、value的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "yyx");
//创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//订阅消息
consumer.subscribe(Arrays.asList("first", "second"));
//消费消息
while (true) {
//设置未消费到消息时阻塞的时间
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//逐条消费消息
for (ConsumerRecord<String, String> Record: consumerRecords) {
System.out.println(Record.topic() + "-" + Record.partition() + "-" + Record.offset() + "-" + Record.value());
}
//异步手动提交offset
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null){
//如果exception不为空则说明未提交成功,打印未提交成功错误
System.err.println("Commit failed for" + offsets);
}
}
});
}
}
}
因为异步提交offset是先提交后消费,所以有可能出现提交了offset,但还没消费完,score就挂掉了,就导致数据漏消费。