资料
原文: https://kafka.apache.org/quickstart
1.下载
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz
2.启动
Kafka需要Zookeeper.程序包自带了一个可以快速启动单节点zookeeper服务的脚本.
> bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动Kafka
> bin/kafka-server-start.sh config/server.properties
3.创建Topic
创建一个单副本单分区的Topic test.
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看系统中的Topic列表
> bin/kafka-topics.sh --list --zookeeper localhost:2181
另一种,可以通过配置Broker来决定是否在程序发布一个不存在的Topic时自动创建.
4.发送消息
卡夫卡附带了一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送给卡夫卡集群。默认情况下,每一行都将作为单独的消息发送。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
5.启动消费者
卡夫卡还有一个命令行消费者,它会将消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
所有的命令行工具都有附加的选项;在没有参数的情况下运行命令将显示使用帮助信息
6.启动一个多节点的集群
首先为每个节点创建一个配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
编辑文件设置属性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的唯一和永久名称。我们必须重写端口和日志目录,只是因为我们在同一台机器上运行这些文件,我们希望阻止broker在同一个端口上注册或覆盖彼此的数据。
已经启动了一个节点了,所以只需再启动两个即可:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
现在创建一个新的主题,其复制因子为3:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好的,但是现在我们有了一个集群我们怎么知道哪个broker在做什么呢?要看到这些,运行“describe topics”命令:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行给出了所有分区的摘要,每条额外的行提供关于一个分区的信息。因为这个主题只有一个分区,所以只有一行。
- “Leader”是负责为给定分区的所有读写操作的节点。每个节点将成为分区中随机选择的部分的领导者。
- “Replicas”是为该分区复制日志的节点列表,不管它们是否是领导者或者它们现在是否还活着。
- “Isr”是一组“同步”副本。这是副本列表的子集,它现在还活着,并被引导到领导者。
看下之前创建的 test 主题:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这里没有什么奇怪的地方——原来的主题没有副本,在broker 0上,这是我们创建集群时唯一的服务器。
再发布一些消息到新的主题上:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
消费它们:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在测试下容错,broker 1 四个Leader , 让我们 kill 掉看看:
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
Leader 已经切换到 broker 2 , Isr 中也不存在 broker 1 了
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是数据的消费是没有任何影响的:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
7.使用Kafka导入/导出数据
从控制台编写数据并将其写到控制台是一个很方便的起点,但是您可能想要使用来自其他来源的数据,或者从卡夫卡导出数据到其他系统。对于许多系统,您可以使用卡夫卡Connect来导入或导出数据,而不是编写自定义的集成代码。
卡夫卡Connect是卡夫卡的一个工具,它将数据导入和导出到卡夫卡。它是一个可扩展的工具,它运行连接器,它实现了与外部系统交互的自定义逻辑。在这个快速启动中,我们将看到如何使用简单的连接器来运行卡夫卡Connect,这些连接器将数据从一个文件导入到卡夫卡的主题,并将从卡夫卡主题导出的数据导出到一个文件中。
首先创建一些种子数据:
> echo -e "foo\nbar" > test.txt
接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在一个单独的、本地的、专用的进程中运行。我们提供三个配置文件作为参数。第一个是卡夫卡Connect过程的配置,包含一些常见的配置,比如卡夫卡的broker连接,以及数据的序列化格式。其余的配置文件都指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这些示例配置文件,包括与卡夫卡,使用默认的本地集群配置,创建两个连接器:第一种是源连接器从输入文件中读取每一行输入到卡夫卡的主题,第二个是sink连接器从卡夫卡的主题读取消息并输出消息行到文件。
在启动过程中,您将看到许多日志消息,包括一些表明连接器正在被实例化的消息。一旦卡夫卡连接过程开始,源连接器应该开始读取test.txt
中的行 , 并将其发送到主题connect-test
,而sink连接器应该开始读取主题connect-test
的消息,并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据已经通过整个管道传递:
> more test.sink.txt
foo
bar
请注意,这些数据存储在卡夫卡主题连接测试中,因此我们也可以运行一个控制台使用者来查看主题中的数据(或者使用定制的消费者代码来处理):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器持续处理数据,因此我们可以将数据添加到文件中,并看到它在管道中移动:
> echo Another line>> test.txt
您应该会看到这一行出现在控制台的消费者输出和sink文件中。
8.使用Kafka Streams 实时处理
卡夫卡流是一个用于构建关键任务实时应用程序和微服务的客户端库,其中输入或输出数据存储在卡夫卡集群中。卡夫卡的流结合了写作的简单性,并在客户端部署了标准的Java和Scala应用程序,并利用了卡夫卡的服务器端集群技术,使这些应用程序具有高度可伸缩性、弹性、容错、分布式等功能。这个快速启动的例子将演示如何在这个库中运行一个流处理应用程序。