接上一篇文章,SparkRDD算子接收的都是函数,如filter、map、flatmap等都是接收的匿名函数。
向Spark算子传递函数
Java的两种方法
匿名内部类
如前一篇文章中
JavaRDD<String> filterRDD = textFileRDD.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("spark"); //返回带有"spark"内容的行。
}
});
创建类实现Function接口
如果方法比较复杂,可以独立创建类,这里只做示例
public class LearnSpark {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("LearnSpark").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD textFileRDD = sc.textFile("in/README.md");
JavaRDD<String> filterRDD = textFileRDD.filter(new GetSpark());
System.out.println("包含spark的行数:"+ filterRDD.count());
}
static class GetSpark implements Function<String, Boolean> {
public Boolean call(String s) {
return s.contains("spark");
}
}
}
Scala的两种方法
传递匿名函数
val filterRDD = textFileRDD.filter(line => line.contains("spark"))
定义全局单例对象中的静态方法
val filterRDD2 = textFileRDD.filter(GetSpark.fun1)
定义全局单例对象中的静态方法GetSpark.fun1
object GetSpark{
def fun1(s:String):Boolean = s.contains("spark")
}