pom
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
6、在kafka集群中创建18BD-10主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为2
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为指定分区2,把数据发送到指定的分区中
消费者设置:
消费者组id为test
设置自动提交偏移量
设置自动提交偏移量的时间间隔
设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
消费指定分区2中的数据
模拟生产者,请写出代码向18BD-10主题中生产数据test0-test99
模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
Priducer 代码
package Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by 一个蔡狗 on 2020/3/20.
*
* 在kafka集群中创建 18BD-10 主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为2
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为指定分区2,把数据发送到指定的分区中
模拟生产者,请写出代码向18BD-10主题中生产数据 test0 - test99
*/
public class Priducer_06 {
public static void main(String[] args) {
//编写 生产数据程序
//1、配置kafka集群环境(设置)
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "node001:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 2);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
// kafka key 和value的序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//2. 实例一个 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
//3. 模拟生产者,请写出代码向18BD-10主题中生产数据 test0 - test99
ProducerRecord producerRecord = new ProducerRecord("18BD-10",2,"","test"+i);
kafkaProducer.send(producerRecord);
}
}
}
Consumer 代码
package Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
消费者设置:
消费者组id为test
设置自动提交偏移量
设置自动提交偏移量的时间间隔
设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
消费指定分区2中的数据
模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
*/
public class Consumer_06 {
public static void main(String[] args) {
//1、添加哦配置文件
Properties props = new Properties();
//指定kafka服务器
props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
//消费组
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值
props.put("enable.auto.commit", "true");
//自动提交的周期
props.put("auto.commit.interval.ms", "1000");
// 设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
props.put("auto.offset.reset","none");
//kafka key 和value的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2、实例消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//3、设置读取的topic
//Collection<TopicPartition>
// 模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
TopicPartition topicPartition2 = new TopicPartition("18BD-10",2);
kafkaConsumer.assign(Arrays.asList(topicPartition2));
//循环遍历
while (true){
//4、拉取数据,并输出
//获取到所有的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
Set<TopicPartition> partitions = consumerRecords.partitions();
for (TopicPartition partition : partitions) {
//获取分区的数据,多条数据
List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
for (ConsumerRecord<String, String> record : records) {
System.out.println("数据是 "+record.value()+" "+"分区是 "+record.partition());
}
}
}
}
}
7、在kafka集群中创建18BD-20主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
手动提交每条数据
消费者设置:
消费者组id为test
设置手动提交偏移量
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
模拟生产者,请写出代码向18BD-20主题中生产数据test0-test99
模拟消费者,请写出代码把18BD-20主题中的2号分区的数据消费掉 ,打印输出到控制台
Priducer 代码
package Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
*
* 在kafka集群中创建18BD-20主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
手动提交每条数据
模拟生产者,请写出代码向18BD-20主题中生产数据test0-test99
*/
public class Priducer_07 {
public static void main(String[] args) {
//编写 生产数据程序
//1、配置kafka集群环境(设置)
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "node001:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 1);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
// kafka key 和value的序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//2. 实例一个 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
//3. 模拟生产者,请写出代码向18BD-20主题中生产数据test0-test99, 数据分发策略为轮询方式发送到每个分区中
ProducerRecord producerRecord = new ProducerRecord("18BD-20","test"+i);
kafkaProducer.send(producerRecord);
}
}
}
Consumer 代码
package Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
*
* 消费者设置:
消费者组id为test
设置手动提交偏移量
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
模拟消费者,请写出代码把18BD-20主题中的 2号分区 的 数据 消费 掉 ,打印输出到控制台
*/
public class Consumer_07 {
//程序入口
public static void main(String[] args){
//1配置文件
Properties props = new Properties();
//指定kafka服务器
props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
//消费组
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值 为 true 手动为 false
props.put("enable.auto.commit", "false");
//自动提交的周期
//props.put("auto.commit.interval.ms", "1000");
//kafka key 和value的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//3、设置读取的topic
//Collection<TopicPartition>
// 模拟消费者,请写出代码把18BD-20主题中的 2号分区 的 数据 消费 掉 ,打印输出到控制台
TopicPartition topicPartition2 = new TopicPartition("18BD-20",2);
//循环遍历
while (true){
//4、拉取数据,并输出
//获取到所有的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
Set<TopicPartition> partitions = consumerRecords.partitions();
for (TopicPartition partition : partitions) {
//获取分区的数据,多条数据
List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
for (ConsumerRecord<String, String> record : records) {
System.out.println("数据是 "+record.value()+" "+"分区是 "+record.partition());
}
}
}
}
}
8、在kafka集群中创建18BD-30主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
消费者设置:
消费者组id为test
设置手动提交偏移量
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
依次消费完每个分区之后手动提交offset
模拟生产者,请写出代码向18BD-30主题中生产数据test0-test99
模拟消费者,请写出代码把18BD-30主题中的2号分区的数据消费掉 ,打印输出到控制台
Priducer 代码
package Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
* 在kafka集群中创建18BD-30主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
模拟生产者,请写出代码向 18BD-30 主题中生产数据 test0 - test99
*/
public class Priducer_08 {
public static void main(String[] args) {
//编写 生产数据程序
//1、配置kafka集群环境(设置)
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "node001:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 1);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
// kafka key 和value的序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//2. 实例一个 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
//3. 模拟生产者,请写出代码向18BD-30主题中生产数据test0-test99, 数据分发策略为轮询方式发送到每个分区中
ProducerRecord producerRecord = new ProducerRecord("18BD-30","test"+i);
kafkaProducer.send(producerRecord);
}
}
}
Consumer 代码
package Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
* 消费者设置:
消费者组id为test
设置手动提交偏移量
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
依次消费完每个分区之后手动提交offset
模拟消费者,请写出代码把18BD-30主题中的2号分区的数据消费掉 ,打印输出到控制台
*/
public class Consumer_08 {
//程序入口
public static void main(String[] args){
//1配置文件
Properties props = new Properties();
//指定kafka服务器
props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
//消费组
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值 为 true 手动为 false
props.put("enable.auto.commit", "false");
//自动提交的周期
//props.put("auto.commit.interval.ms", "1000");
//kafka key 和value的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//3、设置读取的topic
//Collection<TopicPartition>
// 模拟消费者,请写出代码把18BD-30主题中的2号分区的数据消费掉 ,打印输出到控制台
TopicPartition topicPartition2 = new TopicPartition("18BD-30",2);
//循环遍历
while (true){
//4、拉取数据,并输出
//获取到所有的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
Set<TopicPartition> partitions = consumerRecords.partitions();
for (TopicPartition partition : partitions) {
//获取分区的数据,多条数据
List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
for (ConsumerRecord<String, String> record : records) {
System.out.println("数据是 "+record.value()+" "+"分区是"+record.partition());
}
}
}
}
}
9、在kafka集群中创建18BD-40主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
消费者设置:
消费者组id为test
设置自动提交偏移量
设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
消费指定分区0和分区2中的数据
模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99
模拟消费者,请写出代码把18BD-40主题中的0和2号分区的数据消费掉 ,打印输出到控制台
Priducer 代码
package Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by 一个蔡狗 on 2020/3/22.
* 在kafka集群中创建18BD-40主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99
*/
public class Priducer_09 {
public static void main(String[] args) {
//编写 生产数据程序
//1、配置kafka集群环境(设置)
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "node001:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 1);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
// kafka key 和value的序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//2. 实例一个 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
//3. 模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99, 数据分发策略为轮询方式发送到每个分区中
ProducerRecord producerRecord = new ProducerRecord("18BD-40","test"+i);
kafkaProducer.send(producerRecord);
}
}
}
Consumer 代码
package Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
*
消费者设置:
消费者组id为test
设置自动提交偏移量
设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
消费指定分区0和分区2中的数据
模拟消费者,请写出代码把18BD-40主题中的0和2号分区的数据消费掉 ,打印输出到控制台
*/
public class Consumer_09 {
public static void main(String[] args) {
//1、添加哦配置文件
Properties props = new Properties();
//指定kafka服务器
props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
//消费组
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值
props.put("enable.auto.commit", "true");
//自动提交的周期
props.put("auto.commit.interval.ms", "1000");
// 设置 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
props.put("auto.offset.reset","earliest");
//kafka key 和value的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2、实例消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//3、设置读取的topic
//Collection<TopicPartition>
// 模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
TopicPartition topicPartition0 = new TopicPartition("18BD-40",0);
TopicPartition topicPartition2 = new TopicPartition("18BD-40",2);
kafkaConsumer.assign(Arrays.asList(topicPartition2));
//循环遍历
while (true){
//4、拉取数据,并输出
//获取到所有的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
Set<TopicPartition> partitions = consumerRecords.partitions();
for (TopicPartition partition : partitions) {
//获取分区的数据,多条数据
List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
for (ConsumerRecord<String, String> record : records) {
System.out.println("数据是"+record.value()+" "+"分区是"+record.partition());
}
}
}
}
}
10、在kafka集群中创建18BD-50主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
消费者设置:
消费者组id为test
设置自动提交偏移量
设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
消费指定分区0和分区2中的数据,并且设置消费0分区的数据offerset值从0开始,消费2分区的数据offerset值从10开始
模拟生产者,请写出代码向18BD-50主题中生产数据test0-test99
模拟消费者,请写出代码把18BD-50主题中的0和2号分区的数据消费掉 ,打印输出到控制台
Priducer 代码
package Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
* 在kafka集群中创建18BD-50主题 副本为2个,分区为3个
生产者设置:
消息确认机制 为all
重试次数 为1
批量处理消息字节数 为16384
设置缓冲区大小 为 33554432
设置每条数据生产延迟1ms
设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
数据分发策略为轮询方式发送到每个分区中
模拟生产者,请写出代码向18BD-50主题中生产数据test0-test99
*/
public class Priducer_10 {
public static void main(String[] args) {
//编写 生产数据程序
//1、配置kafka集群环境(设置)
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "node001:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 1);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
// kafka key 和value的序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//2. 实例一个 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
//3. 模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99, 数据分发策略为轮询方式发送到每个分区中
ProducerRecord producerRecord = new ProducerRecord("18BD-50","test"+i);
kafkaProducer.send(producerRecord);
}
}
}
Consumer 代码
package Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
/**
* Created by 一个蔡狗 on 2020/3/22.
*
* 消费者设置:
消费者组id为test
设置自动提交偏移量
设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
消费指定分区0和分区2中的数据,并且设置消费0分区的数据offerset值从0开始,消费2分区的数据offerset值从10开始
模拟消费者,请写出代码把18BD-50主题中的0和2号分区的数据消费掉 ,打印输出到控制台
*/
public class Consumer_10 {
public static void main(String[] args) {
//1配置文件
Properties props = new Properties();
//指定kafka服务器
props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
//消费组
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值
props.put("auto.offset.reset", "true");
//只有当offset不存在的时候,才用latest或者earliest 设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
props.put("auto.offset.reset", "earliest");
//自动提交的周期
props.put("auto.commit.interval.ms", "1000");
//kafka key 和value的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//3、设置topic
//Collection<TopicPartition> topicPartition0 只消费分区号为 0 的 分区
TopicPartition topicPartition0 = new TopicPartition("18BD-50",0);
// topicPartition2 只消费分区号为 2 的 分区
TopicPartition topicPartition2 = new TopicPartition("18BD-50",2);
kafkaConsumer.assign(Arrays.asList(topicPartition0,topicPartition2));
// 消费指定分区0和分区2中的数据,并且设置消费0分区的数据offerset值从0开始, topicPartition0
kafkaConsumer.seek(topicPartition0,0);
//从头开始消费
// kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition0));
// 消费2分区的数据offerset值从10开始
kafkaConsumer.seek(topicPartition2,10);
while (true){
//4、拉取数据,并输出
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
//通过数据获取到多有的分区 0 2
Set<TopicPartition> partitions = consumerRecords.partitions();
//遍历所有分区,或得到一个分区
for (TopicPartition partition : partitions) {
//获取每个分区的数据,多条数据
List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
//遍历分区内的所有数据,或得到一条
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value()+" "+record.partition());
}
}
}
}
}