package com.qf.gp1707.day06
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkWC {
  def main(args: Array[String]): Unit = {
    // Configuration: application name; uncomment setMaster("local[*]") to run locally
    val conf: SparkConf = new SparkConf().setAppName("SparkWC") //.setMaster("local[*]")
    // Context object
    val sc: SparkContext = new SparkContext(conf)
    // For a quick test the input directory only needs one small file, e.g. the wc1.log shown below
    // Read the data. When running from IDEA, pass the paths as program arguments, for example:
    //   args(0): hdfs://192.168.88.131:9000/wc/   (input)
    //   args(1): hdfs://192.168.88.136:9000/wc1/  (output)
    val lines = sc.textFile(args(0))
    // Process the data: split lines into words, pair each word with 1, sum the counts per word
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val paired: RDD[(String, Int)] = words.map((_, 1))
    val reduced: RDD[(String, Int)] = paired.reduceByKey(_ + _)
    // Sort by count in descending order
    val res: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // Save the result to the output path
    res.saveAsTextFile(args(1))
    println(res.collect().toBuffer)
    // Stop the job
    sc.stop()
  }
}
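Before submitting to a cluster, the same job can be tested in local mode by enabling the commented-out setMaster("local[*]") call and pointing textFile at a local file. The sketch below is for illustration only: the object name SparkWCLocal and the relative paths wc1.log / out are hypothetical placeholders, not part of the original code.

package com.qf.gp1707.day06

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local-mode variant for quick testing (paths are placeholders)
object SparkWCLocal {
  def main(args: Array[String]): Unit = {
    // local[*] uses as many worker threads as there are CPU cores on this machine
    val conf = new SparkConf().setAppName("SparkWCLocal").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read a local file instead of HDFS (hypothetical path)
    val lines = sc.textFile("wc1.log")
    val res: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // Write results to a local directory (hypothetical path) and print them
    res.saveAsTextFile("out")
    println(res.collect().toBuffer)
    sc.stop()
  }
}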
The reference log file (wc1.log) is as follows:
Hello tom
Hello jerry
Hello tom
Hello kitty
Hello tom
Hello jerry
The run output is as follows:
ArrayBuffer((Hello,6), (tom,3), (jerry,2), (kitty,1))
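As a plain-Scala sanity check (not part of the Spark program above), the same counts can be reproduced from the six sample lines with ordinary collections:

// Verifies the expected counts from the sample log lines without Spark
object CountCheck {
  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "Hello tom", "Hello jerry", "Hello tom",
      "Hello kitty", "Hello tom", "Hello jerry")
    val counts = sample
      .flatMap(_.split(" "))                // split each line into words
      .groupBy(identity)                    // group identical words together
      .map { case (w, ws) => (w, ws.size) } // count occurrences per word
      .toSeq
      .sortBy(-_._2)                        // descending by count
    // yields (Hello,6), (tom,3), (jerry,2), (kitty,1)
    println(counts)
  }
}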