Flink 源算子之 DataGeneratorSource & DataGenerator

目录

1、功能说明

2、API使用说明

3、代码示例


1、功能说明

        从Flink1.1开始提供了DataGen连接器,它提供了Source类的实现(可并行的源算子),用来生成测试数据,在本地开发或者无法访问外部系统(如kafka)时,它就会非常有用。

        DataGen连接器是内置的,不需要额外的依赖项。


2、API使用说明

方法定义:
public DataGeneratorSource(
        DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows)

参数说明:
        DataGenerator<T> generator     :  指定数据生成器对象
        long rowsPerSecond             :  指定数据发射速率(每秒发射的记录数),默认值为Long.MAX_VALUE
        @Nullable Long numberOfRows    :  指定指定输出数据的总行数(为null时,表示一直输出)

关于DataGenerator类
public interface DataGenerator<T> extends Serializable, Iterator<T>

功能说明:
   继承了Iterator,利用迭代器来构造测试数据 

3、代码示例

Flink版本说明:flink_1.13.0、scala_2.12

定义User类:

package com.baidu.bean

case class User(id: Long, name: String)

测试代码:

  test("DataGen 连接器") {

    // 1. 获取流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // 自定义 DataGenerator实现类(用来随机生成User对象)
    val userGenerator = new DataGenerator[User]() {
      // 定义随机数数据生成器
      var generator: RandomDataGenerator = _

      // 初始化数据生成器
      override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {
        generator = new RandomDataGenerator
      }

      // 判断迭代器是否有值
      override def hasNext: Boolean = true

      // 生成随机字符串,并返回
      override def next(): User = {
        User(generator.nextLong(1, 99) // 生成1~99区间的随机整数
          , generator.nextHexString(4) // 生成4位字符串
        )
      }
    }

    // 自定义字符串数据生成器
    val stringGenerator = new DataGenerator[String]() {
      // 定义随机数数据生成器
      var generator: RandomDataGenerator = _

      // 初始化数据生成器
      override def open(name: String, context: FunctionInitializationContext, runtimeContext: RuntimeContext): Unit = {
        generator = new RandomDataGenerator
      }

      // 是否有下一个值
      override def hasNext: Boolean = true

      // 生成随机字符串,并返回
      override def next(): String = generator.nextHexString(3)
    }

    val dataGenSource = new DataGeneratorSource(
      userGenerator // 指定数据生成器
      , 2L // 指定发射速率(每秒发射的记录数)
      , null // 指定输出数据的总行数(为null时,表示一直输出)
    )

    // 将DataGeneratorSource做为数据源
    val ds = env.addSource(dataGenSource)

    println(s"并行度: ${ds.parallelism}")

    // 打印DataStream
    ds.print()

    // 出发程序执行
    env.execute()
  }

执行结果:

猜你喜欢

转载自blog.csdn.net/weixin_42845827/article/details/131459561