To implement Top N we need a sorted map data structure, which gives us a total ordering over the keys.
First, we implement and test the idea in a standalone Scala object.
import scala.collection.SortedMap
import scala.collection.immutable.TreeMap

object TopN {

  def topN[T](L: List[(T, Integer)], N: Int): SortedMap[Integer, T] = {
    if ((L == null) || L.isEmpty) {
      return null
    }
    var topN: SortedMap[Integer, T] = TreeMap[Integer, T]()
    for (element <- L) {
      // element._1 is the item, of type T
      // element._2 is the frequency, of type Integer
      topN += ((element._2, element._1))
      // keep only the top N
      if (topN.size > N) {
        topN -= topN.firstKey
      }
    }
    topN
  }

  // test
  def main(args: Array[String]): Unit = {
    val tuples: List[(String, Integer)] = List(("a", 2), ("b", 3), ("c", 1), ("d", 7))
    val map: SortedMap[Integer, String] = topN[String](tuples, 2)
    for (element <- map) {
      System.out.println(element._2 + "\t" + element._1)
    }
  }
}
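Running main keeps the two entries with the largest counts, so it prints b 3 followed by d 7 (the TreeMap iterates its keys in ascending order). Note that because the count serves as the map key, two items with the same count collide and the later one wins; this is why these versions assume unique keys.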
MapReduce/Hadoop implementation (unique keys):
package mapreduce

import java.lang

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import scala.collection.SortedMap
import scala.collection.immutable.TreeMap

object TopN {

  def main(args: Array[String]): Unit = {
    val conf: Configuration = new Configuration()
    conf.set("fs.defaultFS", "hdfs://ran:9000")
    System.setProperty("HADOOP_USER_NAME", "ran")

    val job = Job.getInstance(conf)
    job.setJarByClass(TopN.getClass)
    job.setMapperClass(TopN_Mapper.getClass)
    job.setReducerClass(TopN_Reducer.getClass)
    job.setMapOutputKeyClass(NullWritable.get().getClass)
    job.setMapOutputValueClass(new Text().getClass)
    job.setOutputKeyClass(NullWritable.get().getClass)
    job.setOutputValueClass(new Text().getClass)

    FileInputFormat.setInputPaths(job, args(0))

    // delete the output directory if it already exists
    val outputPath: Path = new Path(args(1))
    val fs: FileSystem = FileSystem.get(conf)
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true)
    }
    FileOutputFormat.setOutputPath(job, outputPath)

    val isDone: Boolean = job.waitForCompletion(true)
    System.exit(if (isDone) 0 else 1)
  }
  object TopN_Mapper extends Mapper[LongWritable, Text, NullWritable, Text] {

    // sorted map holding this mapper's local top N, keyed by weight
    private var topN: SortedMap[Double, Text] = new TreeMap[Double, Text]()
    // defaults to top 5
    private var N = 5

    override def setup(context: Mapper[LongWritable, Text, NullWritable, Text]#Context): Unit = {
      // "top.n" is set by the job driver
      val conf: Configuration = context.getConfiguration
      // N = conf.get("top.n").toInt
    }

    /**
      * @param key
      * @param value a String of the form <cat_weight>,<cat_id>;<cat_name>
      * @param context
      */
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, NullWritable, Text]#Context): Unit = {
      val tokens: Array[String] = value.toString.split(",")
      // cat_weight          = tokens(0)
      // <cat_id>;<cat_name> = tokens(1)
      val weight: Double = tokens(0).toDouble
      // copy the Text: Hadoop reuses the same value object across map() calls
      topN += ((weight, new Text(value)))
      // keep only the top N
      if (topN.size > N) {
        topN -= topN.firstKey
      }
      System.out.println(topN.toString())
    }

    /**
      * cleanup() runs once at the end of each mapper;
      * here we build the "top N cat" list for this mapper
      * @param context
      */
    override def cleanup(context: Mapper[LongWritable, Text, NullWritable, Text]#Context): Unit = {
      // emit this mapper's local top N elements
      System.out.println(topN.toString())
      topN.values.foreach(context.write(NullWritable.get(), _))
    }
  }
  object TopN_Reducer extends Reducer[NullWritable, Text, NullWritable, Text] {

    // defaults to top 5
    private var N = 5

    override def setup(context: Reducer[NullWritable, Text, NullWritable, Text]#Context): Unit = {
      // "top.n" is set by the job driver
      val conf: Configuration = context.getConfiguration
      // N = conf.get("top.n").toInt
    }

    /**
      * @param key null (there is a single reducer)
      * @param values a list of Strings, each of the form <cat_weight>,<cat_id>;<cat_name>
      * @param context
      */
    override def reduce(key: NullWritable, values: lang.Iterable[Text], context: Reducer[NullWritable, Text, NullWritable, Text]#Context): Unit = {
      var finalTopN: SortedMap[Double, Text] = TreeMap[Double, Text]()
      // merge all of the local top N lists
      val it = values.iterator()
      while (it.hasNext) {
        // copy the Text: Hadoop reuses the same object while iterating over values
        val catRecord: Text = new Text(it.next())
        val tokens: Array[String] = catRecord.toString.split(",")
        System.out.println(tokens(0))
        val weight: Double = tokens(0).toDouble
        finalTopN += ((weight, catRecord))
        if (finalTopN.size > N) {
          finalTopN -= finalTopN.firstKey
        }
      }
      // emit the final top N list
      for (text <- finalTopN.values) {
        context.write(NullWritable.get(), text)
      }
    }
  }
}
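For reference, here is what a tiny input file and a job submission could look like; the sample records, jar name, and HDFS paths below are made up for illustration only.

# one record per line: <cat_weight>,<cat_id>;<cat_name>
12.5,100;Electronics
3.2,101;Books
47.0,102;Grocery

hadoop jar TopN.jar mapreduce.TopN /topn/input /topn/output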
Spark implementation (unique keys):
The pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ran</groupId>
<artifactId>ToN10</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>2.7.6</hadoop.version>
<spark.version>2.3.0</spark.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapred -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapred</artifactId>
<version>0.22.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- log4j日志文件管理包版本 -->
<!-- 日志文件管理包 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
</project>
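Note: spark-core_2.11 is built against Scala 2.11, so the project's Scala version must be 2.11.x to match.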
The log4j.properties file:
### set log levels ###
log4j.rootLogger=INFO, stdout, log, error
log4j.Logger=search,Test
### console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [qh_cloud][%p] [%-d{yyyy-MM-dd HH:mm:ss}] %C.%M(%L) | %m%n
### log file ###
log4j.appender.log = org.apache.log4j.DailyRollingFileAppender
log4j.appender.log.File = ${catalina.base}/logs/qh_cloud_info.log
log4j.appender.log.Append = true
log4j.appender.log.Threshold = INFO
log4j.appender.log.DatePattern='.'yyyy-MM-dd
log4j.appender.log.layout = org.apache.log4j.PatternLayout
log4j.appender.log.layout.ConversionPattern = [qh_cloud][%p] [%-d{yyyy-MM-dd HH:mm:ss}] %C.%M(%L) | %m%n
### exception ###
log4j.appender.error = org.apache.log4j.DailyRollingFileAppender
log4j.appender.error.File = ${catalina.base}/logs/qh_cloud_error.log
log4j.appender.error.Append = true
log4j.appender.error.Threshold = ERROR
log4j.appender.error.DatePattern='.'yyyy-MM-dd
log4j.appender.error.layout = org.apache.log4j.PatternLayout
log4j.appender.error.layout.ConversionPattern = [qh_cloud][%p] [%-d{yyyy-MM-dd HH:mm:ss}] %C.%M(%L) | %m%n
Note: all of the code below is written in Scala.
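As a minimal sketch of the idea (the object name TopNSpark, the local master, and the argument handling are assumptions for illustration, not the final program), a unique-key Top N can be computed in Spark with takeOrdered:

import org.apache.spark.{SparkConf, SparkContext}

object TopNSpark {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("TopN").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val N = 5

    // each input line: <cat_weight>,<cat_id>;<cat_name>
    val pairs = sc.textFile(args(0)).map { line =>
      val tokens = line.split(",")
      (tokens(0).toDouble, line)
    }

    // takeOrdered with a reversed ordering returns the N records with the largest weights
    val topN = pairs.takeOrdered(N)(Ordering.by[(Double, String), Double](_._1).reverse)
    topN.foreach { case (_, record) => println(record) }

    sc.stop()
  }
}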