Basic RDD Operations
The RDD (Resilient Distributed Dataset) is the basic abstraction Spark exposes to programmers; most map/reduce-style operations are performed on RDDs.
1. Convert a List into an RDD
scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
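Note: parallelize also takes an optional second argument, the number of partitions to split the data into. A minimal sketch (the partition count 4 is only illustrative):
// Distribute the list across an explicit number of partitions
val rdd4 = sc.parallelize(List(1, 2, 3, 4, 5), 4)
rdd4.partitions.length  // 4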
2. Filter the RDD
scala> val filteredRDD = rdd.filter(_ >= 4)
filteredRDD: org.apache.spark.rdd.RDD[Int] = FilteredRDD[1] at filter at <console>:14
3. Call collect on filteredRDD
scala> filteredRDD.collect()
res0: Array[Int] = Array(4, 5)  // the elements that satisfy the predicate are 4 and 5
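Note the difference between these calls: filter and map are lazy transformations that only build a new RDD description, while collect is an action that actually launches a job. A minimal sketch of the distinction:
val lazyRDD = rdd.filter(_ >= 4)  // a transformation: returns immediately, no job runs
lazyRDD.collect()                 // an action: this call triggers the computation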
4. Apply map to the RDD
scala> val mappedRDD = rdd.map(_ * 2)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[2] at map at <console>:14
5. Call collect on mappedRDD
scala> mappedRDD.collect()
res1: Array[Int] = Array(2, 4, 6, 8, 10)
6. Chain the operations in a single functional expression
scala> sc.parallelize(List(1,2,3,4,5)).map(_ * 2).filter(_ >= 4).collect()
res2: Array[Int] = Array(4, 6, 8, 10)
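The same pipeline also works outside the shell as a standalone application. A minimal sketch, assuming a local master; the object name BasicRDDApp is made up for illustration (in spark-shell, sc is created for you):
import org.apache.spark.{SparkConf, SparkContext}

object BasicRDDApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BasicRDDApp").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val result = sc.parallelize(List(1, 2, 3, 4, 5))
      .map(_ * 2)      // double each element
      .filter(_ >= 4)  // keep elements >= 4
      .collect()       // action: bring the results back to the driver
    println(result.mkString(", "))  // 4, 6, 8, 10
    sc.stop()
  }
}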
Reading and Writing HDFS, and Caching
1. Upload README.md from the Spark installation directory to HDFS
hdfs dfs -put /home/hadoop/software/spark-1.2.0-bin-hadoop2.4/README.md /user/hadoop
View the first 10 lines of README.md:
hdfs dfs -cat /user/hadoop/README.md | head -10
The output:
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

<http://spark.apache.org/>
2. Read README.md from HDFS in Spark
scala> val rdd = sc.textFile("README.md")
rdd: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[9] at textFile at <console>:12
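A relative path such as "README.md" is resolved against the user's HDFS home directory, here /user/hadoop. textFile also accepts a full URI and an optional minimum number of partitions; a sketch (the namenode host and port are placeholders):
val rddFull = sc.textFile("hdfs://namenode:8020/user/hadoop/README.md")
val rddMin  = sc.textFile("README.md", 4)  // request at least 4 partitions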
3. Count the number of lines
scala> rdd.count()
15/01/02 03:52:57 INFO scheduler.DAGScheduler: Job 4 finished: count at <console>:15, took 0.396747 s
res4: Long = 98
So README.md has 98 lines, and the count took Spark 0.396747 seconds. The following command confirms that README.md is indeed 98 lines long:
[hadoop@hadoop spark-1.2.0-bin-hadoop2.4]$ cat /home/hadoop/software/spark-1.2.0-bin-hadoop2.4/README.md | wc -l
98
4. RDD caching
Building on step 3, run rdd.count() again:
scala> rdd.count()
15/01/02 03:52:57 INFO scheduler.DAGScheduler: Job 4 finished: count at <console>:15, took 0.051862 s
res4: Long = 98
This second count takes only 0.05 seconds, far less than the 0.39 seconds of the first run. Note that Spark itself does not cache an RDD automatically (the speedup here comes largely from the operating system's file cache and JVM warm-up); to keep the RDD in memory explicitly, call cache():
scala> rdd.cache()
res9: rdd.type = README.md MappedRDD[9] at textFile at <console>:12
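cache() only marks the RDD as cacheable; the data is actually stored the first time an action computes it. It is shorthand for persist with the default memory-only storage level. A sketch of the typical pattern:
import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY)  // equivalent to rdd.cache()
rdd.count()      // first action: reads from HDFS and fills the cache
rdd.count()      // subsequent actions read the cached blocks from memory
rdd.unpersist()  // release the cached blocks when no longer needed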
5. Write the data in rdd (the contents of README.md) back to HDFS
scala> rdd.saveAsTextFile("RDD")
Note that RDD here is a directory under /user/hadoop on HDFS; the actual content lives in the part-00000 file:
[hadoop@hadoop spark-1.2.0-bin-hadoop2.4]$ hdfs dfs -cat /user/hadoop/RDD/part-00000 | wc -l
98
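saveAsTextFile writes one part-NNNNN file per partition and fails if the target directory already exists. If a single output file is wanted, the RDD can be coalesced to one partition first, at the cost of parallelism during the write. A sketch (the output path is illustrative):
rdd.coalesce(1).saveAsTextFile("RDD-single")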
Word Count
1. Count the word frequencies (word count) of README.md on HDFS
scala> val wordCountRDD = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
wordCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:14
2. Collect and view the word counts
scala> wordCountRDD.collect()
res11: Array[(String, Int)] = Array((package,1), (For,2), (processing.,1), (Programs,1), (Because,1), (The,1), (cluster.,1), (its,1), ([run,1), (APIs,1), (computation,1), (Try,1), (have,1), (through,1), (several,1), (This,2), ("yarn-cluster",1), (graph,1), (Hive,2), (storage,1), (["Specifying,1), (To,2), (page](http://spark.apache.org/documentation.html),1), (Once,1), (application,1), (prefer,1), (SparkPi,2), (engine,1), (version,1), (file,1), (documentation,,1), (processing,,2), (the,21), (are,1), (systems.,1), (params,1), (not,1), (different,1), (refer,2), (Interactive,2), (given.,1), (if,4), (build,3), (when,1), (be,2), (Tests,1), (Apache,1), (all,1), (./bin/run-example,2), (programs,,1), (including,3), (Spark.,1), (package.,1), (1000).count(),1), (HDFS,1), (Versions,1), (Data.,1),....
3. Write the results to HDFS
scala> wordCountRDD.saveAsTextFile("WordCountRDD")
View the data on HDFS:
[hadoop@hadoop spark-1.2.0-bin-hadoop2.4]$ hdfs dfs -cat /user/hadoop/WordCountRDD/part-00000 | head -10
(package,1)
(For,2)
(processing.,1)
(Programs,1)
(Because,1)
(The,1)
(cluster.,1)
(its,1)
([run,1)
(APIs,1)
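The pipeline in step 1 can also be written out stage by stage, which makes the intermediate types visible. A sketch (the variable names are illustrative):
val words  = rdd.flatMap(_.split(" "))  // RDD[String]: one element per word
val pairs  = words.map((_, 1))          // RDD[(String, Int)]: (word, 1) pairs
val counts = pairs.reduceByKey(_ + _)   // sums counts per word, combining map-side before the shuffle
counts.take(5).foreach(println)         // inspect a few results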
Word Count Sorted by Frequency
1. A single statement performs both the word count and the sort:
scala> rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("SortedWordCountRDD")
2. View the data on HDFS:
[hadoop@hadoop spark-1.2.0-bin-hadoop2.4]$ hdfs dfs -cat /user/hadoop/SortedWordCountRDD/part-00000 | head -10
(,66)
(the,21)
(Spark,15)
(to,14)
(for,11)
(and,10)
(a,9)
(##,8)
(run,7)
(is,6)
[hadoop@hadoop spark-1.2.0-bin-hadoop2.4]$ hdfs dfs -cat /user/hadoop/SortedWordCountRDD/part-00000 | tail -10
(core,1)
(web,1)
("local[N]",1)
(package.),1)
(MLlib,1)
(["Building,1)
(Scala,,1)
(mvn,1)
(./dev/run-tests,1)
(sample,1)
As shown, Spark sorted the results by frequency in descending order. (The (,66) entry is the empty-string token produced when split(" ") encounters leading or consecutive spaces.)
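The swap-sort-swap pattern above (map each pair to (count, word), sortByKey, then map back) is needed because sortByKey only sorts by key. Since Spark 1.0 the same thing can be written more directly with sortBy; a sketch (the output path is illustrative):
rdd.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)  // sort by the count, descending
  .saveAsTextFile("SortedWordCountRDD2")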