目录
一、基本介绍
Flink提供了三层API,每层在简洁性和表达性之间进行了不同的权衡。
ProcessFunction是Flink提供的最具表现力的功能接口,它提供了对时间和状态的细粒度控制,能够任意修改状态。所以ProcessFunction能够为许多有事件驱动的应用程序实现复杂的事件处理逻辑。
DataStream API为许多通用的流处理操作提供原语,比如window。DataStream API适用于Java和Scala,它基于函数实现,比如map()、reduce()等。我们也可以自己扩展接口自定义函数。
SQL & Table API 这两个都是关系API,是批处理和流处理统一的API。Table API和SQL利用Apache Calcite进行解析、验证和查询优化。它们可以与DataStream和DataSet API无缝集成,并支持用户定义标量、聚合和表值函数。关系API(relational api)目标在于简化数据分析、数据流水线(data pipelining)和ETL。
我们一般主要使用DataStream进行数据处理,下面介绍的API也是DataStream相关的API。
二、DataStream API
DataStream是Flink编写流处理作业的API。我们前面说过一个完整的Flink处理程序应该包含三部分:数据源(Source)、转换操作(Transformation)、结果接收(Sink)。下面我们从这三部分来看DataStream API。
三、数据源(Source)
Flink应用程序从数据源获取要处理的数据,DataStream通过StreamExecutionEnvironment.addResource(SourceFunction)
来添加数据源。为了方便使用,Flink预提几类预定义的数据源,比如读取文件的Source、通过Sockt读取的Source、从内存中获取的Source等。
(1)基于集合的预定义Source
基于集合的数据源一般是指从内存集合中直接读取要处理的数据,StreamExecutionEnvironment提供了4类预定义方法。
1)、fromCollection
fromCollection是从给定的集合中创建DataStream,StreamExecutionEnvironment提供了4种重载方法:
-
fromCollection(Collection<T> data):通过给定的集合创建DataStream。返回数据类型为集合元素类型。
-
fromCollection(Collection<T> data,TypeInformation<T> typeInfo):通过给定的非空集合创建DataStream。返回数据类型为typeInfo。
-
fromCollection(Iterator<T> data,Class<T> type):通过给定的迭代器创建DataStream。返回数据类型为type。
-
fromCollection(Iterator<T> data,TypeInformation<T> typeInfo):通过给定的迭代器创建DataStream。返回数据类型为typeInfo。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromCollection(list)
stream.print()
env.execute("FirstJob")
2)、fromParallelCollection
fromParallelCollection和fromCollection类似,但是是并行的从迭代器中创建DataStream。
-
fromParallelCollection(SplittableIterator<T> data,Class<T> type)
-
fromParallelCollection(SplittableIterator<T>,TypeInfomation typeInfo)
和Iterable中Spliterator类似,这是JDK1.8新增的特性,并行读取集合元素。
3)、fromElements
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的
fromElements从给定的对象序列中创建DataStream,StreamExecutionEnvironment提供了2种重载方法:
-
fromElements(T... data):从给定对象序列中创建DataStream,返回的数据类型为该对象类型自身。
-
fromElements(Class<T> type,T... data):从给定对象序列中创建DataStream,返回的数据类型type。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromElement(list)
stream.print()
env.execute("FirstJob")
4)、generateSequence
generateSequence(long from,long to)从给定间隔的数字序列中创建DataStream,比如from为1,to为10,则会生成1~10的序列。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
stream.print()
env.execute("FirstJob")
(2)基于Socket的预定义Source
我们还可以通过Socket来读取数据,通过Socket创建的DataStream能够从Socket中无限接收字符串,字符编码采用系统默认字符集。当Socket关闭时,Source停止读取。Socket提供了5个重载方法,但是有两个方法已经标记废弃。
-
socketTextStream(String hostname,int port):指定Socket主机和端口,默认数据分隔符为换行符(\n)。
-
socketTextStream(String hostname,int port,String delimiter):指定Socket主机和端口,数据分隔符为delimiter。
-
socketTextStream(String hostname,int port,String delimiter,long maxRetry):该重载方法能够当与Socket断开时进行重连,重连次数由maxRetry决定,时间间隔为1秒。如果为0则表示立即终止不重连,如果为负数则表示一直重试。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 11111)
stream.print()
env.execute("FirstJob")
(3)基于文件的预定义Source
基于文件创建DataStream主要有两种方式:readTextFile和readFile。(readFileStream已废弃)。readTextFile就是简单读取文件,而readFile的使用方式比较灵活。
1)readTextFile
readTextFile提供了两个重载方法:
-
readTextFile(String filePath):逐行读取指定文件来创建DataStream,使用系统默认字符编码读取。
-
readTextFile(String filePath,String charsetName):逐行读取文件来创建DataStream,使用charsetName编码读取。
// 获取运行环境
val env :StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
val path ="text01.txt"
// val stream =env.readTextFile(path)
// text01.txt内容
hello world scala
hello world hadoop
hello world yarn
2)readFile
按照指定的文件格式读取文件。
readFile通过指定的FileInputFormat来读取用户指定路径的文件。
对于指定路径文件,我们可以使用不同的处理模式来处理:
-
FileProcessingMode.PROCESS_ONCE
模式只会处理文件数据一次. -
FileProcessingMode.PROCESS_CONTINUOUSLY
会监控数据源文件是否有新数据,如果有新数据则会继续处理。
readFile(FileInputFormat<T> inputFormat,String filePath,
FileProcessingMode watchType,long interval,TypeInformation typrInfo)
参数 | 说明 | 实例 |
---|---|---|
inputFormat | 创建DataStream指定的输入格式 | |
filePath | 读取的文件路径,为URI格式。既可以读取普通文件,可以读取HDFS文件 | file:///some/local/file 或hdfs://host:port/file/path |
watchType | 文件数据处理方式 | FileProcessingMode.PROCESS_ONCE或FileProcessingMode.PROCESS_CONTINUOUSLY |
interval | 在周期性监控Source的模式下(PROCESS_CONTINUOUSLY),指定每次扫描的时间间隔 | 10 |
readFile提供了几个便于使用的重载方法,但它们最终都是调用上面这个方法的。
-
readFile(FileInputFormat<T> inputFormat,String filePath):处理方式默认使用FileProcessingMode.PROCESS_ONCE。
-
readFile(FileInputFormat<T> inputFormat,String filePath,FileProcessingMode watchType,long interval):返回类型默认为inputFormat类型。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = new Path("/opt/modules/test.txt")
val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt")
stream.print()
env.execute("FirstJob")
需要注意:在使用 FileProcessingMode.PROCESS_CONTINUOUSLY 时,当修改读取文件时,Flink会将文件整体内容重新处理,也就是打破了"exactly-once"。
(4)自定义Source
除了预定义的Source外,我们还可以通过实现 SourceFunction
来自定义Source,然后通过StreamExecutionEnvironment.addSource(sourceFunction)
添加进来。比如读取Kafka数据的Source:
我们可以实现以下三个接口来自定义Source:
-
SourceFunction:创建非并行数据源。
-
ParallelSourceFunction:创建并行数据源。
-
RichParallelSourceFunction:创建并行数据源。
1)MySql source
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
object FlinkSelfDataSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(new SourceFromMySql).print()
env.execute("Flink add self data source")
}
}
class SourceFromMySql extends RichSourceFunction[Student]{
var ps:PreparedStatement = _
var connection:Connection = _
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。
* @param parameters
* @throws Exception
*/
override def open(parameters: Configuration): Unit = {
super.open(parameters)
connection = this.getConnection()
val sql = "select * from student;"
ps = this.connection.prepareStatement(sql)
}
def getConnection():Connection= {
var con :Connection = null
try {
Class.forName("com.mysql.jdbc.Driver")
con = DriverManager.getConnection("jdbc:mysql://ip:port
/test_sun?useUnicode=true&characterEncoding=UTF-8", "xxxx", "xxxx")
} catch {
case e:Exception =>System.out.println("-----------
mysql get connection has exception , msg = "+ e.getMessage)
}
con
}
override def cancel() = {}
/**
* DataStream 调用一次 run() 方法用来获取数据
* @param sourceContext
*/
override def run(sourceContext: SourceFunction.SourceContext[Student]) = {
val resultSet:ResultSet = ps.executeQuery();
while (resultSet.next()) {
val student = new Student
student.id = resultSet.getInt("id")
student.name = resultSet.getString("name").trim()
student.password = resultSet.getString("password").trim()
student.age = resultSet.getInt("age")
sourceContext.collect(student)
}
}
}
class Student {
var id = 0
var name: String = null
var password: String = null
var age = 0
override def toString = s"Student($id, $name, $password, $age)"
}
2)Kafka source
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
object FlinkKafkaTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.put("bootstrap.servers", "Kafka-01:9092")
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "flink_test_sxp")
//key 反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//value 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(),props)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
val stream = env.addSource(myConsumer)
stream.print()
env.execute("Flink kafka test")
}
}
四、数据转换(Transformation)
数据处理的核心就是对数据进行各种转化操作,在Flink上就是通过转换将一个或多个DataStream转换成新的DataStream。
为了更好的理解transformation函数,下面给出匿名类的方式来实现各个函数。
所有转换函数都是依赖以下基础:
// 获取运行环境
val env :StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
val path ="text01.txt"
val stream =env.readTextFile(path)
1、基础转换操作
(1)Map转换
1)Map
接受一个元素,输出一个元素。MapFunction<T,V>中T代表输入数据类型(map方法的参数类型),V代表操作结果输出类型(map方法返回数据类型)。
2)flatMap
输入一个元素,输出0个、1个或多个元素。FlatMapFunction<T,V>中T代表输入元素数据类型(flatMap方法的第一个参数类型),V代表输出集合中元素类型(flatMap中的Collector类型参数)
转换前后数据类型:DataStream->DataStream。
3)filter
过滤指定元素数据,如果返回true则该元素继续向下传递,如果为false则将该元素过滤掉。FilterFunction<T>中T代表输入元素的数据类型。
转换前后数据类型:DataStream->DataStream。
object FlinkSource01 {
def main(args: Array[String]): Unit = {
// 获取运行环境
val env :StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
val path ="text01.txt"
val stream =env.readTextFile(path)
// val list =List(1,2,3,4)
// val stream= env.fromCollection(list)
// val stream =env.generateSequence(1,10)
import org.apache.flink.api.scala._
val mapStream = stream.map(_.split(" ").mkString("_"))
val flatmapStream = mapStream.flatMap(_.split("_"))
val filterStream = flatmapStream.filter(_.equals("hello"))
filterStream.print()//6> hello world flink---->6> 线程编号
env.execute("FlinkSource01")
}
}
(2)键值对操作
1)keyBy
DataStream → KeyedStream:输入必须是 Tuple 类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
val streamMap = streamFlatMap.map{
x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")
以下情况的元素不能作为key使用:
- POJO类型,但没有重写hashCode(),而是依赖Object.hashCode()。
- 该元素是数组类型。
2)reduce
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
转换前后数据类型:KeyedStream->DataStream。
KeyBy和Reduce实例
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item =>
(item, 1)).keyBy(0)
val streamReduce = stream.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
streamReduce.print()
env.execute("FirstJob")
object FlinkSource01 {
def main(args: Array[String]): Unit = {
// 获取运行环境
val env :StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
val path ="text01.txt"
val stream =env.readTextFile(path)
import org.apache.flink.api.scala._
val keybyReduceStream = stream.flatMap(_.split(" "))
.map(w=> (w,1))//把单词转成(word,1)这种形式
// .keyBy(0)
.keyBy(_._1)
.reduce((x, y) => (x._1, x._2 + y._2))
keybyReduceStream.print()
env.execute("FlinkSource01")
}
}
3)Fold(Deprecated)
KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。
该方法已经标记为废弃!
转换前后数据类型:KeyedStream->DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item =>
(item, 1)).keyBy(0)
val streamReduce = stream.fold(100)(
(begin, item) => (begin + item._2)
)
streamReduce.print()
env.execute("FirstJob")
4)Aggregations
滚动聚合具有相同key的数据流元素,我们可以指定需要聚合的字段(field)。DataStream<T>中的T为聚合之后的结果。
KeyedStream → DataStream:分组数据流上的滚动聚合操作。 min 和 minBy 的区别是 min 返回的是一个最小值,而 minBy 返回的是其字段中包含最小值的元素(同样原理适用于 max 和 maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
//对KeyedStream中元素的第一个Filed求和
DataStream<String> dataStream1 = keyedStream.sum(0);
//对KeyedStream中元素的“count”字段求和
keyedStream.sum("count");
//获取keyedStream中第一个字段的最小值
keyedStream.min(0);
//获取keyedStream中count字段的最小值的元素
keyedStream.minBy("count");
keyedStream.max("count");
keyedStream.maxBy(0);
min和minBy的区别是:min返回指定字段的最小值,而minBy返回最小值所在的元素。
转换前后数据类型:KeyedStream->DataStream。
实例
object FlinkSource01 {
def main(args: Array[String]): Unit = {
// 获取运行环境
val env :StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
val path ="text01.txt"
val stream =env.readTextFile(path)
// val list =List(1,2,3,4)
// val stream= env.fromCollection(list)
// val stream =env.generateSequence(1,10)
import org.apache.flink.api.scala._
val keybyReduceStream = stream.flatMap(_.split(" "))
.map(w=> (w,1))//把单词转成(word,1)这种形式
// .keyBy(0)
.keyBy(_._1)
// reduce
.reduce((x, y) => (x._1, x._2 + y._2))
// Aggregations
// .sum(1)
// .min(1)
.keyBy(0)
.minBy(1)
keybyReduceStream.print()
env.execute("FlinkSource01")
}
// case class WordWithCount(word:String,count:Long)
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split("
")(1).toLong)).keyBy(0)
val streamReduce = stream.sum(1)
streamReduce.print()
env.execute("FirstJob")
(3)stream合并操作
1)Connect
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中, 内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamMap = stream.flatMap(item => item.split(" ")).filter(item =>
item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
val streamConnect = streamMap.connect(streamCollect)
streamConnect.map(item=>println(item), item=>println(item))
env.execute("FirstJob")
2)CoMap,CoFlatMap
ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
(str) => str + "connect",
(in) => in + 100
)
env.execute("FirstJob")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
(str1) => str1.split(" "),
(str2) => str2.split(" ")
)
streamConnect.map(item=>println(item), item=>println(item))
env.execute("FirstJob")
3)split
DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
# 字符串内容为 hadoop 的组成一个 DataStream,其余的组成一个 DataStream
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)
env.execute("FirstJob")
4)select
SplitStream→DataStream:从一个 SplitStream中获取一个或者多个 DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(num =>
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)
val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
hadoop.print()
env.execute("FirstJob")
5)Union
DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。注意 :如果你将一个DataStream 跟它自己做 union 操作,在新的 DataStream 中,你将看到每一个元素都
出现两次。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamConnect = streamFlatMap1.union(streamFlatMap2)
env.execute("FirstJob")
五、分区
1、自定义分区(partitionCustom)
使用用户自定义的分区函数对指定key进行分区,partitionCustom只支持单分区。
dataStream.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
return key.hashCode() % numPartitions;
}
},1);
转换前后的数据类型:DataStream->DataStream
2、随机分区(shuffle)
均匀随机将元素进行分区。
dataStream.shuffle();
转换前后的数据类型:DataStream->DataStream
3、rebalance
以轮询的方式为每个分区均衡分配元素,对于优化数据倾斜该方法非常有效。
dataStream.rebalance();
转换前后的数据类型:DataStream->DataStream
4、broadcast
使用broadcast可以向每个分区广播元素。
dataStream.broadcast();
转换前后的数据类型:DataStream->DataStream
5、rescale
根据上下游task数进行分区。
dataStream.rescale();
转换前后的数据类型:DataStream->DataStream
六、结果数据接收器(Data sink)
数据经过Flink处理之后,最终结果会写到 file、socket、外部系统或者直接打印出来。数据接收器定义在DataStream类下,我们通过addSink()可以来添加一个接收器。同 Source,Flink 也提供了一些预定义的Data Sink让我们直接使用。
1、写入文本文件
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的 toString()方法来获取。
DataStream提供了两个writeAsText重载方法,写入格式会调用写入对象的toString()方法。
-
writeAsText(String path):将DataStream数据写入到指定文件。
-
writeAsText(String path,WriteMode writeMode):将DataStream数据写入到指定文件,可以通过writeMode来指定如果文件已经存在应该采用什么方式,可以指定OVERWRITE或NO_OVERWRITE。
2、写入CSV文件
DataStream提供了三个写入csv文件的重载方法,对于DataStream中的每个Filed,都会调用其对象的toString()方法作为写入格式。writeAsCsv只能用于元组(Tuple)的DataStream。
writeAsCsv(String path,WriteMode writeMode,String rowDelimiter,String fieldDelimiter)
参数 | 说明 | 实例 |
---|---|---|
path | 写入文件路径 | |
writeMode | 如果写入文件已经存在,采用什么方式处理 | WriteMode.NO_OVERWRITE 或WriteMode.OVERWRITE |
rowDelimiter | 定义行分隔符 | |
fieldDelimiter | 定义列分隔符 |
DataStream提供了两个简易重载方法:
-
writeAsCsv(String path):使用"\n"作为行分隔符,使用","作为列分隔符。
-
writeAsCsv(String path,WriteMode writeMode):使用"\n"作为行分隔符,使用","作为列分隔符。
4、写入Socket
Flink提供了将DataStream作为字节数组写入Socket的方法,通过SerializationSchema
来指定输出格式。
writeToSocket(String hostName,int port,SerializationSchema<T> schema)
5、指定输出格式
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。
DataStream提供了自定义文件输出的类和方法,我们能够自定义对象到字节的转换。
writeUsingOutputFormat(OutputFormat<T> format)
6、结果打印
DataStream提供了print和printToErr打印标准输出/标准错误流。DataStream中的每个元素都会调用其toString()方法作为输出格式,我们也可以指定一个前缀字符来区分不同的输出。
-
print():标准输出
-
print(String sinkIdentifier):指定输出前缀
-
printToErr():标准错误输出
-
printToErr(String sinkIdentifier):指定输出前缀
对于并行度大于1的输出,输出结果也将输出任务的标识符作为前缀。
7、自定义输出器
我们一般会自定义输出器,通过实现SinkFunction
接口,然后通过DataStream.addSink(sinkFunction)
来指定数据接收器。
addSink(SinkFunction<T> sinkFunction)
注意:对于DataStream中的writeXxx()方法一般都是用于测试使用,因为他们并没有参与 chaeckpoint,所以它们只有"at-last-once"也就是至少处理一次语义。
如果想要可靠输出,想要使用"exactly-once"语义准确将结果写入到文件系统中,我们需要使用flink-connector-filesystem
。此外,我们也可以通过addSink()自定义输出器来使用Flink的checkpoint来完成"exactl-oncey"语义。
(1)、MySql Sink
package DataStreamApi.sink.mysqlSink
import java.text.SimpleDateFormat
import java.util.Properties
import com.google.gson.{Gson, JsonParser}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
object MySqlSinkApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.put("bootstrap.servers", "broker")
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "flink_test_sxp")
//key 反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//value 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(),props)
//默认读取上次保存的offset信息
myConsumer.setStartFromGroupOffsets()
// 从最早的数据开始进行消费,忽略存储的offset信息
myConsumer.setStartFromEarliest()
// 从最新的数据进行消费,忽略存储的offset信息
myConsumer.setStartFromLatest()
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//从指定位置进行消费
specificStartOffsets.put(new KafkaTopicPartition("node-bullet-crawler-59", 0), 23L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
val dataStream = env.addSource(myConsumer)
val dataStreamMaped= ...
dataStreamMaped.addSink(new SinkToMySql)
env.execute("Flink kafka sink to mysql")
}
}
package DataStreamApi.sink.mysqlSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
class SinkToMySql extends RichSinkFunction[MessageBean]{
var ps:PreparedStatement = _
var connection:Connection = _
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
* @param parameters
* @throws Exception
*/
override def open(parameters:Configuration ) :Unit= {
super.open(parameters)
connection = getConnection()
val sql = "insert into msg(x1, x2, x3, x3) values(?, ?, ?, ?);"
ps = this.connection.prepareStatement(sql)
}
def getConnection():Connection= {
var con :Connection = null
try {
Class.forName("com.mysql.jdbc.Driver")
con = DriverManager.getConnection("jdbc:mysql://ip:3306/test_sun?useUnicode=true&
characterEncoding=UTF-8", "userid", "password")
} catch {
case e:Exception =>System.out.println("-----------mysql connection has exception ,
msg = "+ e.getMessage)
}
con
}
override def close() :Unit= {
super.close()
//关闭连接和释放资源
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每条数据的插入都要调用一次 invoke() 方法
*
* @param value
* @throws Exception
*/
@throws[Exception]
def invoke(value: MessageBean): Unit = { //组装数据,执行插入操作
// println(value.toString)
ps.setInt(1, value.x1.toInt)
ps.setString(2, value.x2)
ps.setString(3, value.x3)
ps.setString(4, filterEmoji(value.x4))
ps.executeUpdate
}
def filterEmoji(source: String): String = {
if (source != null && source.length() > 0) {
source.replaceAll("[\ud800\udc00-\udbff\udfff\ud800-\udfff]", "")//过滤Emoji表情
.replaceAll("[\u2764\ufe0f]","")//过滤心形符号
} else {
source
}
}
}