简介
在Flink文档中,提供connector读取源数据和把处理结果存储到外部系统中。但是没有提供数据库的connector,如果要读写数据库,官网给出了异步IO(Asynchronous I/O)专门用于访问外部数据,详细可看:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
还有一种方法是继承RichSourceFunction,重写里面的方法,所有的数据库flink都可以通过这两种方式进行数据的读写,这里以hbase为例进行说明。
flink读写Hbase
写入HBase提供两种方式:
- 第一种:继承RichSourceFunction重写父类方法
- 第二种:实现OutputFormat接口
本文主要介绍我们实现OutputFormat接口的具体步骤
实现OutputFormat接口
实现方式:
我们需要自己自定义一个hbase的操作类实现OutputFormat接口,重写里面的抽象方法,也就是下面的抽象方法
public interface OutputFormat<IT> extends Serializable {
void configure(Configuration parameters);
void open(int taskNumber, int numTasks) throws IOException;
void writeRecord(IT record) throws IOException;
void close() throws IOException;
}
抽象方法说明
configure
configure方法主要用于:配置输出格式。由于输出格式是通用的,因此是无参数的,这个方法是输出格式根据配置值设置基本字段的地方,此方法总是在实例化输出格式上首先调用,但是我们不会这个方法做实际测操作。
open
用于打开输出格式的并行实例,以存储其并行实例的结果,调用此方法时,将确保配置该方法的输出格式。所以在open方法中我们会进行hbase的连接,配置,建表等操作。
writeRecord
用于将数据写入数据源,所以我们会在这个方法中调用写入hbase的API
close
这个不用说了就是关闭数据源的连接
导入依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
实例
public class HBaseOutputFormat implements OutputFormat<Tuple5<Long, Long, Long, String, Long>> {
private org.apache.hadoop.conf.Configuration conf = null;
private Connection conn = null;
private Table table = null;
@Override
public void configure(Configuration parameters) {
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
HbaseUtil.setConf("ip1,ip2,ip3", "2181");
conn = HbaseUtil.connection;
HbaseUtil.createTable("flink_test2","info");
}
@Override
public void writeRecord(Tuple5<Long, Long, Long, String, Long> record) throws IOException {
Put put = new Put(Bytes.toBytes(record.f0+record.f4));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("uerid"), Bytes.toBytes(record.f0));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("behavior"), Bytes.toBytes(record.f3));
ArrayList<Put> putList = new ArrayList<>();
putList.add(put);
//设置缓存1m,当达到1m时数据会自动刷到hbase
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("flink_test2"));
params.writeBufferSize(1024 * 1024); //设置缓存的大小
BufferedMutator mutator = conn.getBufferedMutator(params);
mutator.mutate(putList);
mutator.flush();
putList.clear();
}
public void close() throws IOException {
if (table != null) {
table.close();
}
if (conn != null) {
conn.close();
}
}
}
如何使用
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val dataStream = env.addSource()
...........
dataStream.writeUsingOutputFormat(new HBaseOutputFormat());//写入Hbase
---------------------
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦