自定义Source概述
自定义Source是Flink学习中非常重要的一环,一般实现一个Source,可以继承下面几个类之一
SourceFunction
RichSourceFunction
ParallelSourceFunction
RichParallelSourceFunction
前两个是单并行度,后2个是多并行度,无论是否多并行度,我们一般选择带Rich的SourceFunction, 也就是RichSourceFunction和RichParallelSourceFunction,这样我们可以拿到丰富的上下文信息和实现各种初始化逻辑,比如建立数据库连接等,对我们高度自由定制逻辑提供了方便之门。
多并行度问题
本篇文章主要介绍通过自定义多并行度来读取MySQL数据。对于多并行度的各个任务,彼此是独立运行的,没法相互构通,智能的去合作读取一整块MySQL数据,在Source的初始化接口open里定义了什么查询语句,就读取什么数据。所以不加干涉的话,基本上有多少个并行度,open里定义的查询就会机械地重复多少次。这样笨拙的并行度显然不是我们想要的。
问题解决思路
前面说过带Rich的SourceFunction,提供了丰富的上下文信息,通过getRuntimeContext,可以获取到任务id和任务总数。我们在这上面做文章,就可以让各个任务分别读取属于当前任务的数据,然后各个任务读取的数据合起来正好是整份我们想要的数据。
逻辑实现例子
0 例子需求
设置3个并行度,读取mysql一个表中9条数据,每个任务只读取属于自己的3条
1 自定义Source代码
//定义输出类型
case class FunnelBean(appkey:Int, funnel_name:String, steps:String)
class MySQLSource extends RichParallelSourceFunction[FunnelBean]{
var connection:Connection = null
var pstat:PreparedStatement = null
override def open(parameters: Configuration): Unit = {
val total_task = getRuntimeContext.getNumberOfParallelSubtasks
//根据当前的任务id号,决定当前任务应当读取mysql哪些数据行
val subtask_index = getRuntimeContext.getIndexOfThisSubtask
println(s"subtask_index = ${subtask_index} total_task=${total_task}")
val from_offset = subtask_index * 3
connection = MySQLUtils.getConnection()
val sql = s"select appkey, funnel_name, steps from `funnel` limit $from_offset, 3 "
pstat = connection.prepareStatement(sql)
}
override def run(sourceContext: SourceFunction.SourceContext[FunnelBean]): Unit = {
val rs = pstat.executeQuery()
var count = 0
while (rs.next()){
count += 1
val appkey = rs.getInt("appkey")
val funnel_name = rs.getString("funnel_name")
val steps = rs.getString("steps")
sourceContext.collect(FunnelBean(appkey,funnel_name,steps))
}
val subtask_index = getRuntimeContext.getIndexOfThisSubtask
println(s"任务id: ${subtask_index} 读取数据条数: ${count}")
}
override def cancel(): Unit = {
}
override def close(): Unit = {
MySQLUtils.close(connection,pstat)
}
2 测试source代码
object SourceApp01 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val datastream = env.addSource(new MySQLSource)
.setParallelism(3) //设置3个并行度
val parallelism = datastream.parallelism
println(s"数据源并行度:$parallelism")
datastream.print()
env.execute("my datasource")
}
}
3 运行结果
数据源并行度:3
# 自定义Source输出信息
subtask_index = 1 total_task=3
subtask_index = 0 total_task=3
subtask_index = 2 total_task=3
任务id: 1 读取数据条数: 3
任务id: 2 读取数据条数: 3
任务id: 0 读取数据条数: 3
# 测试类输出结果, 正好是我们想要的9条数据
1> FunnelBean(0,测试,注册#登录#参与测试)
3> FunnelBean(1,网站测试,测试001#测试002)
4> FunnelBean(2,付款漏斗,浏览商品#提交订单#付款)
2> FunnelBean(3,选择漏斗,浏览商品#加入购物车)
1> FunnelBean(4,付款漏斗,浏览商品#提交订单#付款)
4> FunnelBean(5,222hhh,浏览商品#加入购物车)
1> FunnelBean(6,提交订单漏斗,浏览商品#加入购物车#提交订单)
2> FunnelBean(7,测试漏斗20161121,E1#E2#E3#E4)
2> FunnelBean(8,提交订单漏斗,浏览商品#加入购物车#提交订单)