//spark实现本地文件读取并用reduce进行聚合
package day04

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Minimal Spark demo: sums a small in-memory collection with `RDD.reduce`. */
object Test04 {

  /**
   * Starts a single-threaded local Spark context, sums the integers 1 to 5
   * with `reduce`, and prints the total to standard output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Test04")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val numbers: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
      // Declared only; no action is ever run on this RDD, so the job below
      // does not depend on the file's contents.
      val textFile: RDD[String] = sc.textFile("D:\\1.txt")
      val sum: Int = numbers.reduce(_ + _)
      println(sum.toString)
    } finally {
      // Release the context even when the job above throws.
      sc.stop()
    }
  }
}
//spark实现词频统计(scala版)
package day03

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Word-frequency count over a local text file, printed from most to least frequent. */
object WordCount_Scala {

  /**
   * Reads `D:\1.txt`, splits every line into whitespace-separated words,
   * counts the occurrences of each word, sorts the counts in descending
   * order and prints each word together with its count.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // local[*] uses the environment's default number of CPU cores as the number of parallel threads
    // local    runs with a single thread
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount_Scala")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val lines: RDD[String] = sc.textFile("D:\\1.txt")

      // Split on runs of whitespace and drop empty tokens, so blank lines,
      // leading whitespace and repeated separators do not count "" as a word.
      val words: RDD[String] = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)

      val counts: RDD[(String, Int)] = words.map((_, 1)).reduceByKey(_ + _)

      // ascending = true (the default) sorts low-to-high; false sorts high-to-low.
      val sorted: RDD[(String, Int)] = counts.sortBy(_._2, ascending = false)

      sorted.foreach { case (word, count) =>
        println("当前的单词是 : " + word)
        println("单词出现的次数是 : " + count)
      }
    } finally {
      // Release the context even when the job above throws.
      sc.stop()
    }
  }
}
//spark实现词频统计(java版)
package day03;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Word-frequency count over a local text file, printed from most to least frequent.
 */
public class WordCount_Java {

    /**
     * Reads {@code D:\1.txt}, splits every line into whitespace-separated words,
     * counts the occurrences of each word, sorts the counts in descending order
     * and prints each word together with its count.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount_Java");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> file = jsc.textFile("D:\\1.txt");

            // Split on runs of whitespace (spaces, tabs, ...) rather than a single space.
            JavaRDD<String> tokens = file.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split("\\s+")).iterator();
                }
            });

            // Blank lines and leading whitespace still produce "" tokens; drop them
            // so the empty string is not counted as a word.
            JavaRDD<String> words = tokens.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String word) throws Exception {
                    return !word.isEmpty();
                }
            });

            JavaPairRDD<String, Integer> wordMap = words.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

            JavaPairRDD<String, Integer> reduceWord = wordMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

            // sortByKey orders by key, so swap to (count, word), sort, then swap back.
            JavaPairRDD<Integer, String> pairRDD = reduceWord.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<Integer, String>(t2._2, t2._1);
                }
            });

            // false requests descending order.
            JavaPairRDD<Integer, String> sortPairRDD = pairRDD.sortByKey(false);

            JavaPairRDD<String, Integer> resultRdd = sortPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> t2) throws Exception {
                    return new Tuple2<String, Integer>(t2._2, t2._1);
                }
            });

            resultRdd.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> t2) throws Exception {
                    System.out.println("出现的单词是 : " + t2._1);
                    System.out.println("单词出现的次数是 : " + t2._2);
                }
            });
        } finally {
            // Release the context even when the job above throws.
            jsc.stop();
        }
    }
}