package count1
import java.net.URL
import org.apache.spark.{SparkConf, SparkContext}
/**
* 每个学科中子版块的访问量最大的前一名
*/
object URLCount {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("SparkWorkCount")//.setMaster("local[2]")
val sc=new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd=sc.textFile("hdfs://192.168.16.100:9000/data/dashuju.log",1).map(lines=>{
val fields=lines.split("\t")
(fields(1),1)
})
val rdd1=rdd.reduceByKey(_+_)
val rdd2=rdd1.map(t=>{
//t._1 url t._2 总次数
val url=t._1
val host=new URL(url).getHost
val count=t._2
(host,url,count)
})
//按照host进行分组
val rdd3=rdd2.groupBy(_._1)
val rdd4=rdd3.mapValues(it=>{
it.toList.sortBy(_._3).reverse.take(1)
})
val res=rdd4.map(x=>(x._2))
val
arr=Array("java.dashuju.cn","php.dashuju.cn","net.dashuju.cn")
for(course<-arr){
val rdd5=rdd2.filter(_._1==course)
val res1=rdd5.sortBy(_._3,false).take(1)
//Array转RDD
val res2=sc.parallelize(res1).saveAsTextFile("hdfs://192.168.16.100:9000" + "/reswork"+course)
println(res1.toBuffer)
}
sc.stop()
}
}
//把代码 打成Jar包上传到到hdfs上
hadoop端启动集群把文件上传到集群上
在spack目录下执行命令:
bin/spark-submit --class count.URLCount --master yarn-client --executor-memory 1G --total-executor-cores 1 /root/URLCount.jar
查看: