一、Kafka安装
下载安装Kafka极其简单,只需要将下载的Kafka安装包解压到相应的目录即可。进入官方网站 http://kafka .apache.org/downloads 下载当前最新版本Kafka, Kafka安装包并没有区分Windows安装包还是 Linux 安装包,仅在bin录下将Windows环境执行Kafka的相关脚本放在/bin/windows 目录下。在Windows平台,请使用 bin\windows\ 而不是bin/, 并将脚本扩展名改为.bat。
下载 1.0.0版本并解压缩。
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0
其中kafka_2.11代表scala版本,1.0.0代表kafka版本。
二、快速启动
2.1 启动服务
Kafka 使用 ZooKeeper,如果你还没有ZooKeeper服务器,你需要先启动一个ZooKeeper服务器。您可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
现在启动Kafka服务器:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
上诉命令是在前台启动kafka,也就是启动服务后该cmd窗口就不能干其他事了。kafka后台启动的命令 :
kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
kafka-server-start.sh -daemon .. /config/server . properties
2.2 创建一个topic
让我们创建一个名为“test”的topic,它有一个分区和一个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
现在我们可以运行list(列表)命令来查看这个topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
或者,您也可将代理配置为:在发布的topic不存在时,自动创建topic,而不是手动创建。
若代理设置了 auto .create. topics enable=true ,该配置默认值为 true 这样当生产者向一个还未创建的主题发送消息时,会自动创建一个拥有$ {num.partitions }个分区和${default. replication.factor}个副本的主题。
2.3 发送一些消息
Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
2.4 启动一个 consumer
Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果您将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。
所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。
三、伪分布式环境部署(单机多代理集群)
KafkaServer启动时需要加载一个用于KafkaServer初始化相关配置的 server.properties文件,当然文件名可以任意,server. properties对应 Kafk:aServer实例 ,Kafka 伪分布式就是在一台机器上启动多个KafkaServer 来达到多代理的效果,因此要保证 broker.id、port、log在同一台机器的多个server. properities中唯一。
首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
现在编辑这些新文件并设置如下属性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。
我们已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
现在,我们创建一个新topic,把备份设置为:3
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
“leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
“replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
“isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。
请注意,在示例中,节点1是该主题中唯一分区的领导者。
副本会被分布在不同的节点上,副本数不能超过节点数,否则创建主题会失败。例如,3个节点的Kafka集群最多只能有3个副本,若创建主题时指定副本数大于3,则会抛出错误提示。
我们可以在已创建的原始主题上运行相同的命令来查看它的位置:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这没什么大不了,原来的主题没有副本且在服务器0上。我们创建集群时,这是唯一的服务器。
四、分布式集群环境部署
前面己经讲解了Kafka单机版安装配置,因此对 Kafka 集群环境配置时只需将单机版安装的 Kafka 配置进行相应修改,然后复制到另外两台机器即可。另外分布式环境下ZooKeeper需要单独部署,这里只需修改server. properties文件中Kafka连接ZooKeeper的配置,将Kafka连接到 ZooKeeper 集群,配置格式为“ZooKeeper服务IP:ZooKeeper 客户端端口”,多个ZooKeeper机器之间以逗号分隔开。
zookeeper . connect=server-1:2181 ,server- 2:2181 ,server-3:2181
执行以下两条命令将本机安装的Kafka分别复制到另外两台服务器上:
scp -r kafka 2.11-1.0.0 [email protected]:/usr/local/software/kafka
scp -r kafka_2.11-1.0.0 [email protected] :/usr/local/software/kafka
复制完成后,分别登录另外两台机器,修改 server.properties 文件中的 broker.id 依次为2和3。当然可以设置任一整数 ,只要保证一个集群中 broker.id唯一即可。同时在一台机器的server. roperties文件中设置 host.name 为本机的IP,例如,对主机名为server-1的机器上的Kafka节点,在server. properties文件中增加host.name=172.117.12.61。
配置完毕后,分别启动每台机器的KafkaServer,通过ZooKeeper客户端查看Kafka在Zoo Keeper中的相应元数据信息,其中查 /brokers/ids 点信息如下
[ zk : 172.117.12. 61 (CONNECTED) l] ls /brokers/ ids
[ 1 , 2 , 3]