The given data file has the following format:
2017/07/28 qq.com/a
2017/07/28 qq.com/bx
2017/07/28 qq.com/by
2017/07/28 qq.com/by3
2017/07/28 qq.com/news
2017/07/28 sina.com/news/socail
2017/07/28 163.com/ac
2017/07/28 sina.com/news/socail
2017/07/28 163.com/sport
2017/07/28 163.com/ac
2017/07/28 sina.com/play
2017/07/28 163.com/sport
2017/07/28 163.com/ac
2017/07/28 sina.com/movie
2017/07/28 sina.com/play
2017/07/28 sina.com/movie
2017/07/28 163.com/sport
2017/07/28 sina.com/movie
1. Count how many times each URL is visited.
2. For each site, find its 3 most-visited URLs (group top-N).
3. Find the N most-visited sites (global top-N).
Problem 1: count how many times each URL is visited
Map phase: read each line and emit the URL as the key with a value of 1, meaning one visit.
Reduce phase: records with the same key (the same URL) are grouped together; summing their values gives the final count.
The code is as follows:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {

    public static class PageCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line looks like "2017/07/28 qq.com/a"; emit <url, 1>
            String[] line = value.toString().split(" ");
            context.write(new Text(line[1]), new IntWritable(1));
        }
    }

    public static class PageCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all the 1s for the same URL
            int count = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                count += it.next().get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubmitter.class);
        job.setMapperClass(PageCountMapper.class);
        job.setReducerClass(PageCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\page\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\page\\output"));

        job.setNumReduceTasks(1);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
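For reference, running this job over the sample data above should produce output along these lines (the shuffle sorts the URL keys lexicographically, and TextOutputFormat separates key and value with a tab):
163.com/ac	3
163.com/sport	3
qq.com/a	1
qq.com/bx	1
qq.com/by	1
qq.com/by3	1
qq.com/news	1
sina.com/movie	3
sina.com/news/socail	2
sina.com/play	2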
Problem 2: for each site, find its 3 most-visited URLs
Map phase: read each line and emit the domain as the key and the full URL as the value.
Reduce phase: in each reduce call, all URLs of the same domain are grouped together; count each URL, sort, and output the top 3.
Declare a bean to sort on both fields by implementing compareTo:
public class PageCount implements Comparable<PageCount> {

    private String page;
    private int count;

    public PageCount() {
    }

    public PageCount(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public int compareTo(PageCount o) {
        // Order by count descending; break ties by page name ascending.
        // Integer.compare avoids the overflow risk of subtracting the counts.
        return o.getCount() == this.count ? this.page.compareTo(o.getPage())
                : Integer.compare(o.getCount(), this.count);
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("PageCount [page=");
        builder.append(page);
        builder.append(", count=");
        builder.append(count);
        builder.append("]");
        return builder.toString();
    }
}
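A quick sketch of the ordering this bean defines (the class name PageCountSortDemo is made up for illustration only):
import java.util.ArrayList;
import java.util.List;

public class PageCountSortDemo {
    public static void main(String[] args) {
        List<PageCount> list = new ArrayList<>();
        list.add(new PageCount("qq.com/a", 1));
        list.add(new PageCount("sina.com/movie", 3));
        list.add(new PageCount("163.com/ac", 3));

        // null comparator = natural ordering from PageCount.compareTo:
        // larger count first, ties broken by page name
        list.sort(null);
        list.forEach(System.out::println);
        // prints: PageCount [page=163.com/ac, count=3]
        //         PageCount [page=sina.com/movie, count=3]
        //         PageCount [page=qq.com/a, count=1]
    }
}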
Complete code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {

    public static class PageTopnMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit <domain, url>, e.g. <qq.com, qq.com/a>
            String line = value.toString();
            String[] split = line.split(" ");
            context.write(new Text(split[1].split("/")[0]), new Text(split[1]));
        }
    }

    public static class PageTopnReducer extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Count how often each URL of this domain appears
            HashMap<String, Integer> hashMap = new HashMap<>();
            for (Text url : values) {
                if (hashMap.get(url.toString()) == null) {
                    hashMap.put(url.toString(), 1);
                } else {
                    hashMap.put(url.toString(), hashMap.get(url.toString()) + 1);
                }
            }

            // Move the counts into PageCount beans and sort by their natural ordering
            Set<Entry<String, Integer>> entrySet = hashMap.entrySet();
            List<PageCount> pl = new ArrayList<>();
            for (Entry<String, Integer> entry : entrySet) {
                pl.add(new PageCount(entry.getKey(), entry.getValue()));
            }
            pl.sort(null); // null comparator = natural ordering defined by PageCount.compareTo

            // Output this domain's top N URLs
            Configuration conf = context.getConfiguration();
            int topn = conf.getInt("top.n", 3);
            for (int i = 0; i < pl.size() && i < topn; i++) {
                context.write(new Text(pl.get(i).getPage()), new IntWritable(pl.get(i).getCount()));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("top.n", 3);

        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitter.class);
        job.setMapperClass(PageTopnMapper.class);
        job.setReducerClass(PageTopnReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\page\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\page\\output" + System.currentTimeMillis()));

        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}
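With top.n set to 3 and the sample data above, the output should look roughly like this (within each domain, URLs are ordered by count descending and then by name; a domain with fewer than 3 URLs outputs all of them):
163.com/ac	3
163.com/sport	3
qq.com/a	1
qq.com/bx	1
qq.com/by	1
sina.com/movie	3
sina.com/news/socail	2
sina.com/play	2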
Problem 3: find the N most-visited sites (global top-N)
Map phase: read each line and emit the domain as the key with a value of 1.
Reduce phase: use a single reducer so that all records for a given domain are grouped in one task. Count each domain's visits and put the result into a task-wide cache; because there is only one reducer, every domain's count ends up in that cache. Finally, the cleanup method writes out the top N entries of the cache. cleanup runs once, after a reduce task has finished calling reduce for all of its keys.
A TreeMap works well as the cache because it keeps its keys sorted.
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitterAllTopn {

    public static class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit <domain, 1>, e.g. <qq.com, 1>
            String line = value.toString();
            String[] split = line.split(" ");
            context.write(new Text(split[1].split("/")[0]), new IntWritable(1));
        }
    }

    public static class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Task-wide cache: keys stay sorted by PageCount.compareTo (count descending)
        TreeMap<PageCount, Object> treeMap = new TreeMap<>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum this domain's visits and park the result in the cache
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            treeMap.put(new PageCount(key.toString(), count), null);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Runs once after all reduce() calls; write out the top N cached domains
            Configuration conf = context.getConfiguration();
            int topn = conf.getInt("top.n", 5);
            Set<Entry<PageCount, Object>> entrySet = treeMap.entrySet();
            int i = 0;
            for (Entry<PageCount, Object> entry : entrySet) {
                context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount()));
                i++;
                if (i == topn) return;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("top.n", 5);

        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitterAllTopn.class);
        job.setMapperClass(PageTopnMapper.class);
        job.setReducerClass(PageTopnReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\page\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\page\\output" + System.currentTimeMillis()));

        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}
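With top.n set to 5, the sample data above contains only three domains, so all of them should be written out, ordered by visit count:
sina.com	7
163.com	6
qq.com	5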
The solution relies on the cleanup method and on the properties of TreeMap: it keeps its keys sorted, so the key bean must implement Comparable (or a Comparator must be supplied). From the TreeMap Javadoc:
* A Red-Black tree based {@link NavigableMap} implementation.
* The map is sorted according to the {@linkplain Comparable natural
* ordering} of its keys, or by a {@link Comparator} provided at map
* creation time, depending on which constructor is used.
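A minimal sketch of what the reducer's cache is doing with the PageCount bean (the class name TreeMapCacheDemo is made up for illustration only):
import java.util.Map;
import java.util.TreeMap;

public class TreeMapCacheDemo {
    public static void main(String[] args) {
        // Keys are kept sorted by PageCount.compareTo: larger count first
        TreeMap<PageCount, Object> cache = new TreeMap<>();
        cache.put(new PageCount("qq.com", 5), null);
        cache.put(new PageCount("sina.com", 7), null);
        cache.put(new PageCount("163.com", 6), null);

        // Iterating the entry set therefore yields sina.com(7), 163.com(6), qq.com(5)
        for (Map.Entry<PageCount, Object> entry : cache.entrySet()) {
            System.out.println(entry.getKey());
        }
    }
}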
Let's take a look at the Reducer class source code:
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Reducer} implementations.
*/
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* Called once at the start of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
When a reduce task starts, it executes the run method:
run first calls setup, then calls reduce once for each key, and finally calls cleanup before the reduce task finishes.
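An illustrative skeleton of the lifecycle that run() drives (the class name LifecycleReducer is made up; it only marks where each hook fits):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LifecycleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void setup(Context context) {
        // Called once per reduce task, before the first reduce() call
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Called once per distinct key, e.g. to accumulate into a task-wide cache
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Called once per reduce task, after the last reduce() call -- this is
        // where the global top-N job above writes out its TreeMap cache
    }
}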