软件版本号 jdk1.8、apache-flume-1.6.0-bin、kafka_2.8.0-0.8.0、zookeeper-3.4.5
集群环境安装请先测试;
参考以下作者信息,特此感谢;
http://blog.csdn.net/wzy0623/article/details/73650053
https://www.cnblogs.com/sunyaxue/p/6645415.html
需要向/usr/local/bigdata/apache-flume-1.6.0-bin/lib 放入三个jar包
flume-ng-sql-source-1.3.7.jar -->flume的mysql source 下载地址: https://github.com/keedio/flume-ng-sql-source/
mysql-connector-java-5.1.35.jar -->mysql 驱动包 应该都有吧。
flume-mysql-sink-1.0-SNAPSHOT.jar -->自定义的mysql sink参考第二个网址;打的jar包
三个jar包的下载地址: http://download.csdn.net/download/kongfanyu/10271848
重点是flume的配置文件信息如下:
agent.channels = ch1 ch2 ch3
agent.sinks = HDFS mysqlSink kfk
agent.sources = sql-source
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.channels = ch1 ch2 ch3
agent.sources.sql-source.connection.url = jdbc:mysql://192.168.2.164:3306/test
agent.sources.sql-source.user = root
agent.sources.sql-source.password = root
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
#ch1
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000
agent.channels.ch1.transactionCapacity = 100
#ch2
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1000
agent.channels.ch2.transactionCapacity = 100
#ch3
agent.channels.ch3.type = memory
agent.channels.ch3.capacity = 1000
agent.channels.ch3.transactionCapacity = 100
agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://dbserver:9000/flume/mysql3
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
# 自定义的 mysqlSink
agent.sinks.mysqlSink.type = org.flume.mysql.sink.MysqlSink
agent.sinks.mysqlSink.url=jdbc:mysql://192.168.2.171:3306/test
agent.sinks.mysqlSink.user=root
agent.sinks.mysqlSink.password=123456
agent.sinks.mysqlSink.tableName=wlslog
agent.sinks.mysqlSink.column_name=id,time_stamp,category,type,servername,code,msg
agent.sinks.mysqlSink.channel = ch2
# 配置 kafka sink
agent.sinks.kfk.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kfk.brokerList=192.168.2.171:9092,192.168.2.172:9092,192.168.2.173:9092,192.168.2.174:9092,192.168.2.175:9092
agent.sinks.kfk.topic=mytopic
agent.sinks.kfk.requiredAcks = 1
agent.sinks.kfk.batchSize = 2
agent.sinks.kfk.channel = ch3
有问题留言.