Flink-TableAPI & SQLAPI
如何使用TableAPI
1.首先引入相关的table API依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.2</version>
</dependency>
2.简单编程
package com.shufang.flink.table_and_sql_api
import com.shufang.flink.bean.People
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala.StreamTableEnvironment
/**
* 主要是用来测试TableAPI和Flink SQL
* case class People(id: Int, name: String, score: Double)
*/
object TableTest {
def main(args: Array[String]): Unit = {
//获取环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取Table环境
val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
//获取到一个流,同时将流中的数据转化成样例类
val peopleStream: DataStream[People] = env.socketTextStream("localhost", 9999).map(
line => {
val strings: Array[String] = line.split(",")
People(strings(0).trim.toInt, strings(1), strings(2).trim.toDouble)
}
)
//然后通过TableEnv中获取动态Table DataStream -> Table、
val peopleTable: Table = tEnv.fromDataStream(peopleStream)
//因为是case class People ,tEnv可以根据样例类生成动态表,属性就代表字段
val table: Table = peopleTable.select("id,name,score").filter("score > 80 ")
/**
* 将动态表转化成Stream
* 方式1: tEnv.toAppendStream[(Int, String, Double)](table) //只适合insert类型的table
* 方式2: tEnv.toRetractStream[People](peopleTable) //delete、或者groupby后使用
*/
val tableToStream: DataStream[(Int, String, Double)] = tEnv.toAppendStream[(Int, String, Double)](table)
//Boolean:true新数据,false老数据
val tableToStream02: DataStream[(Boolean, People)] = tEnv.toRetractStream[People](peopleTable)
tableToStream.print()
//执行程序
env.execute("table test")
}
}
//主要步骤->
//获取Table环境
//然后通过TableEnv中获取动态Table DataStream -> Table、
//因为是case class People ,tEnv可以根据样例类生成动态表,属性就代表字段
//将动态表转化成Stream
//动态table的Window操作
people
.window(Tumble over 10000.millis on 'ts as 'tt) //tt属于window的别名,on后面指定时间字段
.groupby('ch,'tt) //group by中必须输入‘tt 窗口的别名
.select('ch,'ch.count)
指定动态Table的时间字段
1.在env中指定时间语义
2.在steam中指定时间字段和延迟时间
3.Table指定时间字段,如果需要用到时间窗口,必须在窗口操作之前提前 指定时间字段
tEnv.fromDataStream(stream,…,'ts.rowtime) //eventtime语义下指定的时间字段
tEnv.fromDataStream(stream,…,'ts.proctime) //processingtime语义下的时间字段
4.在tableApi中 的Tumble over 10000.millis on 'ts
as'alias window