1:Kafka 基本操作

#Create a topic: zookeeper host, replication factor 1, 1 partition, topic name
#NOTE(review): --zookeeper/--broker-list are legacy flags; newer Kafka uses --bootstrap-server throughout
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicname
#Verify the topic was created
bin/kafka-topics.sh --list --zookeeper localhost:2181
#Produce messages with the bundled console producer: broker list, topic name
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
#Consume: bootstrap server, topic, read from the earliest offset
#(was "--topic test" — fixed to match the topic created/produced above)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning

2:分布式集群配置:

        更改config目录下server.properties

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# Unique machine id for this broker within the cluster.
broker.id=0


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# (directory for Kafka's data/log segments).
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1


############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes. (host:port list of the zookeeper ensemble)
zookeeper.connect=hadoop:2181,hadoop2:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


 3:复制配置到其他节点,并且修改每个节点的 broker.id(必须唯一)

4:启动Kafka

#Start ZooKeeper first
#Then start the Kafka brokers (one per properties file)
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
#Create a topic with 3 replicas and 1 partition
> bin/kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 3 --partitions 1 --topic my-topic
#Start a console producer to write messages
> bin/kafka-console-producer.sh --broker-list hadoop:9092 --topic my-topic
#Start a console consumer reading from the earliest offset
> bin/kafka-console-consumer.sh --bootstrap-server hadoop:9092 --from-beginning --topic my-topic
#查看 topic 的详细描述信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic
Topic:my-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
#leader  partition0 的 leader 为 broker2
#replicas  三个副本分别在 broker1、broker2、broker0 上
#isr  副本 2,0 处于同步状态(broker1 的副本未同步)

#失去 leader 后会从存活的 ISR 节点中选举出新的 leader

http://kafka.apache.org/documentation/

猜你喜欢

转载自blog.csdn.net/qq_34696236/article/details/82856833