Maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.10</artifactId>
<version>1.3.2</version>
</dependency>
Java代码
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<String> sink = new BucketingSink<String>("/base/path");//如果跨集群要带上前缀,指定集群
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
input.addSink(sink);
主要设置三个属性Bucketer,Writer,BatchSize。
Bucketer:数据写入hdfs目录划分,DateTimeBucketer是根据当前系统时间划分,具体粒度根据传入的参数确定。当然我们也可以设置自己的划分规则,利用数据里的字段确定划分目录;
例如我根据Json数据里的Timestamp字段确定划分目录:
class DateHourBucketer implements Bucketer<JSONObject>{
private static final long serialVersionUID = 1L;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd--HH");
@Override
public Path getBucketPath(Clock clock, Path basePath, JSONObject element) {
// TODO Auto-generated method stub
Long timetamp = element.getLong("Timestamp");
String newDateTimeString = format.format(new Date(timetamp));
return new Path(basePath + "/" + newDateTimeString);
}
}
Writer:数据写入格式,默认转化为字符串写入。如果数据格式为SequenceFiles,我们可以用SequenceFileWriter;
BatchSize:默认每一个线程一个part文件,batchsize指定part文件多大的时候生成新的文件
当然我们还是可以设置路径前缀、后缀,多长时间关闭文件句柄等等属性。
默认生成的路径格式如下:
/base/path/{date-time}/part-{parallel-task}-{count}
count是由于BatchSize而设定的part文件编号