前提:
1、有zk集群
2、关闭防火墙:chkconfig iptables off && setenforce 0
3、创建kafka的存放目录并赋权:
mkdir /export
mkdir /export/servers
chmod 755 -R /export
开始搭建
下载安装包:http://kafka.apache.org/downloads.html
在linux中使用wget命令下载安装包
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
解压
tar -zxvf /export/software/kafka_2.11-2.0.0.tgz -C /export/servers/
cd /export/servers/
ln -s kafka_2.11-0.8.2.2 kafka
修改配置文件:
vi /export/servers/kafka/config/server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/export/servers/logs/kafka
#topic在当前broker上的分片个数
num.partitions=2
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#滚动生成新的segment文件的最大时间
log.roll.hours=168
#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
#周期性检查文件大小的时间
log.retention.check.interval.ms=300000
#日志清理是否打开
log.cleaner.enable=true
#broker需要使用zookeeper保存meta数据
zookeeper.connect=192.168.52.1:2181,192.168.52.10:2181,192.168.52.8:2181
#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000
#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000
#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01
分发安装包
scp -r /export/servers/kafka_2.11-2.0.0/ kafka02:/export/servers
然后分别在各机器上创建软连
cd /export/servers/
ln -s kafka_2.11-2.0.0 kafka
再次修改配置文件
依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复
配置kafka的环境变量
vi etc/profile
export KAFKA_HOME=/export/servers/kafka
export PATH=${KAFKA_HOME}/bin:$PATH
启动
先启动zookeeper
启动kafka:bin/kafka-server-start.sh config/server.properties &
启动出现错误:Unsupported major.minor version 52.0解决
原因:下载的kafka2.0.0要求jdk版本较高 把jdk1.7换成1.8启动成功(这里注意自己安装的虚拟机是多少位的再下载对应为数的jdk)
查看是否启动成功
# jps
4640 Kafka
4966 Jps
2905 QuorumPeerMain