版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
Spark 进行编程的时候, 初始化工作是在 driver端完成的, 而实际的运行程序是在executor端进行的. 所以就涉及到了进程间的通讯,下面的实例演示函数的传递
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SerDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SerDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
//传递Searcher函数到exector端
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
//需求: 在 RDD 中查找出来包含 query 子字符串的元素
// query 为需要查找的子字符串
class Searcher(val query: String) extends Serializable{
// 判断 s 中是否包括子字符串 query
def isMatch(s : String) ={
s.contains(query)
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD1(rdd: RDD[String]) ={
rdd.filter(isMatch) //
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD2(rdd: RDD[String]) ={
rdd.filter(_.contains(query))
}
}