1. 环境准备
创建 maven 工程,添加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
2. 生产者 Java API
package com.mock.data.stream.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class ProducerSender {
KafkaProducer<String, String> producer;
public ProducerSender(String topic) {
producer = new KafkaProducer<String, String>(producerConfig());
}
public Map<String, Object> producerConfig(){
Map<String, Object> props = new HashMap<>();
// kafka 服务器 地址
props.put("bootstrap.servers", "hadoop102:9092,hadoop102:9094,hadoop103:9092");
// 生产者ID
props.put("client.id","producer_id");
// ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
props.put("acks","-1");
// 消息重试次数
props.put("retries", "3");
// 单个请求的最大值
// 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。
props.put("max.request.size",1048576);// 1M
// 每一批次的消息大小
props.put("batch.size", 16384);// 16kb
// 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
props.put("linger.ms", 1);
// 缓冲池大小,默认32M
props.put("buffer.memory", 33554432L);
// key的序列化方式
props.put("key.serializer", StringSerializer.class);
// value的序列化方式
props.put("value.serializer", StringSerializer.class);
return props;
}
public void send(String topic, String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key,value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata == null) {
e.printStackTrace();
}else {
long offset = recordMetadata.offset();
System.out.println("sender success:"+offset);
}
}
});
}
}
3. 自定义分区
实现 Partitioner
接口:
package com.mock.data.stream.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// todo 控制分区
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
在生成者中使用自定义分区:
props.put("partitioner.class", "com.mock.data.stream.kafka.CustomPartitioner");
4. Kafka 消费者 Java API
package com.mock.data.stream.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定义kakfa 服务的地址,不需要将所有broker指定上
props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
// 制定consumer group
props.put("group.id", "consumer_id");
// 是否自动确认offset
props.put("enable.auto.commit", "false");
// 自动确认offset的时间间隔
// props.put("auto.commit.interval.ms", "1000");
// 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
// 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
// 默认值 300s
props.put("max.poll.interval.ms",300000L);
// offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
props.put("auto.offset.reset","earliest");
// 拉取的消息的最大条数,默认 500
props.put("max.poll.records",500);
// key的序列化类
props.put("key.deserializer", StringDeserializer.class);
// value的序列化类
props.put("value.deserializer", StringDeserializer.class);
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的topic, 可同时订阅多个
consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
while (true) {
Duration d = Duration.ofSeconds(10);
ConsumerRecords<String, String> records = consumer.poll(d);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交 offset
consumer.commitSync();
// 异步提交 offset
// consumer.commitAsync();
}
}
}
指定分区和 offset 消费:
// 定义消费的分区
ArrayList<TopicPartition> partitions = new ArrayList<>();
TopicPartition partition = new TopicPartition("topic1", 0);
partitions.add(partition);
consumer.assign(partitions);
// 指定消费的分区和 offset
consumer.seek(partition,102);