1. 下载
Apache kafka 官方: http://kafka.apache.org/downloads.html
Scala 2.11 - kafka_2.11-0.10.2.0.tgz (asc, md5)
2.Kafka集群安装
第一步:安装JDK &配置JAVA_HOME
第二步:安装Zookeeper
参照Zookeeper官网搭建一个ZK集群, 并启动ZK集群。
启动kafka之前一定要先启动zookeeper集群
第三步:解压Kafka安装包到(apps目录)
第四步:修改配置文件config/server.properties
[root@wangzhihua2 ~]# cd ./apps/kafka_2.11-0.10.2.1/config/server.properties
需要修改的地方有:
Ø broker.id从0开始 每一台的broker.id都不一样,需要手动的去更改
Ø log.dirs=/root/kafkaData/ 设置Kafka的消息数据存储路径
Ø 配置zookeeper的连接地址zookeeper.connect=hdp-01:2181,hdp-02:2181,hdp-03:2181 //zookeeperServers列表,各节点以逗号分开
Ø 更改listeners listeners=PLAINTEXT://wangzhihua1:9092
注意 :这个ip是kafka所在的机器上的ip并不都是wangzhihua1,每台机器上配置自己的主机名
Ø 修改分区数 默认是1 改为3 num.partitions=3
Ø 安装包分发
for i in 2 3 ; do scp -r kafka_2.11-0.10.2.1/ wangzhihua$i:$PWD ;done
切记上传完成后 一次修改各个主机上的 broker.id
再其它几台节点上,分别修改该配置文件。
broker.id从0开始 每一台的broker.id都不一样,需要手动的去更改 broker.id=1 //为依次增长的:0、1、2、3、4,集群中唯一id log.dirs=/root/kafkaData/ // Kafka的消息数据存储路径 配置zookeeper的连接地址 zookeeper.connect=hdp-01:2181,hdp-02:2181,hdp-03:2181 //zookeeperServers列表,各节点以逗号分开 vi zookeeper.properties dataDir=/root/zkdata #指向你安装的zk的数据存储目录 delete.topic.enable=true listeners=PLAINTEXT://hdp-01:9092 修改kafka数据的存储位置 log.dirs=/root/kafkaData/ num.partitions=3 # 安装包分发 for i in 2 3 ; do scp -r kafka_2.11-0.10.2.1/ hdp-0$i:$PWD ;done 切记上传完成后 一次修改各个主机上的 broker.id 再其它几台节点上,分别修改该配置文件。 |
3.Kafka的启停(前后台)
启动kafka集群需要手动的一台一台的去启动和zookeeper一样
启动kafka之前,必须要保证zookeeper是启动的。
在每台节点上启动:
Bin ./kafka-server-start.sh 前台启动
Bin ./kafka-server-start.sh -daemon ../config/server.properties 后台启动
3.1启动脚本
brokers="wangzhihua1 wangzhihua2 wangzhihua3" kafka_home="/root/apps/kafka_2.11-0.10.2.1" for i in $brokers do echo "Starting kafka on ${i} ... " ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" if [[ $? -ne 0 ]]; then echo "Start kafka on ${i} is OK !" fi done echo kafka kafka are started ! |
|
1.3.2. 停止脚本
brokers="wangzhihua1 wangzhihua2 wangzhihua3" kafka_home="/root/apps/kafka_2.11-0.10.2.1"
for i in $brokers do echo "Stoping kafka on ${i} ... " ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-stop.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" if [[ $? -ne 0 ]]; then echo "Start kafka on ${i} is OK !" fi done echo kafka kafka are stop ! |
停止kafka命令: # ./kafka-server-stop.sh
4.测试集群
1-进入kafka根目录,创建Topic名称为: test的主题
或者,您也可以将代理配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。
./kafka-topics.sh --create --zookeeper wangzhihua1:2181,wangzhihua2:2181,wangzhihua3:2181 --replication-factor 3 --partitions 1 --topic test |
2-列出已创建的topic列表
bin/kafka-topics.sh --list --zookeeper hdp-01:2181
3-查看Topic的详细信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行是对所有分区的一个描述,然后每个分区对应一行,因为只有一个分区所以下面一行。
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
在例子中,节点1是作为leader运行。
4-模拟客户端去发送消息
Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台中输入几条消息发送到服务器。
一般情况为了测试 消费者个生产者不要在一台机器上
发消息的机器
接收消息的机器
./kafka-console-producer.sh--broker-listwangzhihua1:9092,wangzhihua2:9092,wangzhua3:9092 --topic test |
5-模拟客户端去接受消息
连接broker的地址
./kafka-console-producer.sh--broker-listwangzhihua1:9092,wangzhihua2:9092,wangzhua3:9092 --topic test |
还可以直接连接到zookeeper
kafka-console-consumer.sh --zookeeper hdp-01:2181,hdp-02:2181,hdp-03:2181 --from-beginning --topic hellotopic
6-测试一下容错能力.
Kill -9 pid[leader节点]
另外一个节点被选做了leader,node 1 不再出现在 in-sync 副本列表中:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
虽然最初负责续写消息的leader down掉了,但之前的消息还是可以消费的:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
删除主题:
bin/kafka-topics.sh --delete --zookeeper hdp-01:2181,hdp-02:2181,hdp-03:2181 --topic test2