基础API概览
API的一些基础概念在前文之中已经有些解释,已经解释过的这里做个简略的复习。 其实要强调的就是: 数据流、构建DAG图、Source、Sink、算子、延迟计算。
指定键
一些转换操作(join, coGroup, keyBy, groupBy)要求在元素集合上定义键。另外一些转换操作 (Reduce, GroupReduce, Aggregate, Windows)允许在应用这些转换之前将数据按键分组。
/*对 DataSet 分组*/
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*在这里定义键*/)
.reduceGroup(/*一些处理操作*/);
/*对 DataStream 指定键*/
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*在这里定义键*/)
.window(/*指定窗口*/);
Flink 的数据模型不是基于键值对的。因此你不需要将数据集类型物理地打包到键和值中。键都是“虚拟的”:它们的功能是指导分组算子用哪些数据来分组。
下面的讨论中我们将以 DataStream 和 keyby 为例。 对于 DataSet API 你只需要用 DataSet 和 groupBy 替换即可。
为Tuple定义键
/*按照 Tuple 的一个或多个字段进行分组*/
//按照第一个字段(整型字段)对 Tuple 分组。
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
//用第一个字段和第二个字段组成的组合键对 Tuple 分组
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
对于嵌套 Tuple 请注意: 如果你的 DataStream 是嵌套 Tuple,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定 keyBy(0) 将导致系统使用整个 Tuple2 作为键(一个整数和一个浮点数)。 如果你想“进入”到 Tuple2 的内部,你必须使用如下所述的字段表达式键。
使用字段表达式定义键
可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组、排序、join 或 coGrouping 的键。
字段表达式可以很容易地选取复合(嵌套)类型中的字段,例如 Tuple 和 POJO 类型。
可以把POJO的属性名传给keyBy()函数
// 普通的 POJO(简单的 Java 对象)
public class WC {
public String word;
public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*指定窗口*/);
字段表达式语法:
根据字段名称选择 POJO 的字段。例如 “user” 就是指 POJO 类型的“user”字段。
根据字段名称或 0 开始的字段索引选择 Tuple 的字段。例如 “f0” 和 “5” 分别指 Java Tuple 类型的第一个和第六个字段。
可以选择 POJO 和 Tuple 的嵌套字段。 例如,一个 POJO 类型有一个“user”字段还是一个 POJO 类型,那么 “user.zip” 即指这个“user”字段的“zip”字段。任意嵌套和混合的 POJO 和 Tuple都是支持的,例如 “f1.user.zip” 或 “user.f3.1.zip”。
可以使用 "*" 通配符表达式选择完整的类型。这也适用于非 Tuple 或 POJO 类型。
字段表达式示例:
public static class WC {
public ComplexNestedClass complex; //嵌套的 POJO
private int count;
// 私有字段(count)的 getter 和 setter
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
这些字段表达式对于以上代码示例都是合法的:
"count":WC 类的 count 字段。
"complex":递归选择 POJO 类型 ComplexNestedClass 的 complex 字段的全部字段。
"complex.word.f2":选择嵌套 Tuple3 类型的最后一个字段。
"complex.hadoopCitizen":选择 hadoop 的 IntWritable 类型。
使用键选择器函数定义键
定义键的另一种方法是“键选择器”函数。键选择器函数将单个元素作为输入并返回元素的键。键可以是任意类型,并且可以由确定性计算得出。
下例展示了一个简单返回对象字段的键选择器函数:
// 普通的 POJO
public class WC {public String word; public int count;}
DataStream<WC> words = // [...]
KeyedStream<WC> keyed = words
.keyBy(new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
});
制定转换函数
大多数转换操作需要用户定义函数。本节列举了指定它们的不同方法。
//实现接口
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) {
return Integer.parseInt(value);
}
};
data.map(new MyMapFunction());
//匿名类
data.map(new MapFunction<String, Integer> () {
public Integer map(String value) { return Integer.parseInt(value); }
});
//Lambda表达式
data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
另外要说一下的是富函数,所有需要用户定义函数的转换操作都可以将富函数作为参数。例如,对于
class MyMapFunction implements MapFunction<String, Integer> {
public Integer map(String value) {
return Integer.parseInt(value);
}
};
可以替换成:
class MyMapFunction extends RichMapFunction<String, Integer> {
public Integer map(String value) {
return Integer.parseInt(value);
}
};
并像往常一样将函数传递给 map 转换操作:
data.map(new MyMapFunction());
当然富函数也可以被定义为匿名类:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) {
return Integer.parseInt(value);
}
});
富函数为用户定义函数(map、reduce 等)额外提供了 4 个方法: open、close、getRuntimeContext 和 setRuntimeContext。这些方法有助于向函数传参、创建和终止本地状态、访问广播变量、访问诸如累加器和计数器等运行时信息和迭代信息。
看一下富函数的集成结构:
MapFunction和RichFunction都是继承自Function接口
MapFunction是个转换接口
RichFunction如其注释所言,定义了函数生命周期相关的接口,以及获取context的接口
累加器和计数器
累加器简单地由 加法操作 和 最终累加结果构成,可在作业结束后使用。
最简单的累加器是一个 计数器:你可以使用 Accumulator.add(V value) 方法递增它。作业结束时 Flink 会合计(合并)所有的部分结果并发送给客户端。累加器在 debug 或者你想快速了解数据的时候非常有用。
Flink 目前有如下 内置累加器。它们每一个都实现了 Accumulator 接口。
继承结构如下(只展示了常用的AverageAccumulator、IntCounter、LongCounter、DoubleCounter):
如何使用累加器:
//首先你必须在要使用它的用户定义转换函数中创建累加器对象(下例为计数器)。
private IntCounter numLines = new IntCounter();
//其次,你必须注册累加器对象,通常在富函数的 open() 方法中。在这里你还可以定义名称。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
//你现在可以在算子函数中的任何位置使用累加器,包括 open() 和 close() 方法。
this.numLines.add(1);
//总体结果将存储在 JobExecutionResult 对象中,该对象是从执行环境的 execute() 方法返回的 (目前这仅在执行等待作业完成时才有效)。
myJobExecutionResult.getAccumulatorResult("num-lines")
每个作业的所有累加器共享一个命名空间。 这样你就可以在作业的不同算子函数中使用相同的累加器。Flink 会在内部合并所有同名累加器。
关于累加器和迭代请注意: 目前,累加器的结果只有在整个作业结束以后才可用。
自定义累加器:
要实现你自己的累加器,只需编写累加器接口的实现即可。