1. Kafka Installation and Configuration
1.1 Download Kafka
Download Kafka from the official website; this guide uses kafka_2.11-2.1.1.tgz.
1.2 Extract
Extract the archive to a directory of your choice: tar -zxvf kafka_2.11-2.1.1.tgz -C <your directory>
1.3 Start Kafka
Switch to the Kafka installation directory, then start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Then start the Kafka broker:
bin/kafka-server-start.sh config/server.properties &
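Both processes run in the background, so a failed start is easy to miss. A minimal sanity check, assuming the default ports 2181 (ZooKeeper) and 9092 (Kafka); port_open is an illustrative helper, not part of Kafka:

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP service accepts connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # Default ports assumed from config/zookeeper.properties and server.properties.
    print("ZooKeeper listening:", port_open("localhost", 2181))
    print("Kafka listening:", port_open("localhost", 9092))
```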
1.4 Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flumeTopic
2. Flume Installation and Configuration
2.1 Download Flume
Download apache-flume-1.9.0-bin.tar.gz from the official Flume website.
2.2 Extract
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C <your directory>
2.3 Configure Flume
Create conf/a1.conf (the file passed to flume-ng in 2.4) with the following content:
# Define the a1 agent's components
a1.sources = src1
a1.channels = ch1
a1.sinks = k1
# Define the source
a1.sources.src1.type = exec
# Tail the application log file
a1.sources.src1.command = tail -F /home/hadoop001/code/iqiyiLog/logdata/logs
a1.sources.src1.channels = ch1
# Define the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic that Flume writes to
a1.sinks.k1.topic = flumeTopic
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.channel = ch1
# Define the channel
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 1000
2.4 Start Flume
Switch to the Flume installation directory and run:
./bin/flume-ng agent --conf conf --conf-file conf/a1.conf --name a1 -Dflume.root.logger=INFO,console
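With Flume tailing the log file, data only flows once new lines are appended to it. A minimal sketch of a traffic generator; the line format is inferred from the sample output in section 3.2, and make_line is an illustrative name:

```python
import random
import time

def make_line() -> str:
    """Build one access-log line: ip date time "request" referer status."""
    ip = ".".join(str(random.randint(1, 254)) for _ in range(4))
    ts = time.strftime("%Y-%m-%d %H:%M:%S")
    path = "www/%d" % random.randint(1, 6)
    status = random.choice([200, 302, 404])
    # '_' stands for an empty referer, as in the sample data.
    return '%s %s "GET %s HTTP/1.0" _ %d' % (ip, ts, path, status)

if __name__ == "__main__":
    print(make_line())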
3. Flume and Kafka Connectivity Test
3.1 Data flow
Lines appended to /home/hadoop001/code/iqiyiLog/logdata/logs are picked up by the exec source (tail -F), buffered in the memory channel, and delivered by the Kafka sink to the flumeTopic topic, where any Kafka consumer can read them.
3.2 Consume with Kafka
Switch to the Kafka installation directory and run the console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flumeTopic --from-beginning
Sample output from the consumer:
124.29.143.100 2019-04-02 08:02:01 "GET www/2 HTTP/1.0" _ 200
100.187.29.143 2019-04-02 08:02:01 "GET toukouxu/821 HTTP/1.0" https://www.sogou.com/web?query=猎场 404
156.143.124.167 2019-04-02 08:02:01 "GET www/4 HTTP/1.0" _ 200
100.167.187.29 2019-04-02 08:02:01 "GET www/1 HTTP/1.0" _ 302
167.29.10.30 2019-04-02 08:02:01 "GET www/6 HTTP/1.0" _ 302
30.124.187.156 2019-04-02 08:02:01 "GET toukouxu/821 HTTP/1.0" https://search.yahoo.com/search?p=我的体育老师 302
30.167.10.156 2019-04-02 08:02:01 "GET www/1 HTTP/1.0" _ 200
143.132.29.124 2019-04-02 08:02:01 "GET www/2 HTTP/1.0" _ 302
132.156.29.187 2019-04-02 08:02:01 "GET www/1 HTTP/1.0" _ 302
29.100.124.132 2019-04-02 08:02:01 "GET www/3 HTTP/1.0" _ 200
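The consumed records can be split back into fields downstream. A hypothetical parser, with the layout (ip, date, time, quoted request, referer, status) inferred from the sample lines above:

```python
import re

# Field layout inferred from the sample output; '_' marks an empty referer.
LOG_RE = re.compile(
    r'^(?P<ip>\S+) (?P<date>\S+) (?P<time>\S+) '
    r'"(?P<request>[^"]+)" (?P<referer>\S+) (?P<status>\d{3})$'
)

def parse(line: str) -> dict:
    """Return the named fields of one log line, or {} if it does not match."""
    m = LOG_RE.match(line)
    return m.groupdict() if m else {}

if __name__ == "__main__":
    line = '124.29.143.100 2019-04-02 08:02:01 "GET www/2 HTTP/1.0" _ 200'
    print(parse(line)["status"])  # prints 200
```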