Hadoop--MapReduce9--Data Skew

Compute the wordcount of the following text:

a a a a a a b b b a a a 
a a a a c c b c a a a c
a b b c a a d d e e f f
f g a a a b a b h h g j
a a a a a a b b b a a a 
a a a a a a b b b a a a 
a a a a a a b b b a a a 
a a a a a a b b b a a a 
a a a a a a b b b a a a 

Because the word frequencies in this text are very uneven (the word "a" in particular appears far more often than the others), the conventional job, where the map side reads each line, splits it into words, and emits key = word, value = 1 to the reduce side, leaves the reduce task that receives "a" with a very heavy workload while the other reduce tasks sit relatively idle. This skewed key distribution gives the reduce tasks very different amounts of work, which is exactly what is meant by data skew.
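The root cause is how records are routed to reduce tasks: unless a custom Partitioner is configured, MapReduce hashes the key and takes the result modulo the number of reduce tasks, so every record with the same key lands on the same reduce task. The sketch below mirrors Hadoop's built-in HashPartitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner) to make that routing explicit:

// Default partitioning logic: records are assigned to reducers by key hash.
// Every ("a", 1) pair produced by the mappers hashes to the same partition,
// so a single reduce task receives all of them, no matter how many reducers the job has.
public class HashPartitioner<K, V> extends Partitioner<K, V> {

	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}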

Ways to mitigate data skew:

1. Aggregate once on the map side, so that large volumes of duplicate records are not shipped to the reduce side.

2. Scatter the keys on the map side according to some rule so that they spread across different reducers, producing an intermediate result; then read that intermediate result in a second pass to produce the final answer.

1. Map-side local aggregation with a Combiner

The map-side local aggregation class is set with job.setCombinerClass(). The combiner is itself a Reducer implementation: while the map side is writing its output, the framework invokes the combiner to aggregate records locally, and only the aggregated results are sent on to the reduce side.

public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		Text k = new Text();
		IntWritable v = new IntWritable(1);

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			System.out.println(value.toString());
			String[] wordAndCount = value.toString().split(" ");
			for(String w : wordAndCount){
				k.set(w);
				context.write(k, v);
			}			
		}
	}

	public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		IntWritable v = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				System.out.println("key = " + key + " value = " + value);
				count += value.get();
			}
			v.set(count);
			context.write(key, v);
		}

	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(SkewWordcount2.class);		
		job.setMapperClass(SkewWordcountMapper.class);
		job.setReducerClass(SkewWordcountReducer.class);	
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// Set the map-side local aggregation (combiner) class
		job.setCombinerClass(SkewWordcountReducer.class);	
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);	
		FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\input"));
		FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out2" + System.currentTimeMillis()));	
		job.setNumReduceTasks(1);
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
	
	}
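To verify that the combiner is actually doing its job, the built-in task counters can be read after the job finishes; COMBINE_INPUT_RECORDS and COMBINE_OUTPUT_RECORDS are standard counters in org.apache.hadoop.mapreduce.TaskCounter. A small sketch, which would sit between waitForCompletion(true) and System.exit in the driver above (the import of TaskCounter is assumed):

		// With the combiner enabled, the number of combine-output records should be much
		// smaller than the number of combine-input records, meaning far fewer key-value
		// pairs are shuffled to the reduce side.
		long combineIn = job.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
		long combineOut = job.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
		System.out.println("combine input records = " + combineIn);
		System.out.println("combine output records = " + combineOut);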

2. Scatter the keys so that the key-value pairs emitted by the map side are spread evenly over the reducers

Step 1:

In the map phase, read each word and emit key = word-randomNumber, value = 1.

In the reduce phase, aggregate by key, count how many times each (salted) key occurs, and write the result.

Step 2:

In the map phase, read the file produced by the reduce of Step 1, whose lines have the form word-num<TAB>count. Split each line, take the word (with the random suffix stripped) as the key and the count as the value, and emit them.

In the reduce phase, aggregate by the word key, sum the counts for each word, and write the final result.

Step 1:

public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		Random random = new Random();
		Text k = new Text();
		IntWritable v = new IntWritable(1);
		int numReduceTasks = 0;

		@Override
		protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// Number of reduce tasks, used as the range of the random salt appended to each key
			numReduceTasks = context.getNumReduceTasks();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String[] words = value.toString().split(" ");
			for (String w : words) {
				k.set(w + "-" + random.nextInt(numReduceTasks));
				context.write(k, v);
			}
		}
	}

	public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		IntWritable v = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				System.out.println("key = " + key + " value = " + value);
				count += value.get();
			}
			v.set(count);
			context.write(key, v);
		}

	}

	public static void main(String[] args) throws Exception {

		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);	
		job.setJarByClass(SkewWordcount.class);
		job.setMapperClass(SkewWordcountMapper.class);
		job.setReducerClass(SkewWordcountReducer.class);	
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// Map-side local aggregation (combiner) class; not used in this step
		//job.setCombinerClass(SkewWordcountReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);	
		FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\input"));
		FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out1"));
		// Use 3 reduce tasks; the random salt above is drawn from [0, numReduceTasks)
		job.setNumReduceTasks(3);

		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
	
	}

The output of Step 1 (which varies from run to run because of the random salt) looks like:

a-1	20
b-0	12
c-2	4
d-1	2
e-0	1
f-2	1
g-1	1
h-0	1

Step 2:

public static class SkewWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		Text k = new Text();
		IntWritable v = new IntWritable(1);

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			System.out.println(value.toString());
			// Each input line from Step 1 has the form "word-num<TAB>count"
			String[] wordAndCount = value.toString().split("\t");
			// Strip the random suffix to recover the original word
			k.set(wordAndCount[0].split("-")[0]);
			v.set(Integer.parseInt(wordAndCount[1]));
			context.write(k, v);				
		}
	}

	public static class SkewWordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		IntWritable v = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				System.out.println("key = " + key + " value = " + value);
				count += value.get();
			}
			v.set(count);
			context.write(key, v);
		}

	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(SkewWordcount2.class);		
		job.setMapperClass(SkewWordcountMapper.class);
		job.setReducerClass(SkewWordcountReducer.class);	
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// Map-side local aggregation (combiner) class; not needed in this step
		//job.setCombinerClass(SkewWordcountReducer.class);	
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);	
		FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out1"));
		FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out2" + System.currentTimeMillis()));	
		job.setNumReduceTasks(1);
		
		boolean res = job.waitForCompletion(true);
		System.exit(res?0:1);
	
	}
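The two passes above are written as two separate driver programs. They can also be chained in one driver: submit Step 1, wait for it to finish, and point Step 2's input at Step 1's output directory. A minimal sketch, assuming hypothetical helpers buildStep1Job and buildStep2Job that configure the jobs exactly as in the two main() methods above:

	// Combined driver for the two-pass approach (sketch).
	// buildStep1Job / buildStep2Job are assumed helper methods that apply the same
	// mapper/reducer classes, output types, paths and reducer counts shown earlier.
	public static void main(String[] args) throws Exception {
		Path input = new Path("F:\\hadoop-2.8.1\\data\\wordcount\\input");
		Path intermediate = new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out1");
		Path output = new Path("F:\\hadoop-2.8.1\\data\\wordcount\\skew-out2" + System.currentTimeMillis());

		Job step1 = buildStep1Job(input, intermediate);   // salted keys, 3 reducers
		if (!step1.waitForCompletion(true)) {
			System.exit(1);                               // stop if the first pass fails
		}

		Job step2 = buildStep2Job(intermediate, output);  // strips the salt, 1 reducer
		System.exit(step2.waitForCompletion(true) ? 0 : 1);
	}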
