Flume
A distributed log collection system.
It accepts data from many kinds of sources, can apply simple processing to the log records, and then writes them out to a data storage system.
Key Concepts
Event: the unit of data. Flume wraps each log record in an event for transport and processing. Its structure is very simple, essentially a JSON string, e.g. {"headers":info,"body":info}, where headers holds user-defined metadata and body holds the log record itself.
Agent: in a Flume cluster, every node is an agent. An agent covers the whole single-node pipeline of receiving, wrapping, buffering, and delivering events to their destination, and consists of three parts: source, channel, and sink.
Source: the data source. It receives log data, wraps each record into an event, and passes it to the channel.
Channel: the conduit. It passively receives events from the source and buffers them until the sink consumes them. Channels are usually backed by memory, so a maximum capacity must be configured.
Sink: the outlet. It specifies the agent's destination, consuming events from the channel and delivering them to the target, which can be a storage system or another agent.
The accompanying diagram shows this pipeline in action: Flume collects the logs produced by a web server and stores them in HDFS.
Flume Multi-hop Flow
Flume Fan-in and Fan-out
Fan-in: multiple data sources flow into the same agent.
Fan-out: the data in one agent flows out to multiple destinations.
Flume's Strengths
By combining multi-hop flows with fan-in and fan-out, Flume can build very complex topologies and handle data pipelines for almost any scenario.
Installing Flume
With a JDK already in place, simply upload the Flume archive to /home/app and extract it.
Configuration
vim flume.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Startup
Run the following from /home/app/apache-flume-1.9.0-bin/conf:
../bin/flume-ng agent -c ./ -f ./flume.properties -n a1 -Dflume.root.logger=INFO,console
Testing
Flume occupies the current terminal once started, so open a second terminal and run the following from any directory:
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://hadoop01:22222
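The HTTP source's default JSON handler accepts a JSON array, so several events can be posted in one request. A minimal sketch (the header names and bodies here are made up for illustration):
curl -X POST -d '[{"headers":{"h1":"v1"},"body":"event one"},{"headers":{"h2":"v2"},"body":"event two"}]' http://hadoop01:22222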
flume.properties Explained
The Flume configuration file mainly describes agents. Where a property takes multiple values, separate them with spaces.
#a1 is the user-defined agent name and must match the -n option of the startup command
a1.sources = r1 #declare the agent's sources; there can be more than one
a1.sinks = k1 #declare the agent's sinks; there can be more than one
a1.channels = c1 #declare the agent's channels; generally one channel per sink
a1.sources.r1.type = avro #set the source type to avro
a1.sources.r1.bind = 0.0.0.0 #address to listen on; usually the local machine, receiving passively
a1.sources.r1.port = 22222 #port to listen on
a1.sinks.k1.type = avro #set the sink type to avro
a1.sinks.k1.hostname = 192.168.65.162 #IP of the sink's target node
a1.sinks.k1.port = 22222 #target port
a1.channels.c1.type = memory #set the channel type to memory
a1.channels.c1.capacity = 1000 #maximum number of buffered events, capping memory use so other processes are not starved
a1.channels.c1.transactionCapacity = 100 #maximum number of events per transaction
a1.sources.r1.channels = c1 #bind the source to its channel(s)
a1.sinks.k1.channel = c1 #bind the sink to its channel
Practice Cases
Source Exercises
Avro
Edit flume.properties:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1. Create the file log.txt under /home/data and add some data.
2. From the conf directory under the Flume install directory, run the startup command to launch the agent.
3. To simulate sending avro data, run the following from Flume's bin directory:
./flume-ng avro-client -c ../conf -H hadoop01 -p 22222 -F /home/data/log.txt
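A minimal sketch of steps 1 and 2, assuming the install path used earlier (/home/app/apache-flume-1.9.0-bin) and a made-up log line:
mkdir -p /home/data
echo "hello avro flume" > /home/data/log.txt
cd /home/app/apache-flume-1.9.0-bin/conf
../bin/flume-ng agent -c ./ -f ./flume.properties -n a1 -Dflume.root.logger=INFO,console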
Spooldir
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/data/spooldir
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Create the directory spooldir under /home/data.
- Start the agent.
- Drop a file with content into spooldir and its contents appear in Flume's log output. Note that the spooling directory source expects files to be complete and unchanging once they land in the directory, so rather than editing a file in place with vim it is safer to write it elsewhere and move it in, as sketched below.
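A minimal sketch of feeding the spooling directory; with the default fileSuffix setting, Flume renames each file it finishes with a .COMPLETED suffix (the file name and content are made up):
echo "hello spooldir" > /home/data/demo.log
mv /home/data/demo.log /home/data/spooldir/
ls /home/data/spooldir    # shows demo.log.COMPLETED once Flume has consumed it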
Channel Exercises
Channels are usually backed by memory, as in the config below; a durable file-channel alternative is sketched after it.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
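Where durability across agent restarts matters more than raw throughput, the memory channel can be swapped for a file channel. A sketch; the checkpoint and data directories are arbitrary choices:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/data/flume/data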
Sink Exercises
Logger
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Avro
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
HDFS
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume/data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
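By default the HDFS sink writes SequenceFiles and rolls to a new file on small thresholds, which tends to produce many tiny HDFS files. A sketch of the commonly tuned settings; the values are illustrative, not recommendations:
a1.sinks.k1.hdfs.fileType = DataStream #write plain text instead of SequenceFile
a1.sinks.k1.hdfs.rollInterval = 30 #roll every 30 seconds; 0 disables time-based rolling
a1.sinks.k1.hdfs.rollSize = 10240 #roll once a file reaches this many bytes; 0 disables
a1.sinks.k1.hdfs.rollCount = 0 #0 disables rolling by event count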
Cluster Deployment
Hadoop01: JDK, Hadoop, Flume
Hadoop02: JDK, Flume
Hadoop03: JDK, Flume
Simply copy the Flume directory already installed on hadoop01 to the corresponding location on hadoop02 and hadoop03, as sketched below.
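A minimal sketch using scp, assuming the same /home/app layout on every node:
scp -r /home/app/apache-flume-1.9.0-bin hadoop02:/home/app/
scp -r /home/app/apache-flume-1.9.0-bin hadoop03:/home/app/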
Case Exercises
Multi-hop
Hadoop01
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
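An avro sink can only connect once its downstream avro source is listening, so the agents should be started from the last hop backwards: hadoop03 first, then hadoop02, then hadoop01. A sketch, assuming each node keeps its config in the conf directory used earlier:
# run on hadoop03, then hadoop02, then hadoop01
../bin/flume-ng agent -c ./ -f ./flume.properties -n a1 -Dflume.root.logger=INFO,console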
Fan-in
Hadoop01
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 22222
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
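To verify the fan-in, post an event to each of the two HTTP sources; both should show up in hadoop03's logger output. A sketch with made-up bodies:
curl -X POST -d '[{"headers":{},"body":"from hadoop01"}]' http://hadoop01:22222
curl -X POST -d '[{"headers":{},"body":"from hadoop02"}]' http://hadoop02:22222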
Fan-out
Hadoop01
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 22222
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 22222
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Hadoop02
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Hadoop03
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
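With the default replicating channel selector, the hadoop01 config above copies every event into both c1 and c2, so both downstream agents receive it. To route events instead of copying them, Flume's multiplexing selector picks a channel by header value; a sketch, where the header name type and its mapped values are made-up examples:
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.a = c1
a1.sources.r1.selector.mapping.b = c2
a1.sources.r1.selector.default = c1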
Integrating the Project, Flume, and HDFS
Integrating log4j with Flume
Configure log4j.properties:
log4j.rootLogger = info,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %m%n
# appender flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop01
log4j.appender.flume.Port = 22222
log4j.appender.flume.UnsafeMode = true
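With this appender on the classpath (shipped with Flume as the flume-ng-log4jappender artifact), every log4j call in the application is forwarded as an event to the avro source on hadoop01:22222. A minimal sketch; the class name LogDemo is made up:
package cn.tedu.flume;

import org.apache.log4j.Logger;

public class LogDemo {
    private static final Logger LOG = Logger.getLogger(LogDemo.class);

    public static void main(String[] args) throws Exception {
        // each call becomes one Flume event whose body is the formatted message
        while (true) {
            LOG.info("hello log4j flume");
            Thread.sleep(1000);
        }
    }
}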
Integrating Flume with HDFS
Configure flume.properties:
#Name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
#Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/jt/data
a1.sinks.k1.hdfs.fileType=DataStream
#Describe the memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
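Once some log events have flowed through the pipeline, the result can be checked from the Hadoop side. A sketch, assuming the path configured above:
hdfs dfs -ls /jt/data
hdfs dfs -cat /jt/data/*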
Advanced: Custom Sink
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.tedu</groupId>
    <artifactId>flume</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume.flume-ng-sinks</groupId>
            <artifactId>flume-hdfs-sink</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume.flume-ng-sinks</groupId>
            <artifactId>flume-hive-sink</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>
</project>
Code
package cn.tedu.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom Flume sink: logs the body of each event it consumes from the channel.
 */
public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    public Status process() throws EventDeliveryException {
        Status stat = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // take() returns null when the channel is empty; report BACKOFF
            // so the framework retries after a delay instead of busy-waiting
            Event event = channel.take();
            if (event == null) {
                stat = Status.BACKOFF;
            } else {
                LOG.info(new String(event.getBody()));
            }
            transaction.commit();
        } catch (Exception e) {
            LOG.error("event delivery failed", e);
            transaction.rollback();
            stat = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return stat;
    }

    public void configure(Context context) {
        // this sink reads no custom properties
    }
}
Package the class as a jar and upload it to the lib directory under the Flume install directory.
Edit the configuration file
mysink.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 22222
a1.sinks.k1.type = cn.tedu.flume.MySink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
../bin/flume-ng agent -c ./ -f ./mysink.properties -n a1 -Dflume.root.logger=INFO,console
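To exercise the custom sink, post an event to the HTTP source as before; the body should appear in the agent's log output via MySink. A sketch with a made-up body:
curl -X POST -d '[{"headers":{},"body":"hello mysink"}]' http://hadoop01:22222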