MapReduce编程小案例（第11篇）——数据倾斜场景
数据:
a a a a a a b b b a a a a a a a c c b c a a a c a b b c a a d d e e f f f g a a a b a b h h g j |
需求:
需要做wordcount
但是，这里存在一个问题：
单词 a 出现的次数特别多，
负责处理单词 a 的那个 reduce worker 的负载会远高于其他 worker（负载不均衡）。
思考：应该如何处理，才能让整个数据处理过程中的数据倾斜状况得到缓解？
思路:
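将 key 打散（具体方案：给 key 拼接一个随机后缀），使同一个单词的数据分散到多个 reduce worker 上，以均衡 reduce 端各 worker 的负载。由于第一个 job 输出的是“单词 + 随机后缀”的局部计数，还需要第二个 job 去掉后缀，再把同一单词的局部计数汇总成最终结果。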
数据倾斜场景 part2——解决方法的代码：
package cn.edu360.mr.wc.skew; import java.io.IOException; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SkewWordcount { public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Random random = new Random(); Text k = new Text(); IntWritable v = new IntWritable(1); int numReduceTasks = 0; @Override protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { numReduceTasks = context.getNumReduceTasks(); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); for (String w : words) { k.set(w + "\001" + random.nextInt(numReduceTasks)); context.write(k, v); } } } public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count +=value.get(); } v.set(count); context.write(key, v); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SkewWordcount.class); job.setMapperClass(SkewWordcountMapper.class); job.setReducerClass(SkewWordcountReducer.class); job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("f://mrdata/wordcount/input")); FileOutputFormat.setOutputPath(job, new Path("f://mrdata/wordcount/skew-out")); job.setNumReduceTasks(3); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package cn.edu360.mr.wc.skew; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SkewWordcount2 { public static class SkewWordcount2Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Text k = new Text(); IntWritable v = new IntWritable(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] wordAndCount = value.toString().split("\t"); v.set(Integer.parseInt(wordAndCount[1])); k.set(wordAndCount[0].split("\001")[0]); context.write(k, v); } } public static class SkewWordcount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> { IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count += value.get(); } v.set(count); context.write(key, v); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SkewWordcount2.class); job.setMapperClass(SkewWordcount2Mapper.class); job.setReducerClass(SkewWordcount2Reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("f:/mrdata/wordcount/skew-out")); FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/wordcount/skew-out2")); 
job.setNumReduceTasks(3); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }