环境准备
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the Kafka study examples; inherits version/group from parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>heima43</artifactId>
<groupId>cn.hanjiaxiaozhi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>KafkaStudy</artifactId>
<properties>
<!-- Compile for Java 8 with UTF-8 sources. -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- Producer/consumer client API (KafkaProducer, KafkaConsumer). -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Kafka Streams DSL; version must match kafka-clients. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Explicit compiler plugin so the 1.8 source/target is enforced regardless of Maven defaults. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
生产者
版本说明
- Kafka 0.10.0.0 及以后的版本,对生产者代码的底层实现进行了重构。
- kafka.producer.Producer类被org.apache.kafka.clients.producer.KafkaProducer类替换。
- Kafka 系统支持两种不同的发送方式–
同步模式(Sync)和异步模式(ASync)
同步和异步
- 同步模式如下:一般不使用
- 异步模式如下:大数据场景下,
效率更高
代码实现
package cn.hanjiaxiaozhi.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyKafkaProducer {
    /**
     * Demo producer: asynchronously sends 10 keyed String messages to
     * "test_topic" on node01:9092 and prints the partition/offset reported
     * by the broker in the send callback.
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        // Wait for the leader and all in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        // Retry a failed send up to 2 times.
        props.setProperty("retries", "2");
        // FIX: the correct config key is "retry.backoff.ms" — the original
        // "retries.backoff.ms" is not a Kafka config and was silently ignored.
        props.setProperty("retry.backoff.ms", "20");
        // Total memory for buffering unsent records (bytes).
        props.setProperty("buffer.memory", "10240000");
        // Batch records per partition up to this many bytes ...
        props.setProperty("batch.size", "10240");
        // ... or until this many ms have passed, whichever comes first.
        props.setProperty("linger.ms", "25");
        // Maximum size of a single request (bytes).
        props.setProperty("max.request.size", "102400");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test_topic", "key_" + i, "value_" + i);
            // Async send: the callback fires once the broker responds.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null) {
                        System.out.println("异步发送后获得分区和offset :" + metadata.partition() + "---" + metadata.offset());
                    } else if (exception != null) {
                        // FIX: the original silently swallowed send failures.
                        exception.printStackTrace();
                    }
                }
            });
        }
        // Flushes any buffered records before releasing resources.
        producer.close();
    }
}
分区策略
- 默认的分区策略
- 注意:
- 我们的生产者往Kafka发送/推送消息的时候,应该要将消息均匀地发送到Kafka的各个分区中去,为了提高并发读写能力
- 那么消息到底被发送到哪一个分区去了呢?由谁决定呢?
- 有一个Partitioner类型的对象决定
- 默认有个DefaultPartitioner
- 连敲2次shift,输入
- 默认的分区策略:
- 1.如果在record记录对象中
指定
了分区,那么就会使用该分区
- 2.如果没有指定分区,但存在key,则根据key的 哈希值 % 分区数 得到分区编号;如果key一样都到一个分区.所以如果使用
key作为分区依据,
那么key应该要不一样
- 3.如果没有分区或key,则以
循环/轮询方式选择分区
// Demo of the default partitioner's round-robin behavior: these records have
// NO key (two-arg ProducerRecord constructor), so partitions are chosen in a
// round-robin fashion. NOTE(review): relies on a `producer` instance created
// earlier in the surrounding example.
for (int i = 10; i < 20; i++){
ProducerRecord<String, String> record = new ProducerRecord<>("test_topic","value_" + i);
// .get() blocks until the broker acknowledges — this is the synchronous send style.
RecordMetadata metadata = producer.send(record).get();
System.out.println("同步发送后获得分区编号和offset :"+metadata.partition() + "---" + metadata.offset());
}
自定义分区器
- 自己实现分区的逻辑 implements Partitioner接口
package cn.hanjiaxiaozhi.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
    /**
     * Custom partitioner that routes records by the first 3 characters of
     * their String key (e.g. a phone-number prefix), so keys sharing a
     * prefix always land on the same partition.
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionCount = cluster.partitionCountForTopic(topic);
        // FIX: the original threw NullPointerException for a null key and
        // StringIndexOutOfBoundsException for keys shorter than 3 chars.
        // Fall back to partition 0 in those cases.
        if (!(key instanceof String) || ((String) key).length() < 3) {
            return 0;
        }
        String prefix = ((String) key).substring(0, 3);
        // FIX: Math.abs(Integer.MIN_VALUE) is negative; floorMod is always
        // a valid partition index in [0, partitionCount).
        int partitionNum = Math.floorMod(prefix.hashCode(), partitionCount);
        System.out.println("获得的分区号是:" + partitionNum);
        return partitionNum;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}
package cn.hanjiaxiaozhi.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyKafkaProducer {
    /**
     * Demo producer using the custom MyPartitioner: sends 10 messages whose
     * keys look like phone numbers ("13X88888888") so the partitioner can
     * route by the 3-character prefix, then prints partition/offset from the
     * synchronous send result.
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        // Wait for the leader and all in-sync replicas to acknowledge.
        props.setProperty("acks", "all");
        props.setProperty("retries", "2");
        // FIX: the correct config key is "retry.backoff.ms" — the original
        // "retries.backoff.ms" is not a Kafka config and was silently ignored.
        props.setProperty("retry.backoff.ms", "20");
        props.setProperty("buffer.memory", "10240000");
        props.setProperty("batch.size", "10240");
        props.setProperty("linger.ms", "25");
        props.setProperty("max.request.size", "102400");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Plug in the custom partitioner instead of DefaultPartitioner.
        props.put("partitioner.class", "cn.hanjiaxiaozhi.producer.MyPartitioner");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test_topic", "13" + i + "88888888", "value_" + i);
            // Synchronous send: .get() blocks until the broker acknowledges.
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("同步发送后获得分区编号和offset :" + metadata.partition() + "---" + metadata.offset());
        }
        producer.close();
    }
}
消费者
版本说明
消费者从哪个offset偏移量开始消费
- kafka-0.10.1.X版本之前: (offset保存在zk中);
- auto.offset.reset 的值为smallest(最小)和largest(最大)
- kafka-0.10.1.X版本之后: (offset保存在kafka的名为__consumer_offsets的topic里面);
auto.offset.reset 的值更改为:earliest,latest,和none
- 1.earliest :当各分区下有已提交的 Offset 时,从提交的 Offset开始消费;无提交的Offset 时,从头开始消费;
- 2.latest : 当各分区下有已提交的 Offset 时,从提交的 Offset 开始消费;无提交的 Offset时,消费新产生的该分区下的数据
- 3.none : Topic 各分区都存在已提交的 Offset 时,从 Offset 后开始消费;只要有一个分区不存在已提交的 Offset,则抛出异常。
如何提交偏移量
- 自动提交–大多数情况下用自动提交即可
- enable.auto.commit设置为true ,offset偏移量会被自动提交到默认主题中(__consumer_offsets) ,
- 注意:
- 老版本提交到ZK,新版本提交到默认主题,开发中都是使用新版本API
- 手动提交
- enable.auto.commit设置为false
- 然后调用API提交
- consumer.commitAsync();//异步提交–用这个
- consumer.commitSync();//同步提交
代码实现-自动提交
package cn.hanjiaxiaozhi.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MyKafkaConsumerAutoCommit {
    /**
     * Demo consumer with automatic offset commits: subscribes to
     * "test_topic" and prints partition/offset/key/value for each record.
     * Offsets are committed to __consumer_offsets every second.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Auto-commit offsets every auto.commit.interval.ms.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // FIX: the original value "latest " (trailing space) is rejected by
        // Kafka's config validation (valid values: earliest, latest, none).
        props.put("auto.offset.reset", "latest");
        props.put("group.id", "myconsumer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic"));
        // Poll loop: runs until the process is killed.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("分区:"+record.partition()+" 偏移量:"+record.offset()+" key:"+record.key()+" value:"+record.value());
            }
        }
    }
}
代码实现-手动提交
package cn.hanjiaxiaozhi.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
public class MyKafkaConsumerAutoCommitFalse {
    /**
     * Demo consumer with MANUAL offset commits: auto-commit is disabled and
     * offsets are committed asynchronously once more than 5 records have
     * been buffered, simulating "commit after a batch is processed".
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Manual commit mode: the application decides when offsets are saved.
        props.put("enable.auto.commit", "false");
        // FIX: the original value "latest " (trailing space) is rejected by
        // Kafka's config validation (valid values: earliest, latest, none).
        props.put("auto.offset.reset", "latest");
        props.put("group.id", "myconsumer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test_topic"));
        // Buffer of records "processed" since the last commit.
        ArrayList<ConsumerRecord<String, String>> list = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("分区:"+record.partition()+" 偏移量:"+record.offset()+" key:"+record.key()+" value:"+record.value());
                list.add(record);
            }
            // Commit after each batch of more than 5 records, then reset the buffer.
            if (list.size() > 5) {
                System.out.println("list.size>5偏移量已经提交");
                consumer.commitAsync();
                list.clear();
            }
        }
    }
}