flink学习笔记-dataSet参数传递:构造器传递、withParameters传递、全局参数传递。

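在 DataSet 程序中,经常需要向算子函数传递参数,常用的方式有三种:通过函数的构造器传递;通过 withParameters(Configuration) 方法传递,并在 Rich 函数的 open 方法中读取;通过 ExecutionConfig 的 setGlobalJobParameters 设置全局参数,并在 Rich 函数中经由 getRuntimeContext 读取。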

  • 1:使用构造器来传递参数

    /**
      * Example 1: passes a parameter to a user function through its constructor.
      *
      * Builds a two-element DataSet, keeps the elements accepted by
      * `new MyFilterFunction("test")`, and prints the result.
      */
    object FlinkParameter {

      def main(args: Array[String]): Unit = {
        // Imported before first use so that ExecutionEnvironment, DataSet and the
        // implicit TypeInformation instances all resolve from this single import.
        import org.apache.flink.api.scala._

        val env = ExecutionEnvironment.getExecutionEnvironment
        val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
        val filterSet: DataSet[String] = sourceSet.filter(new MyFilterFunction("test"))

        // print() is an eager sink for DataSet programs: it runs the job itself.
        // Calling env.execute() afterwards fails with "No new data sinks have been
        // defined since the last execution", so it is intentionally not called.
        filterSet.print()
      }
    }
    
    /**
      * Filter that keeps only the strings containing `parameter`.
      *
      * @param parameter substring supplied through the constructor
      */
    class MyFilterFunction(parameter: String) extends FilterFunction[String] {
      // contains already yields the Boolean that filter has to return.
      override def filter(t: String): Boolean = t.contains(parameter)
    }
    
  • 2:使用withParameters来传递参数

    import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    
    /**
      * Example 2: passes a parameter to a rich function with `withParameters`.
      *
      * Stores "test" under "parameterKey" in a Configuration and attaches that
      * Configuration to the filter operator, then prints the filtered DataSet.
      */
    object FlinkParameter {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")

        val configuration = new Configuration()
        configuration.setString("parameterKey", "test")

        // The Configuration attached here is the one MyFilter receives in open().
        val filterSet: DataSet[String] = sourceSet.filter(new MyFilter).withParameters(configuration)

        // print() is an eager sink for DataSet programs: it runs the job itself.
        // Calling env.execute() afterwards fails with "No new data sinks have been
        // defined since the last execution", so it is intentionally not called.
        filterSet.print()
      }
    }
    /**
      * Rich filter that keeps only the strings containing `value`.
      *
      * `value` starts as the empty string and is overwritten in `open` with the
      * "parameterKey" entry of the Configuration passed to `open`.
      */
    class MyFilter extends RichFilterFunction[String] {
      var value: String = ""

      /** Reads "parameterKey" from `parameters`, using "defaultValue" when the key is absent. */
      override def open(parameters: Configuration): Unit = {
        value = parameters.getString("parameterKey", "defaultValue")
      }

      /** Accepts `t` when it contains the configured `value`. */
      override def filter(t: String): Boolean = t.contains(value)
    }
    
  • 3:全局参数传递

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    
    /**
      * Example 3: passes a parameter through the global job parameters.
      *
      * Stores "test" under "parameterKey" in a Configuration, registers it on the
      * environment's ExecutionConfig, and prints the DataSet filtered by MyFilter.
      */
    object FlinkParameter {
      def main(args: Array[String]): Unit = {
        val configuration = new Configuration()
        configuration.setString("parameterKey", "test")

        val env = ExecutionEnvironment.getExecutionEnvironment
        // Registered on the job's ExecutionConfig; MyFilter reads it back through
        // getRuntimeContext.getExecutionConfig.getGlobalJobParameters.
        env.getConfig.setGlobalJobParameters(configuration)
        import org.apache.flink.api.scala._
        val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")

        val filterSet: DataSet[String] = sourceSet.filter(new MyFilter)

        // print() is an eager sink for DataSet programs: it runs the job itself.
        // Calling env.execute() afterwards fails with "No new data sinks have been
        // defined since the last execution", so it is intentionally not called.
        filterSet.print()
      }
    }
    /**
      * Rich filter that keeps only the strings containing `value`.
      *
      * `value` starts as the empty string and is overwritten in `open` with the
      * "parameterKey" entry of the job's global parameters.
      */
    class MyFilter extends RichFilterFunction[String] {
      var value: String = ""

      /**
        * Reads "parameterKey" from the global job parameters.
        *
        * The `parameters` argument is not consulted; the value comes from the
        * ExecutionConfig reached through the runtime context. When the global
        * parameters are unset (null) or are not a Configuration, the same default
        * that is used for a missing key ("test") is applied instead of failing on
        * an unchecked cast.
        */
      override def open(parameters: Configuration): Unit = {
        value = getRuntimeContext.getExecutionConfig.getGlobalJobParameters match {
          case globalConf: Configuration => globalConf.getString("parameterKey", "test")
          // A type pattern never matches null, so unset parameters land here too.
          case _ => "test"
        }
      }

      /** Accepts `t` when it contains the configured `value`. */
      override def filter(t: String): Boolean = t.contains(value)
    }
    
发布了40 篇原创文章 · 获赞 59 · 访问量 1392

猜你喜欢

转载自blog.csdn.net/qq_26719997/article/details/105099229