一、创建topic
> bin/kafka-topics.sh --bootstrap-server <broker_host:port> \
--create --topic my_topic_name \
--partitions 2 \
--replication-factor 3
- bootstrap-server配置项,用来配置kafka的borker实例服务地址,如:
zimug1:9092,zimug2:9092,zimug3:9092
。官方推荐如果kafka版本大于等于2.2使用--bootstrap-server
替代--zookeeper
(2.2以上也兼容--zookeeper
)。如果是2.2之前的版本,可以使用下面的命令行
> bin/kafka-topics.sh --zookeeper zookeeper_host:port \
--create --topic my_topic_name \
--partitions 2 \
--replication-factor 3
- create 表示创建topic
- topic用来指定tipic的名称
- partitions用来指定topic的分区数,topic的分区数量决定了消费者的并发消费能力,理论上分区数越多并发消费能力越高。但是实际上会受到硬件资源、网络环境的限制。分区数量可以先设定一个预估的中位数,分区数量后续可以增加,但不能减少。
- replication-factor用来指定每个分区的副本数,不能设置大于kafka broker服务实例的数量。副本之间需要数据同步,同步数据会占用网络资源,所以不宜过大,2或者3即可。
二、查看所有的主题
通过该命令会把所有的主题全部都列出来
> bin/kafka-topics.sh --list --bootstrap-server zimug1:9092,zimug2:9092,zimug3:9092
三、查看某个主题的所有分区
下面的命令查看kafka
> bin/kafka-topics.sh --describe \
--bootstrap-server zimug1:9092,zimug2:9092,zimug3:9092 \
--topic my_topic_name
通过该命令查看主题topicName的分区
Topic:my_topic_name PartitionCount:3 ReplicationFactor:2 Configs:
Topic: my_topic_name Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: my_topic_name Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: my_topic_name Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
- PartitionCount:partition 分区个数。
- ReplicationFactor:每个分区的副本个数。
- Partition:partition 编号,从 0 开始递增。
- Leader:当前 partition主分区(负责直接读写)的breaker.id。
- Replicas: 当前分区的副本数据所在的 breaker.id。多副本数据备份。
- Isr:处于数据同步状态的副本所在的 breaker.id。数量应该等于ReplicationFactor,如果不相等表示副本之间的数据同步存在较大的延迟,或者某些kafka broker服务实例出现问题。
四、删除 Topic
bin/kafka-topics.sh --delete \
--bootstrap-server zimug1:9092,zimug2:9092,zimug3:9092 \
--topic my_topic_name
- 若 delete.topic.enable=true,直接彻底删除该 Topic。
- 若 delete.topic.enable=false
- 如果当前 Topic 没有使用过即没有传输过信息:可以彻底删除。
- 如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic 只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。
注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(较新的版本中无显式配置,默认为 true)。
五、修改topic分区数
增加分区数
> bin/kafka-topics.sh --alter \
--bootstrap-server zimug1:9092,zimug2:9092,zimug3:9092 \
--topic my_topic_name
--partitions 4
分区数量只能增加,不能减少。 若是减少 partition 个数,则会报如下错误信息:
org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic my_topic_name currently has 3 partitions, 2 would not be an increase.
该命令不能用来修改副本个数。(需要使用 kafka-reassign-partitions.sh
脚本增加副本数,在《集群可用性验证》那一章节有非常具体的使用案例)
六、修改topic配置参数
该命令除了增加分区数量,还可以修改分区配置(默认使用服务器全局配置),修改之后的配置参数只对当前主题生效。如下通过--config
修改flush.messages
配置参数。
> bin/kafka-topics.sh --alter \
--bootstrap-server zimug1:9092,zimug2:9092,zimug3:9092 \
--topic my_topic_name \
--config flush.messages=10000
删除配置
> bin/kafka-topics.sh --alter \
--bootstrap-server zimug1:9092,zimug2:9092,zimug3:9092 \
--topic my_topic_name \
--delete-config flush.messages