Flink 自定义源算子之 读取MySQL

1、功能说明:

在Flink 自定义源算子中封装jdbc来读取MySQL中的数据

2、代码示例

Flink版本说明:flink_1.13.0、scala_2.12

自定义Source算子,这里我们继承RichParallelSourceFunction,因为要使用open方法来初始化数据库连接对象

Tips:这种实现方式为可并行算子,当并行度>1时,每个并行任务都会读取相同的数据,使用的时候需要注意

package com.baidu.bean

case class User(id: Long, name: String)
class MysqlSource extends RichParallelSourceFunction[User] {
  // 定义 Connection、PreparedStatement对象
  var connection: Connection = null
  var ps: PreparedStatement = null

  // 函数初始化方法,常用来初始化资源对象,常用来做一次性的设置
  // 当 MysqlSource对象被创建时,调用一次
  override def open(parameters: Configuration): Unit = {
    // 初始化 Connection、PreparedStatement对象
    // 加载数据库驱动
    Class.forName("com.mysql.jdbc.Driver")
    // 获取连接
    connection = DriverManager.getConnection("jdbc:mysql://worker01/flink", "root", "worker123")
    // 读取user表
    ps = connection.prepareStatement("select *  from user")
  }

  override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
    // 执行查询操作,获取查询结果
    val resultSet = ps.executeQuery()
    // 将查询结果封装到user对象
    while (resultSet.next()) {
      val user = User(resultSet.getLong(1),
        resultSet.getString(2))
      ctx.collect(user)
    }
  }

  // 关闭连接资源
  override def cancel(): Unit = {
    connection.close()
    ps.close()
  }
}

使用 MysqlSource 来读取数据(作为有界流来处理):

  test("使用 自定义Source算子,读取mysql数据") {
    // 1. 获取流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. 将 自定义数据源 作为数据源
    val ds: DataStream[User] = env.addSource(new MysqlSource).setParallelism(4)

    // 3. 打印DataStream
    ds.print()

    // 4. 出发程序执行
    env.execute()
  }

执行结果:

猜你喜欢

转载自blog.csdn.net/weixin_42845827/article/details/131462189