Problem: Given 100 million / 1 billion / 10 billion numbers or strings (URLs), find the 10 numbers/strings (URLs) that occur most often.
Approach:
1. Use hashing with a modulus to split the input into chunks small enough for one machine to process;
2. Within each chunk, sort (or count) the elements and use a min-heap to keep the few that occur most often (see the sketch below);
3. Take the ten most frequent numbers/strings across all chunks.
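Before moving to MapReduce, here is a minimal stand-alone sketch of step 2 -- counting one chunk and keeping the ten largest counts in a size-10 min-heap. The class and method names here are made up purely for illustration and are not part of the code used later:

import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class TopTenSketch {
    // Keep the 10 entries with the largest counts from one chunk's count map.
    static PriorityQueue<Map.Entry<Integer, Integer>> topTen(Map<Integer, Integer> counts) {
        PriorityQueue<Map.Entry<Integer, Integer>> heap = new PriorityQueue<>(
                10, (a, b) -> Integer.compare(a.getValue(), b.getValue())); // min-heap ordered by count
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            if (heap.size() < 10) {
                heap.add(e);
            } else if (heap.peek().getValue() < e.getValue()) {
                heap.poll(); // evict the current smallest of the ten
                heap.add(e);
            }
        }
        return heap;
    }

    public static void main(String[] args) {
        // Toy chunk; in the real problem each chunk comes from hash-partitioning the input.
        int[] chunk = {7, 3, 7, 7, 3, 1};
        Map<Integer, Integer> counts = new HashMap<>();
        for (int x : chunk) counts.merge(x, 1, Integer::sum);
        for (Map.Entry<Integer, Integer> e : topTen(counts))
            System.out.println(e.getKey() + " appears " + e.getValue() + " times");
    }
}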
Let's first have a go at it with Hadoop's MapReduce --
I started by generating a file of 1 billion numbers (a bit over 6 GB), but feeding it to the cluster blew up my machine (a weak PC running four virtual machines).
So I regenerated the data with 100 million numbers (653 MB), and that ran successfully.
Data-generation code:
#include <stdio.h>
#include <stdlib.h>

int main() {
    // Redirect stdout to the data file.
    freopen("data.txt", "w", stdout);
    int x;
    for (int i = 0; i < 100000000; i++) {   // 100 million numbers
        x = rand() % 1000000000;            // values in [0, 10^9)
        printf("%d\n", x);
    }
    return 0;
}
The generated data:
Upload the data to HDFS:
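As a sketch, this upload step can also be done from Java with the HDFS FileSystem API; the target path /top10/input used here is just a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);
        // Copy the locally generated data.txt into HDFS (placeholder destination path).
        fs.copyFromLocalFile(new Path("data.txt"), new Path("/top10/input/data.txt"));
        fs.close();
    }
}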
Mapper code:
package com.rm_int;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class myMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
            throws IOException, InterruptedException {
        Integer x = Integer.parseInt(value.toString());
        // Hash the number into one of 20 buckets and emit (bucket, number),
        // so each reduce group only has to handle about 1/20 of the data.
        context.write(new IntWritable(x.hashCode() % 20), new IntWritable(x));
    }
}
Reducer code:
package com.rm_int;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class myReduce extends Reducer<IntWritable, IntWritable, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> value,
            Reducer<IntWritable, IntWritable, IntWritable, Text>.Context context)
            throws IOException, InterruptedException {
        class Node {
            int value; // the number
            int num;   // how many times it occurs
        }
        // Collect all numbers of this bucket into a list.
        List<Integer> list = new ArrayList<Integer>();
        for (IntWritable x : value) {
            list.add(x.get());
        }
        // Min-heap of size 10, ordered by occurrence count.
        Queue<Node> que = new PriorityQueue<Node>(10, new Comparator<Node>() {
            @Override
            public int compare(Node o1, Node o2) {
                return o1.num - o2.num;
            }
        });
        // Sort so that equal numbers become adjacent and can be counted in one pass.
        list.sort(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return x - y;
            }
        });
        Node node = new Node();
        Node nodeTo;
        for (int i = 0; i < list.size(); i++) {
            if (i == 0) {
                node.value = list.get(i);
                node.num = 1;
            } else {
                if ((int) list.get(i) == (int) list.get(i - 1)) {
                    node.num++;
                } else {
                    // A run has ended: push it into the heap, evicting the
                    // smallest entry if the heap is already full.
                    if (que.size() == 10) {
                        nodeTo = que.peek();
                        if (nodeTo.num < node.num) {
                            que.remove();
                            que.add(node);
                        }
                    } else {
                        que.add(node);
                    }
                    node = new Node();
                    node.value = list.get(i);
                    node.num = 1;
                }
            }
            if (i == list.size() - 1) {
                // Don't forget the last run.
                if (que.size() == 10) {
                    nodeTo = que.peek();
                    if (nodeTo.num < node.num) {
                        que.remove();
                        que.add(node);
                    }
                } else {
                    que.add(node);
                }
            }
        }
        // Emit the (at most) 10 most frequent numbers of this bucket.
        while (!que.isEmpty()) {
            node = que.poll();
            context.write(key, new Text(node.value + " " + node.num));
        }
    }
}
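The driver that wires the mapper and reducer together isn't shown in the post. A minimal sketch of what that job setup might look like, assuming the myMapper/myReduce classes above and taking the input and output paths from the command line (the class name TopTenDriver is made up here):

package com.rm_int;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopTenDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "top ten numbers");
        job.setJarByClass(TopTenDriver.class);
        job.setMapperClass(myMapper.class);
        job.setReducerClass(myReduce.class);
        // Mapper emits (bucket, number); reducer emits (bucket, "number count").
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. the HDFS directory holding data.txt
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // must not exist yet
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}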
MapReduce result:
Download the result from HDFS:
Code to merge the per-bucket results and take the overall top ten:
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Scanner;

public class Main {
    public static void main(String[] args) throws IOException {
        class Node {
            int value; // the number
            int num;   // how many times it occurs
        }
        InputStream is = new FileInputStream("F://output.txt");
        OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream("F://answer.txt"));
        Scanner cin = new Scanner(is);
        // Min-heap of size 10, ordered by occurrence count.
        Queue<Node> que = new PriorityQueue<Node>(10, new Comparator<Node>() {
            @Override
            public int compare(Node o1, Node o2) {
                return o1.num - o2.num;
            }
        });
        Node node;
        Node nodeTo;
        while (cin.hasNext()) {
            // Each line of the reducer output looks like "bucket\tnumber count".
            String str = cin.nextLine();
            String[] x = str.split(" |\t");
            node = new Node();
            node.value = Integer.parseInt(x[1]);
            node.num = Integer.parseInt(x[2]);
            if (que.size() == 10) {
                nodeTo = que.peek();
                if (nodeTo.num < node.num) {
                    que.remove();
                    que.add(node);
                }
            } else {
                que.add(node);
            }
        }
        // Pop the min-heap into the array from the back, so the final output
        // is ordered from most to least frequent.
        Node ans[] = new Node[10];
        int index = 10;
        while (!que.isEmpty()) {
            node = que.poll();
            ans[--index] = node;
        }
        for (int i = index; i < 10; i++)   // start at index in case fewer than 10 entries were read
            osw.write(ans[i].value + "\t" + ans[i].num + "\r\n");
        osw.close();
        is.close();
    }
}
Final result: