All of the operations below assume the ZooKeeper and Kafka clusters built in the previous two articles. If you are unsure how to set up the environment, see "Deploying a 3-Node ZooKeeper Pseudo-Distributed Cluster" and "Deploying a 3-Node Kafka Pseudo-Distributed Cluster".
1. List existing topics
cd /opt/kafka/
bin/kafka-topics.sh --zookeeper localhost:2181 --list
2. Create a topic
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
my-replicated-topic
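With `--partitions 1`, every message lands in the single partition 0. With more partitions, Kafka's default partitioner assigns keyed messages by hashing the key (the real implementation uses murmur2). A simplified conceptual sketch, substituting CRC32 just to keep it deterministic and self-contained:

```python
# Simplified sketch of keyed partition assignment. NOT Kafka's actual code:
# Kafka's default partitioner hashes keys with murmur2; CRC32 here only
# illustrates the "stable hash modulo partition count" idea.
import zlib

def choose_partition(key, num_partitions):
    """Map a message key to a partition index."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# With --partitions 1 every key maps to partition 0:
print(choose_partition("order-42", 1))  # 0
# The same key always maps to the same partition, preserving per-key ordering:
print(choose_partition("order-42", 8) == choose_partition("order-42", 8))  # True
```

This per-key stability is why ordering guarantees in Kafka hold within a partition, not across a topic.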
3. Describe a topic
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --describe --topic my-replicated-topic --zookeeper localhost:2181
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
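In this output, Leader is the broker currently serving reads and writes for the partition, Replicas is the full list of assigned brokers, and Isr is the subset that is in sync with the leader. A small helper (not part of Kafka, just for making the fields explicit) that parses one such partition line:

```python
# Parse one partition line of `kafka-topics.sh --describe` output into a dict,
# to make the Leader / Replicas / Isr fields explicit.
import re

def parse_partition_line(line):
    """Extract topic, partition, leader, replicas, and ISR from a describe line."""
    pattern = (r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+"
               r"Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)")
    m = re.search(pattern, line)
    if m is None:
        raise ValueError("not a partition line")
    return {
        "topic": m.group(1),
        "partition": int(m.group(2)),
        "leader": int(m.group(3)),
        "replicas": [int(b) for b in m.group(4).split(",")],
        "isr": [int(b) for b in m.group(5).split(",")],
    }

line = "Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2"
info = parse_partition_line(line)
print(info["leader"], info["replicas"], info["isr"])  # 1 [1, 0, 2] [1, 0, 2]
```

Here all three replicas are in the ISR, so the partition can tolerate the loss of any one broker without data loss.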
4. Produce and consume messages
Demonstrate with Kafka's built-in console producer and consumer:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-producer.sh --broker-list 172.19.152.171:9092 --topic my-replicated-topic
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-consumer.sh --bootstrap-server 172.19.152.171:9092 --from-beginning --topic my-replicated-topic
To consume starting from a specific offset, you must also specify a partition:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-consumer.sh --bootstrap-server 172.19.152.171:9092 --offset 0 --topic my-replicated-topic --partition 0
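`--offset` requires `--partition` because an offset is a position within one partition's log, not a topic-wide position. A minimal model (a sketch only, not Kafka internals) of why the pair is needed:

```python
# Minimal model: each partition is an independent append-only log with its
# own offset sequence, so "offset 1" alone is ambiguous across partitions.
topic = {0: ["a", "b", "c"], 1: ["x", "y"]}  # partition -> ordered messages

def consume_from(topic, partition, offset):
    """Return all messages in one partition starting at the given offset."""
    return topic[partition][offset:]

print(consume_from(topic, 0, 1))  # ['b', 'c']
print(consume_from(topic, 1, 1))  # ['y']
# Offset 1 means "b" in partition 0 but "y" in partition 1 -- the consumer
# cannot resolve an offset without knowing which partition it refers to.
```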
5. Kafka Connect
A built-in Kafka feature: it reads each line from test.txt, writes it to a topic named connect-test, and then writes it back out to test.sink.txt.
Create test.txt and add some data:
cd /opt/kafka/
echo "hello" > test.txt
echo "kafka" >> test.txt
echo "world" >> test.txt
Edit the default configuration file, replacing the IP address with your own server's IP address:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# vim config/connect-standalone.properties
bootstrap.servers=172.19.152.171:9092
Start the connector:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Check the generated test.sink.txt:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# cat test.sink.txt
hello
kafka
world
List the topics again:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
connect-test
my-replicated-topic
Start a consumer to read all messages from connect-test:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-consumer.sh --bootstrap-server 172.19.152.171:9092 --from-beginning --topic connect-test
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka"}
{"schema":{"type":"string","optional":false},"payload":"world"}
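Each record on the topic is wrapped in a schema/payload envelope (the standalone config's default JSON converter includes the schema alongside the value). Extracting the original lines from these records is straightforward:

```python
# Unwrap the schema/payload envelope seen on the connect-test topic to
# recover the original file lines.
import json

records = [
    '{"schema":{"type":"string","optional":false},"payload":"hello"}',
    '{"schema":{"type":"string","optional":false},"payload":"kafka"}',
    '{"schema":{"type":"string","optional":false},"payload":"world"}',
]
payloads = [json.loads(r)["payload"] for r in records]
print(payloads)  # ['hello', 'kafka', 'world']
```

This is why test.sink.txt contains the bare lines: the sink connector unwraps the envelope before writing.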
The connector is still running, so content appended to test.txt shows up in both the consumer and test.sink.txt:
# append a line
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# echo -e "message" >> test.txt
# consumer window
{"schema":{"type":"string","optional":false},"payload":"message"}
# check test.sink.txt again
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# cat test.sink.txt
hello
kafka
world
message
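The end-to-end flow can be pictured as a tiny pipeline (a toy model only, not Connect's implementation): the file source publishes each new line of the input file to the topic, and the file sink appends everything it consumes to the output file.

```python
# Toy model of file source -> topic -> file sink. The topic buffers lines;
# the sink sees every line published so far, including later appends.
def run_pipeline(new_input_lines, topic):
    topic.extend(new_input_lines)   # source: publish each new line
    return list(topic)              # sink: all lines written out so far

topic = []
print(run_pipeline(["hello", "kafka", "world"], topic))  # initial file contents
print(run_pipeline(["message"], topic))                  # the appended line flows through
```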
6. Kafka Streams
6.1 Create the input and output topics
Enable log compaction on the output topic, because the output stream is a changelog stream:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
Created topic "streams-plaintext-input".
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
Created topic "streams-wordcount-output".
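`cleanup.policy=compact` tells Kafka to eventually retain only the latest value for each key, which fits a changelog: only the current count per word matters, not every intermediate update. A toy simulation of what compaction keeps (a sketch of the retention rule, not Kafka's log cleaner):

```python
# Toy simulation of log compaction: for each key, only the most recent
# value survives; insertion order of the surviving keys is preserved.
def compact(log):
    latest = {}
    for key, value in log:          # later records overwrite earlier ones
        latest[key] = value
    return list(latest.items())     # dicts preserve insertion order (3.7+)

changelog = [("hello", 1), ("kafka", 1), ("hello", 2), ("kafka", 2), ("kafka", 3)]
print(compact(changelog))  # [('hello', 2), ('kafka', 3)]
```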
Verify that the topics were created:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
connect-test
my-replicated-topic
streams-plaintext-input
streams-wordcount-output
6.2 Start the WordCount demo application
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
[2019-12-16 12:56:05,001] WARN The configuration 'admin.retries' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
With the earlier configuration this step will fail; change the local IP address in the Kafka config files to localhost:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# vim config/server1.properties
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# vim config/server2.properties
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# vim config/server3.properties
listeners=PLAINTEXT://localhost:9092
listeners=PLAINTEXT://localhost:9093
listeners=PLAINTEXT://localhost:9094
6.3 Start a producer terminal
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>
6.4 Start a consumer terminal
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
6.5 Type raw text into the producer terminal; the processed results appear in the consumer terminal:
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello kafka world kafka hello kafka
[root@iZuf66txzmeg2fbo0i8nhkZ kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
hello 2
kafka 3
world 1
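WordCountDemo lowercases each input line, splits it on whitespace, and maintains a running count per word, emitting updated counts to the output topic. The core counting logic is equivalent to:

```python
# Equivalent of WordCountDemo's counting step: lowercase, split on
# whitespace, count occurrences of each word.
from collections import Counter

def word_count(line):
    """Count words in one input line, case-insensitively."""
    return Counter(line.lower().split())

counts = word_count("hello kafka world kafka hello kafka")
for word, n in sorted(counts.items()):
    print(word, n)
# hello 2
# kafka 3
# world 1
```

Note that the real Streams application keeps this state across lines and across restarts (backed by the compacted changelog topic), whereas this sketch counts a single line in memory.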