版权声明: https://blog.csdn.net/weixin_37254888/article/details/79807070
一、网站日志流量项目
-》项目开发阶段:
-》可行性分析
-》需求分析
-》详细设计
-》代码实现
-》测试
-》上线
-》大数据业务流程
-》数据采集:sqoop、Flume、kafka、Logstash
-》数据源:日志文件、RDBMS、实时的数据流
-》目标地:hdfs、nosql、Hive
-》数据存储:入库的过程
-》数据计算:hive、MapReduce、spark
-》数据清洗
-》数据建模
-》数据分析
-》数据展示:java web、可视化分析工具
二、Flume的使用
-》特点
collecting, aggregating, and moving
收集 聚集 移动
source、 channel、 sink
-》flume原理
-》source:负责读数据源,将数据源的数据变成数据流,封装成event
event是数据采集的最小单元,
head:放一些配置信息key=value格式
body:真正的数据
-》channel:负责临时存储数据
-》sink:负责将数据发送到目标地
三、Flume的部署
-》下载解压
tar -zxvf flume-ng-1.6.0-cdh5.7.6.tar.gz -C /opt/cdh-5.7.6/
-》修改配置文件
mv conf/flume-env.sh.template conf/flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.8.0_91
-》如何找到hdfs
-》配置全局环境变量:HADOOP_HOME
-》在配置文件中申明HADOOP_HOME
-》在agent中写明hdfs的绝对地址
hdfs://hostname:8020/flume
-》如果hdfs配置了HA
-》将core-site和hdfs-site拷贝到flume的配置文件目录
-》将core-site和hdfs-site拷贝到flume的配置文件目录
cp ../hadoop-2.6.0-cdh5.7.6/etc/hadoop/core-site.xml ../hadoop-2.6.0-cdh5.7.6/etc/hadoop/hdfs-site.xml conf/
-》将flume写数据进入hdfs需要的jar包导入flume的lib目录
commons-configuration-1.6.jar
hadoop-auth-2.6.0-cdh5.7.6.jar
hadoop-common-2.6.0-cdh5.7.6.jar
hadoop-hdfs-2.6.0-cdh5.7.6.jar
htrace-core4-4.0.1-incubating.jar
四、使用flume
-》flume的运行方式:
flume-og:老版本
flume-ng:新版本
Usage: bin/flume-ng <command> [options]...
bin/flume-ng agent --conf $flume_conf_dir --name agent_name --conf-file agent_file_path -Dflume.root.logger=INFO,console
-》案例一:读hive的日志,将hive的日志采集到logger中
agent:
source:读hive日志,将日志数据发送给channel
channel:存储source发过来的数据,内存
sink:从channel中取数据,将数据发送给日志
-》运行
bin/flume-ng agent --conf conf/ --name a1 --conf-file case/hive-mem-log.properties -Dflume.root.logger=INFO,console
-》案例二:使用file channel
bin/flume-ng agent --conf conf/ --name a1 --conf-file case/hive-file-log.properties -Dflume.root.logger=INFO,console
mem:读写比较快,数据容易丢失
file:相对来说速度慢一些,但是数据安全性较高
-》案例三:将数据采集到hdfs
bin/flume-ng agent --conf conf/ --name a1 --conf-file case/hive-mem-hdfs.properties -Dflume.root.logger=INFO,console
-》配置文件大小
-》按照时间生成文件
hdfs.rollInterval=0
-》按照文件大小生成文件:默认1024字节
hdfs.rollSize=10240 (工作中一般给125M左右对应的字节数)
-》按照event个数生成文件
hdfs.rollCount=0
bin/flume-ng agent --conf conf/ --name a1 --conf-file case/hive-mem-size.properties -Dflume.root.logger=INFO,console
-》按照时间生成对应的目录
bin/flume-ng agent --conf conf/ --name a1 --conf-file case/hive-mem-part.properties -Dflume.root.logger=INFO,console
-》设置文件名头部:hdfs.filePrefix
-》设置线程超时时间:hdfs.idleTimeout
-》案例四:
logs/2018-04-02.log
2018-04-03.log
2018-04-04.log
-》spooling dir source:用于动态的读取目录中的文件
运行:
bin/flume-ng agent --conf conf/ --name a1 --conf-file case/dir-mem-size.properties -Dflume.root.logger=INFO,console
logs/2018-04-02.log.tmp -> 2018-04-02.log
2018-04-03.log.tmp
2018-04-04.log
-》案例五:
logs/2018-04-02.log
2018-04-03.log
2018-04-04.log
-》使用taildir source
-》如果使用老版本的flume,没有该功能,需要自己编译taildir的源码
-》找到flume-1.7中taildir source的源码
-》导入eclipse
-》缺少一个类文件
C:\Users\江城子\Desktop\Git\flume\flume-ng-core\src\main\java\org\apache\flume\source\PollableSourceConstants.java
-》删除两个overwrite注解
-》maven编译,打成jar包
-》将jar包放入lib目录
-》flume常用的组件类型
-》source:avro source/sink ,kafka source ,exec source ,spooldir source ,taildir source
-》channel:file、mem、kafka
-》sink:kafka、hdfs、hive
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'a1'
# define agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/datas/flume/spooling
a1.sources.s1.ignorePattern = ([^ ]*\.tmp$)
# define channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# define sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/spoolingdir
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 10240
a1.sinks.k1.hdfs.rollCount = 0
# bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1