1、partition如何分布到不同的broker上
下面给出kafka在实现分区分布到各个broker上的算法实现,可以通过创建topic,设置副本数验证
public void kafkaProducter(){
//partitions创建的分区,比如我创建了一个topic,
// 设置的副本是1时,partitions = partition * 1;
// 设置的副本为2时,partitions = partition * 2;
List<String> partitions = new LinkedList<>();
partitions.add("p0");
partitions.add("p1");
partitions.add("p2");
partitions.add("p3");
partitions.add("p0");//副本
partitions.add("p1");//副本
partitions.add("p2");//副本
partitions.add("p3");//副本
//borkers是kafka集群
List<String> brokers = new LinkedList<>();
brokers.add("b1");
brokers.add("b2");
brokers.add("b3");
for(int i = 0;i<partitions.size();i++){
System.out.println("分区"+partitions.get(i)+"在:"+brokers.get(i%brokers.size())+"的broker上");
}
}
测试结果:
2、consumerGroup组员和partition之间如何做负载均衡
通过设置消费者的数量,验证下面的demo
public void kafkaConsumer(){
List<String> partitions = new LinkedList<>();
partitions.add("p0");
partitions.add("p1");
partitions.add("p2");
partitions.add("p3");
List<String> consumers = new LinkedList<>();
consumers.add("c1");
consumers.add("c2");
consumers.add("c3");
consumers.add("c4");
consumers.add("c5");
consumers.add("c6");
//向上取整,计算每个消费者对应几个分区
int m = (int) Math.ceil(partitions.size()*1.0/consumers.size());
System.out.println("m:"+m);
for (int i = 0;i<consumers.size();i++){
System.out.println("消费者"+consumers.get(i)+",对应的分区:");
for(int j=0;j<m;j++){
//如果下标大于等于partitions的元素个数,break
if(i*m+j >= partitions.size()){
break;
}
System.out.println(partitions.get(i*m+j));
}
}
}
测试结果:
3、kafka常用命令
a、创建topic
bin/kafka-topics.sh --zookeeper ip:2181 --create --topic topic名字 --partitions 分区数量 --replication-factor 副本数量
b、删除topic,执行命令之后不是马上删除
bin/kafka-topics.sh --zookeeper ip:2181 --delete --topic topic名字
c、查看各个分区数据量
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --time -1 --topic topic名字
d、查看topic列表
bin/kafka-topics.sh --list --zookeeper ip:3181
e、查看topic表述
bin/kafka-topics.sh --zookeeper ip:3181 --topic topic名字 --describe
f、向对应topic发送消息
bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic名字
g、消费消息
bin/kafka-console-consumer.sh --zookeeper ip:2181 --topic topic名字 --from-beginning
欢迎大家来吐槽,内容有问题,会及时修改,谢谢!!!