1. Start ZooKeeper
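For example, run on each ZooKeeper node (assuming zkServer.sh is on the PATH):
zkServer.sh start
zkServer.sh status   # should report one leader and the rest followers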
2. Start the Kafka broker service on every node in the cluster
kafka-server-start.sh /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties
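Run this on each broker node (ali, tencent, huawei in this setup). To keep the broker running in the background, kafka-server-start.sh also accepts a -daemon flag:
kafka-server-start.sh -daemon /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties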
3. Open a new shell window and create the topic
kafka-topics.sh --zookeeper ali:2181 --topic calllog --create --replication-factor 1 --partitions 3
4. Verify that the topic was created
If steps 3 and 4 have already been run before, they can be skipped; start Flume directly.
kafka-topics.sh --zookeeper ali:2181 --list
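To also check the partition and replica assignment of the new topic, use --describe:
kafka-topics.sh --zookeeper ali:2181 --topic calllog --describe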
5. Create the job configuration file and start Flume
Flume job configuration file: flume-exec-kafka.conf, with the following contents:
# define the agent's sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source: tail the call-log CSV from the beginning of the file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /home/hadoop/calllog/calllog.csv
a1.sources.r1.shell = /bin/bash -c
# sink: publish events to the Kafka topic calllog
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = ali:9092,tencent:9092,huawei:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
# channel: buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
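Note: brokerList, topic, batchSize, and requiredAcks are the older KafkaSink property names; on Flume 1.7+ they are deprecated in favor of the following equivalents (same behavior):
a1.sinks.k1.kafka.bootstrap.servers = ali:9092,tencent:9092,huawei:9092
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1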
Be sure to launch Flume with the configuration file you just created:
flume-ng agent \
-c conf \
-n a1 \
-f /home/hadoop/calllog/flume-exec-kafka.conf \
-Dflume.root.logger=INFO,console
6. Package the data-producer jar and copy it into the calllog directory on the Linux host
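Assuming the producer is built with Maven, a typical way to package and copy the jar (user, host, and target path as used elsewhere in these notes):
mvn clean package
scp target/ct_producer-1.0-SNAPSHOT.jar hadoop@ali:/home/hadoop/calllog/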
7. Open a new shell window and start a console consumer, or start the consumer program in IDEA to consume the data
kafka-console-consumer.sh --zookeeper ali:2181 --topic calllog --from-beginning
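With Kafka 1.1.0, the --zookeeper option selects the deprecated old consumer; the new consumer connects to a broker instead, which would look like:
kafka-console-consumer.sh --bootstrap-server ali:9092 --topic calllog --from-beginning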
Either way, this step is only for verification and can be skipped.
8. Go into the calllog directory and start the data-producer job
java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /home/hadoop/calllog/calllog.csv
Alternatively, wrap the command in a script:
#!/bin/bash
java -cp /home/hadoop/calllog/ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /home/hadoop/calllog/calllog.csv
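Assuming the script is saved as /home/hadoop/calllog/produce.sh (the name is arbitrary), make it executable and run it:
chmod +x /home/hadoop/calllog/produce.sh
/home/hadoop/calllog/produce.sh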
After a short wait, the consumer should start receiving data.
9. Open HBase and check whether the data has been written
If both the rows you inserted and the rows inserted by the coprocessor show up, the pipeline ran successfully.
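A minimal check from the HBase shell, assuming the table is named ns_ct:calllog (substitute the actual namespace and table name used by the consumer):
hbase shell
list                                # the calllog table should appear
scan 'ns_ct:calllog', {LIMIT => 5}
count 'ns_ct:calllog'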