1. flink split流 ,select流?
(拆分流&select流)
Split 就是将一个DataStream分成两个或者多个DataStream
Select 就是获取分流后对应的数据
示例代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val elements: DataStream[Int] = env.fromElements(1,2,3,4,5,6)
//数据分流
val split_data = elements.split(
(num: Int) => (num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
//获取分流后的数据
val select: DataStream[Int] = split_data.select("even")
select.print()
env.execute()
2. 流的connect 和 union 的区别?
- Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
- Connect只能操作两个流,Union可以操作多个。
3. flink的数据类型?
- 基础数据类型
Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
- Java和Scala元组(Tuples)
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
- Scala样例类(case classes)
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
- Java简单对象(POJOs)
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));
- 其它(Arrays, Lists, Maps, Enums, 等等)
Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。
4. 什么是富函数? 富函数有什么作用?
“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
Rich Function有一个生命周期的概念。典型的生命周期方法有:
- open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
下图展现出RichSourceFunction的继承关系
示例代码:
package com.atguigu.apiTest
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
object RichFunction {
def main(args: Array[String]): Unit = {
}
}
class MyMapFunction extends RichMapFunction[Double, Int]{
//ctr + O 查看重写方法
override def map(value: Double): Int = {
value.toInt - 1
}
override def open(parameters: Configuration): Unit = super.open(parameters)
override def close(): Unit = super.close()
}
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
//子任务的index
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 以下可以做一些初始化工作,例如建立一个和HDFS的连接
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
}
override def close(): Unit = {
// 以下做一些清理工作,例如断开和HDFS的连接。
}
}