1. Write a custom serializer class in Java and deploy it as a jar into flume/lib
Add the dependency jars to the Maven project:
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.0.36.Final</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-sinks</groupId>
        <artifactId>flume-ng-hbase-sink</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>C:/Program Files/Java/jdk1.8.0_144/lib/tools.jar</systemPath>
    </dependency>
</dependencies>
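Once the pom is in place, package the project and copy the resulting jar into Flume's lib directory so the agent can load com.MinputHbase. A minimal sketch, assuming a standard mvn package build and a Flume installation at /opt/flume (both paths are assumptions; adjust to your environment):
# build the serializer jar from the project root
mvn clean package
# copy it onto Flume's classpath; the jar name depends on your pom's artifactId/version
cp target/*.jar /opt/flume/lib/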
Create a custom class MinputHbase that implements the HbaseEventSerializer interface and overrides all of its methods:
package com;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//Used by the HBase sink: converts events taken from the channel into HBase operations
public class MinputHbase implements HbaseEventSerializer {
    private Event rowEvent;
    private byte[] colFamily;

    @Override
    public void initialize(Event ev, byte[] ar) {
        System.out.println("serializer initialize");
        this.colFamily = ar;
        this.rowEvent = ev;
    }

    @Override
    public List<Row> getActions() {
        System.out.println("serializer getActions");
        // rows to be written to HBase
        List<Row> rows = new ArrayList<>();
        // parse the body of the current event (one line of the source file)
        byte[] body = this.rowEvent.getBody();
        try {
            String line = new String(body, "UTF-8");
            // expected line format "<num>,<title> <ans>"; the comma and the
            // whitespace delimiters are excluded from the captured groups
            Matcher m = Pattern.compile("^(.*?),(.*)\\s(.*)$").matcher(line);
            if (m.matches()) {
                String num = m.group(1);
                String title = m.group(2);
                String ans = m.group(3);
                System.out.println("num: " + num + ", title: " + title + ", ans: " + ans);
                // build the Put for this row, keyed by the question number,
                // using the column family handed to initialize()
                Put row = new Put(num.getBytes());
                row.addColumn(colFamily, "title".getBytes(), title.getBytes());
                row.addColumn(colFamily, "ans".getBytes(), ans.getBytes());
                rows.add(row);
            } else {
                System.out.println("line did not match the expected pattern");
            }
        } catch (UnsupportedEncodingException e) {
            System.out.println("getActions: unsupported encoding");
        }
        return rows;
    }

    @Override
    public List<Increment> getIncrements() {
        System.out.println("serializer getIncrements");
        // no counters to maintain; return an empty list
        return new ArrayList<>();
    }

    @Override
    public void close() {
        this.rowEvent = null;
        this.colFamily = null;
    }

    @Override
    public void configure(Context context) {
    }

    @Override
    public void configure(ComponentConfiguration componentConfiguration) {
    }
}
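To sanity-check the parsing logic outside of Flume, the same regex can be run against one of the sample lines in a throwaway main class (ParseCheck is a hypothetical name, not part of the serializer):
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// hypothetical standalone check of the line-parsing regex used in getActions()
public class ParseCheck {
    public static void main(String[] args) {
        String line = "1-1,hello world";
        Matcher m = Pattern.compile("^(.*?),(.*)\\s(.*)$").matcher(line);
        if (m.matches()) {
            // expected output: num=1-1, title=hello, ans=world
            System.out.println("num=" + m.group(1) + ", title=" + m.group(2) + ", ans=" + m.group(3));
        } else {
            System.out.println("no match");
        }
    }
}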
2. Modify the Flume configuration file; here it is named a3.conf
The data source is the text file mydata.txt under /home/hadoop:
1-1,hello world
1-2,nihao shijie
1-3,nihao shijie
The custom Flume configuration file a3.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# tail -F keeps following the file as new lines are appended
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/mydata.txt

# Describe the sink
a1.sinks.k1.type = hbase
# HBase table to write to
a1.sinks.k1.table = access_log
# column family of table access_log
a1.sinks.k1.columnFamily = cf
# serializer class from the custom jar
a1.sinks.k1.serializer = com.MinputHbase

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start HBase and create the table
In the HBase shell:
create 'access_log','cf'
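Before starting the agent you can confirm the table exists from the same HBase shell (list and describe are standard shell commands):
list
describe 'access_log'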
4. Start Flume
Each line of mydata.txt will be parsed and inserted into the HBase table:
flume-ng agent -c . -f a3.conf -n a1 -Dflume.root.logger=INFO,console
Append a line to mydata.txt while the agent is running and it is automatically inserted into HBase, e.g.:
echo '1-4,nihao shijie' >> mydata.txt
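To verify the result, scan the table from the HBase shell; each question number should appear as a rowkey with title and ans cells under the cf family:
scan 'access_log'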