集群信息
kafka集群有三台broker,id分别为:0,1,2
查看topic的replication-factor信息:
[root@ali-37 bin]# ./kafka-topics.sh --zookeeper localhost:2182 --describe --topic connect-configs
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: connect-configs Partition: 0 Leader: 2 Replicas: 2 Isr: 2
[root@ali-37 bin]#
可以看到ReplicationFactor
为1,并且Leader、Replicas、Isr都在broker id=2的机器上。
编辑修改信息json文件
[root@ali-37 kafka_2.11-2.0.0]# cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"connect-configs","partition":0,"replicas":[0,1,2]}]
}
[root@ali-37 kafka_2.11-2.0.0]#
由于connect-configs这个topic只有一个partition,文件中指定了其partition 0的replicas在broker id = 0,1,2这三台broker上。如果有多个partition,可以在partitions后面放一个数组,针对每一个partition进行修改副本数。
执行修改命令
[root@ali-37 kafka_2.11-2.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2182 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"connect-configs","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
[root@ali-37 kafka_2.11-2.0.0]#
看到控制台输出成功开始partitions的再分配,但是分配结果如何,需要用下面的命令验证一下。
验证修改是否成功
[root@ali-37 kafka_2.11-2.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2182 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition connect-configs-0 completed successfully
[root@ali-37 kafka_2.11-2.0.0]#
注意这儿验证的json文件和上一步的是同一个文件,只是换成了--verify
选项。可以看到再分配的状态是成功的。
再查看topic信息
[root@ali-37 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2182 --topic connect-configs --describe
Topic:connect-configs PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-configs Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1
[root@ali-37 kafka_2.11-2.0.0]#
可以看到ReplicationFactor
已经变成了3,Leader在broker id = 2的机器上,副本和同步副本都在broker id = 0,1,2这三台机器上。整个修改过程完成。
参考Kafka官方网站:http://kafka.apache.org/documentation/#basic_ops_increase_replication_factor