Compute the word counts (wordcount) of the following text:
a a a a a a b b b a a a
a a a a c c b c a a a c
a b b c a a d d e e f f
f g a a a b a b h h g j
a a a a a a b b b a a a
a a a a a a b b b a a a
a a a a a a b b b a a a
a a a a a a b b b a a a
a a a a a a b b b a a a
Because the word frequencies in this text are highly uneven (for example, "a" appears far more often than any other word), the traditional approach runs into trouble: the map side reads each line, splits it into words, and emits key = word, value = 1 to the reduce side. The reduce task that receives the word "a" is then far more heavily loaded than the others, which finish quickly by comparison. This uneven key distribution gives different reduce tasks very different workloads, which is exactly what we call data skew.
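Why all the "a" records end up on one reducer: Hadoop's default HashPartitioner routes every record with the same key to the same reduce task. Below is a minimal standalone sketch of that routing rule (PartitionDemo is our illustrative name; the formula matches org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

import java.util.Arrays;

// Demonstrates the default routing rule:
// partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
// Every "a" record lands on the same reduce task no matter how many there are.
public class PartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 3;
        for (String word : Arrays.asList("a", "b", "c")) {
            int partition = (word.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(word + " -> reduce task " + partition);
        }
    }
}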
Two ways to mitigate data skew:
1. Aggregate once on the map side, so that large amounts of duplicate data never reach the reduce side.
2. Scatter the keys on the map side by some rule so they spread across different reduce tasks, producing an intermediate result; then run a second job over that intermediate result to compute the final answer.
1. Map-side local aggregation with a Combiner
The map-side local aggregation class is set via job.setCombinerClass(). A combiner is itself a Reducer implementation: before map output is shipped to the reduce side, the framework invokes it to aggregate locally, so only the combined results travel across the shuffle. For example, a map task that would emit ("a", 1) nine times for one line emits a single ("a", 9) after local aggregation.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println(value.toString()); // debug: echo the input line
        String[] words = value.toString().split(" ");
        for (String w : words) {
            // Emit (word, 1) for every word on the line.
            k.set(w);
            context.write(k, v);
        }
    }
}

public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            System.out.println("key = " + key + " value = " + value); // debug
            count += value.get();
        }
        v.set(count);
        context.write(key, v);
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(SkewWordcount2.class);
    job.setMapperClass(SkewWordcountMapper.class);
    job.setReducerClass(SkewWordcountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Set the map-side local aggregation (combiner) class.
    job.setCombinerClass(SkewWordcountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\input"));
    FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out2" + System.currentTimeMillis()));
    job.setNumReduceTasks(1);
    boolean res = job.waitForCompletion(true);
    System.exit(res ? 0 : 1);
}
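Reusing SkewWordcountReducer as the combiner works here because integer summation is commutative and associative, and the framework may apply a combiner zero, one, or several times to the map output. If the reduce logic ever grows beyond a plain sum, a dedicated combiner class keeps the two concerns separate; a minimal sketch under that assumption (WordcountCombiner is our name, not part of the original code):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Dedicated map-side combiner: pre-sums the 1s emitted per word so the
// shuffle carries one (word, partialCount) pair instead of many (word, 1)s.
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable partial = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        partial.set(sum);
        context.write(key, partial);
    }
}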
2. Scatter the keys so that the map output is spread evenly
Step 1:
In the map phase, read each word and emit key = word-randomNumber, value = 1.
In the reduce phase, count how many times each scattered key occurs and write the result.
Step 2:
In the map phase, read the files written by Step 1's reduce phase. Each line has the form word-num <tab> count; split it, take word as the key and count as the value, and emit.
In the reduce phase, aggregate by word, sum the counts for each key, and write the final totals.
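Before looking at the full jobs, here is the key transformation in isolation: with 3 reduce tasks, Step 1 rewrites "a" to one of "a-0", "a-1", or "a-2", and Step 2 strips the suffix to recover the original word. A minimal standalone sketch (ScatterDemo is our illustrative name):

import java.util.Random;

// Round trip of the key-scattering trick: Step 1 appends a random
// suffix in [0, numReduceTasks), Step 2 strips it off again.
public class ScatterDemo {
    public static void main(String[] args) {
        Random random = new Random();
        int numReduceTasks = 3;
        String scattered = "a" + "-" + random.nextInt(numReduceTasks); // e.g. "a-1"
        String word = scattered.split("-")[0];                         // back to "a"
        System.out.println(scattered + " -> " + word);
    }
}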
Step 1:
// Same imports as above, plus java.util.Random.
public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Random random = new Random();
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    int numReduceTasks = 0;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Learn how many reduce tasks the job runs with, so that the random
        // suffix range matches the number of reducers.
        numReduceTasks = context.getNumReduceTasks();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (String w : words) {
            // Scatter: append a random suffix in [0, numReduceTasks) so that
            // records for a hot word spread across all reduce tasks.
            k.set(w + "-" + random.nextInt(numReduceTasks));
            context.write(k, v);
        }
    }
}

public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            System.out.println("key = " + key + " value = " + value); // debug
            count += value.get();
        }
        v.set(count);
        context.write(key, v);
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(SkewWordcount.class);
    job.setMapperClass(SkewWordcountMapper.class);
    job.setReducerClass(SkewWordcountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Map-side local aggregation (combiner) class; left disabled in this step.
    // job.setCombinerClass(SkewWordcountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\input"));
    FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out1"));
    job.setNumReduceTasks(3);
    boolean res = job.waitForCompletion(true);
    System.exit(res ? 0 : 1);
}
Step 1's output (an illustrative sample gathered from the three part files; with random scattering, a word can in general appear under several different suffixes):
a-1 20
b-0 12
c-2 4
d-1 2
e-0 1
f-2 1
g-1 1
h-0 1
Step 2:
// Same imports as above.
public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println(value.toString()); // debug: echo the input line
        // Each input line looks like "word-num<tab>count"; strip the random
        // suffix to recover the original word, and reuse the partial count.
        String[] wordAndCount = value.toString().split("\t");
        k.set(wordAndCount[0].split("-")[0]);
        v.set(Integer.parseInt(wordAndCount[1]));
        context.write(k, v);
    }
}

public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            System.out.println("key = " + key + " value = " + value); // debug
            count += value.get(); // sum the partial counts per word
        }
        v.set(count);
        context.write(key, v);
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(SkewWordcount2.class);
    job.setMapperClass(SkewWordcountMapper.class);
    job.setReducerClass(SkewWordcountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Map-side local aggregation (combiner) class; left disabled in this step.
    // job.setCombinerClass(SkewWordcountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Read Step 1's output directory as this job's input.
    FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out1"));
    FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out2" + System.currentTimeMillis()));
    job.setNumReduceTasks(1);
    boolean res = job.waitForCompletion(true);
    System.exit(res ? 0 : 1);
}
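In practice the two steps can also be chained in a single driver: run the scatter job first, then feed its output directory to the gather job. A minimal sketch, assuming the mapper/reducer classes above are renamed Step1Mapper/Step1Reducer and Step2Mapper/Step2Reducer (our names, since both steps above reuse the same class names):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Chains Step 1 (scatter) and Step 2 (gather) in one driver.
// Step1Mapper/Step1Reducer and Step2Mapper/Step2Reducer stand for the
// classes shown above, renamed so both steps can live in one source file.
public class SkewWordcountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path(args[0]);
        Path mid = new Path(args[1]);    // intermediate (scattered) counts
        Path output = new Path(args[2]); // final per-word totals

        Job step1 = Job.getInstance(conf, "skew-wordcount-scatter");
        step1.setJarByClass(SkewWordcountDriver.class);
        step1.setMapperClass(Step1Mapper.class);
        step1.setReducerClass(Step1Reducer.class);
        step1.setOutputKeyClass(Text.class);
        step1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(step1, input);
        FileOutputFormat.setOutputPath(step1, mid);
        step1.setNumReduceTasks(3);
        if (!step1.waitForCompletion(true)) System.exit(1); // stop if Step 1 fails

        Job step2 = Job.getInstance(conf, "skew-wordcount-gather");
        step2.setJarByClass(SkewWordcountDriver.class);
        step2.setMapperClass(Step2Mapper.class);
        step2.setReducerClass(Step2Reducer.class);
        step2.setOutputKeyClass(Text.class);
        step2.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(step2, mid); // Step 1 output feeds Step 2
        FileOutputFormat.setOutputPath(step2, output);
        step2.setNumReduceTasks(1);
        System.exit(step2.waitForCompletion(true) ? 0 : 1);
    }
}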