Hadoop-MapReduce的shuffle过程及其他

Hadoop-MapReduce的shuffle过程及其他

    本篇文章将介绍shuffle的过程以及MapReduce中的其他一些组件。

一、Shuffle

    Shuffle其实是一个过程,并不是MapperReducer的一个组件,这个过程是从map输出数据,到reduce接收处理数据之前,横跨Mapper和Reducer两端的,如下图:

    shuffle分为Mapper阶段和Reducer阶段,下面就两个阶段做具体分析。

1.Mapper阶段

    每个MapperTask有一个环形内存缓冲区,用于存储map任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值默认0.8(io.sort.spill.percent),就会启动一个后台线程把环形缓冲区中的内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

    这里有一个map()方法(简称m)写入的速度和spill溢出(简称s)的速度,m的数据在内存中移动,s是数据由内存到磁盘,虽然s是磁盘的连续写,但是也比不上m的内存速度,产生的现象是s在前面跑,m在后面追,m的速度>s的速度,那么m肯定在一个时间节点上会追上s,那么当m追上s的时候,m写入环形缓冲区的线程就会被阻塞暂停,直到s将环形缓冲区中的数据全部写入到磁盘中,m的写入线程才会被启动。所以环形缓冲区大小和阀值的大小是可以根据业务进行调优的点。

    写磁盘前,要partition、sort、Combiner。如果有后续的数据,将会继续写入环形缓冲区中,最终写入下一个溢出文件中。

    等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

    如果在最终合并时,被合并的文件大于等于3个,则合并完会再执行一次Combiner,否则不会。

    整体Mapper阶段的流程如下:

input->map->buffer->split(partition-sort-combiner)->merge(partition-sort-combiner(file>=3))->数据落地。

2.Reducer阶段

    Reducer主动找Mapper获取自己负责的分区的数据,并不需要所有的Mapper都执行完成后再获取,哪个Mapper执行完,立即就去复制。

    复制后,来自多个Mapper的数据要进行merge合并操作。合并后进行分组、排序,形成k3v3,进入reduce处理,处理后产生的结果输出到目的地。

    整体Reducer阶段流程如下:

    fetch->merge(combiner)->grouping->sort->reduce->output。

2、Mapper数量

    Mapper的数量在默认情况下不可直接控制干预,Mapper的数量由输入的大小和个数决定。在默认情况下,最终input占据了多少block,就应该启动多少个Mapper。

    此种情况下,如果有大量的小文件需要处理,则会造成Hadoop集群崩溃。大量的小文件,每个小文件都独占一个Mapper处理线程,这样启动线程和关闭线程消耗的资源会很庞大,文件数量到达一个量级会直接导致集群崩溃。

    鉴于以上情况,可以通过配置mapred.min.split.size来控制split的size的最小值。当每个split的大小达不到设置的最小值,Hadoop会将这些达不到最小值的split拼接到一起,使用一个Mapper来处理这些文件,当大小超过最小值,才启动一个新Mapper进行处理。这样就可以避免Mapper线程过多导致集群崩溃的结果。

案例

1.求最大/小值

    求一组数据的最大值或者最小值。

    数据样例:

123
235345
234
654768
234
4545
324

1>MaxMapper

public class MaxMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
	private int max = Integer.MIN_VALUE;
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context)
					throws IOException, InterruptedException {
		int num = Integer.parseInt(value.toString());
		max = max < num ? num : max;
	}
	@Override
	protected void cleanup(Mapper<LongWritable, Text, IntWritable, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(new IntWritable(max), NullWritable.get());
	}
}

2>MaxReducer

public class MaxReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
	private int max=Integer.MIN_VALUE;
	@Override
	protected void reduce(IntWritable k3, Iterable<NullWritable> v3s,
			Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context)
					throws IOException, InterruptedException {
		int num=Integer.parseInt(k3.toString());
		max=max>num?max:num;
	}
	@Override
	protected void cleanup(Reducer<IntWritable, NullWritable, IntWritable, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(new IntWritable(max), NullWritable.get());
	}
}

3>MaxDriver

public class MaxDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Max_Job");
		job.setJarByClass(cn.tedu.max.MaxDriver.class);
		job.setMapperClass(cn.tedu.max.MaxMapper.class);
		job.setReducerClass(cn.tedu.max.MaxReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/maxdata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/maxresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

2.统计考试成绩

    按月份产生文件,统计每个人的语数外及总分。

    数据样例:

math.txt

1 zhang 85
2 zhang 59
3 zhang 95
1 wang 74
2 wang 67
3 wang 96
1 li 45
2 li 76
3 li 67

chinese.txt

1 zhang 89
2 zhang 73
3 zhang 67
1 wang 49
2 wang 83
3 wang 27
1 li 77
2 li 66
3 li 89

english.txt)

1 zhang 55
2 zhang 69
3 zhang 75
1 wang 44
2 wang 64
3 wang 86
1 li 76
2 li 84
3 li 93
//以下代码涉及到的重要方法
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fn = inputSplit.getPath().getName();

1>ScoreBean

public class ScoreBean implements Writable {
	private String name;
	private String subject;
	private int month;
	private int score;
   //这里省去了以下方法,记得补上
   //……get/set……
   //……有参/无参构造……
   //……read/write……
}

2>Mapper

public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ScoreBean>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String attr[] = line.split(" ");
		String name = attr[1];
		FileSplit fs = (FileSplit) context.getInputSplit();
		String path = fs.getPath().getName();
		String subject = path.substring(0, path.lastIndexOf("."));
		ScoreBean sb = new ScoreBean(attr[1], subject, Integer.parseInt(attr[0]), Integer.parseInt(attr[2]));
		context.write(new Text(name), sb);
	}
}

3>Reducer

public class ScoreReducer extends Reducer<Text, ScoreBean, Text, NullWritable> {
	@Override
	protected void reduce(Text k3, Iterable<ScoreBean> v3s,
			Reducer<Text, ScoreBean, Text, NullWritable>.Context context) throws IOException, InterruptedException {
		String name = k3.toString();
		Map<String, Integer> map = new HashMap<String, Integer>();
		int count = 0;
		Iterator<ScoreBean> it = v3s.iterator();
		while (it.hasNext()) {
			ScoreBean sb = it.next();
			map.put(sb.getSubject(), sb.getScore());
			count += sb.getScore();
		}
		String result = name + " " + map.get("chinese") + " " + map.get("math") + " " + map.get("english") + " "
				+ count;
		context.write(new Text(result), NullWritable.get());
	}
}

4>Partitioner

public class ScoreMonthPartitioner extends Partitioner<Text, ScoreBean>{
	@Override
	public int getPartition(Text key, ScoreBean value, int numPartitions) {
		return value.getMonth()-1;
	}
}

5>Driver

public class ScoreDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "Scorec_Job");
		job.setJarByClass(cn.tedu.score.ScoreDriver.class);
		job.setMapperClass(cn.tedu.score.ScoreMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(ScoreBean.class);
		job.setReducerClass(cn.tedu.score.ScoreReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		job.setPartitionerClass(ScoreMonthPartitioner.class);
		job.setNumReduceTasks(3);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop:9000/scoredata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop:9000/scoresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

二、MapReduce其他功能

1、InputFormat

1.概述

    InputFormat:输入格式化器。

    MapReduce开始阶段阶段,InputFormat类用来产生InputSplit,并把基于RecordReader它切分成record,形成Mapper的输入。

    Hadoop本身提供了若干内置的InputFormat,其中如果不明确指定默认使用TextInputFormat。

2.常见的InputFormat

1>TextInputFormat

    作为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型。

2>KeyValueTextInputFormat

    同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;如果没有分隔符,整行作为key,value为空。

3>SequenceFileInputFormat

    用于读取sequence file。sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将key和value以BytesWritable的类型读出;SequenceFileAsTextInputFormat,将key和value以Text类型读出。

4>SequenceFileInputFilter

    根据filter从sequence文件中取得部分满足条件的数据,通过setFilterClass指定Filter,内置了三种Filter,RegexFilter取key值满足指定的正则表达式的记录;PercentFilter通过指定参数f,取记录行数%f==0的记录;MD5Filter通过指定参数f,取MD5(key)%f==0的记录。

5>NLineInputFormat

    0.18.x版本新加入,可以将文件以行为单位进行split,比如文件的每一行对应一个mapper。得到的key是每一行的位置偏移量(LongWritable类型),value是每一行的内容,Text类型。适用于行少列多的文件。

6>CompositeInputFormat

    用于多个数据源的join。

    可以通过job.setInputFormatClass(XxxInputFormat.class);来设定选用哪种InputFormat。

3.自定义InputFormat

    如果以上InputFormat不够用,我们也可以自己定义InputFormat。

    所有InputFormat都要直接或间接的继承InputFormat抽象类。

1>InputFormat

    InputFormat抽象类中主要定义了如下两个方法:

getSplits(JobContext context)

    生产InputSplit集合的方法,此方法接受JobContext接受环境信息,得到要处理的文件信息后,进行逻辑切割,产生InputSplit集合返回。

List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

createRecordReader(InputSplit split,TaskAttemptContext context)

    此方法返回RecordReader对象。一个RecordReader包含方法描述如何从InputSplit切分出要送入Mapper的K1、V1对。

public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;

2>FileInputFormat

    我们可以直接继承InputFormat,但更多的时候我们会选择继承他的一个实现子类,比如:FileInputFormat。此类是所有来源为文件的InputFormat的基类,默认的TextInputFormat就继承自它。

    FileInputFormat实现了InputFormat抽象类,实现了getSplits方法,根据配置去逻辑切割文件,返回FileSplit的集合,并提供了isSplitable()方法,子类可以通过在这个方法中返回boolean类型的值表明是否要对文件进行逻辑切割,如果返回false则无论文件是否超过一个Block大小都不会进行切割,而将这个文件作为一个逻辑块返回。而对createRecordReader方法则没有提供实现,设置为了抽象方法,要求子类实现。

    如果想要更精细的改变逻辑切块规则可以覆盖getSplits方法自己编写代码实现。

    而更多的时候,我们直接使用父类中的方法而将精力放置在createRecordReader上,决定如何将InputSplit转换为一个个的Recoder。

    案例

    读取score1.txt文件,从中每4行读取成绩,其中第一行为姓名,后3行为单科成绩,计算总分,最终输出为姓名:总分格式的文件。

    文件内容样例:

张三 
语文 97
数学 77
英语 69
李四 
语文 87
数学 57
英语 63
王五 
语文 47
数学 54
英语 39

    分析:

    在此例子中,需要按照每三行一次进行读取称为一个InputSplit,经过比较可以使用NLineInputFormat,但是为了演示,使用自定义InputFormat来做这件事。

/**
 * 自定义InputFormat类,来实现每三行作为一个Recorder触发一次Mapper的效果
 */
public class MyFileInputFormat extends FileInputFormat<Text, Text> {

	/**
	 * 因为要每三行读取作为一个Recoder,所以如果切块造成块内数据行数不是3的倍数可能造成处理出问题
	 * 因此返回false,标识文件不要进行切块
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}

	/**
	 * 返回自定义的MyRecordReader
	 */
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		return new MyRecordReader();
	}

}	
/**
 * 自定义的RecordReader,表明了如何将一个InputSplit切割出一个个的Recorder
 *
 */
public class MyRecordReader extends RecordReader<Text, Text> {
	private LineReader lineReader = null;
	private Text key = null;
	private Text value = null;
	private boolean hasMore = true;

	/**
	 * 初始化的方法,将在InputSplit被切割前调用
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		//获取文件路径
		FileSplit fs = (FileSplit) split;
		Path path = fs.getPath();
		//获取文件系统
		Configuration conf = context.getConfiguration();
		FileSystem fileSystem = path.getFileSystem(conf);
		//从文件系统中读取文件路径得到文件流
		FSDataInputStream fin = fileSystem.open(path);
		//将文件流包装为行读取器
		lineReader = new LineReader(fin);
	}

	/**
	 * 下一个keyvalue方法
	 * 返回值表当前是否读取到了新的纪录
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		key = new Text();
		value = new Text();

		Text temp = new Text();
		int count = 0;
		for (int i = 0; i < 4; i++) {
			int len = lineReader.readLine(temp);
			if (len == 0) {
				hasMore = false;
				break;
			} else {
				count++;
				value.append(temp.getBytes(), 0, temp.getLength());
				value.append("\r\n".getBytes(), 0, "\r\n".length());
				temp.clear();
			}
		}
		key.set(count+"");

		return count != 0;

	}

	/**
	 * 获取当前key方法
	 */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	/**
	 * 获取当前value方法
	 */
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	/**
	 * 获取处理进度的方法,返回0.0f-1.0f之间的值表示进度
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (hasMore)
			return 0.0f;
		else
			return 1.0f;
	}

	/**
	 * 关闭资源的方法
	 * 当切割InputSplit结束会被调用用来释放资源
	 */
	@Override
	public void close() throws IOException {
		lineReader.close();
	}

}
/**
 * 计算成绩的Mapper
 *
 */
public class ScoreMapper extends Mapper<Text, Text, Text, Text> {
	
	public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		String str = value.toString();
		String lines [] = str.split("\r\n");
		String name = lines[0];
		for(int i = 1;i<lines.length;i++){
			context.write(new Text(name), new Text(lines[i]));
		}
	}
}
/**
 * 计算成绩的Reducer
 */
public class ScoreReducer extends Reducer<Text, Text, Text, IntWritable> {

	
	public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		String name = key.toString();
		int countFen = 0;
		for(Text v : values){
			String line = v.toString();
			String subject = line.split(" ")[0];
			int fen = Integer.parseInt(line.split(" ")[1]);
			countFen += fen;
		}
		
		context.write(new Text(name), new IntWritable(countFen));
	}

}
/**
 * ScoreDriver
 */
public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "score_job");
		job.setJarByClass(ScoreDriver.class);
		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);
	
		//指定InputFormat为自定义的MyFileInputFormat
		job.setInputFormatClass(MyFileInputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result"));
		
		if (!job.waitForCompletion(true))
			return;
	}
}

 

4.MultipleInputs

    MultipleInputs可以将多个输入组装起来,同时为Mapper提供数据,当我们希望从多个来源读取数据时可以使用。甚至,在指定来源时可以为不同来源的数据指定不同的InputFormat和Mapper以应对不同格式的输入数据。

    此类上提供的静态方法有:

/**
 * 指定数据来源及对应的InputFormat
 */
MultipleInputs.addInputPath(job, path, inputFormatClass);

/**
 * 指定数据来源及对应的InputFormat 和 Mapper
 */
MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass);

    案例

    改造上述案例,同时从另一个文件score2.txt中读取数据统计成绩。score2.txt中的数据是一行为一个学生的成绩。

    数据样例:

赵六 56 47 69
陈七 73 84 91
刘八 45 56 66

    代码:

/**
 * 计算成绩的Mapper
 */
public class ScoreMapper2 extends Mapper<LongWritable, Text, Text, Text> {
	
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String attrs [] = line.split(" ");
		String name = attrs[0];
		for(int i = 1;i<attrs.length;i++){
			if(i == 1){
				context.write(new Text(name), new Text("语文\t"+attrs[i]));
			}else if(i == 2){
				context.write(new Text(name), new Text("数学\t"+attrs[i]));
			}else if(i == 3){
				context.write(new Text(name), new Text("英语\t"+attrs[i]));
			}
		}
	}
}
/**
 * ScoreDriver
 */
public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "score_job");
		job.setJarByClass(ScoreDriver.class);
		//job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);
	
		//指定InputFormat为自定义的MyFileInputFormat
		job.setInputFormatClass(MyFileInputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		//FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/score/score1.txt"));
		MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score1.txt"), MyFileInputFormat.class,ScoreMapper.class);
		MultipleInputs.addInputPath(job, new Path("hdfs://hadoop01:9000/score/score2.txt"), TextInputFormat.class,ScoreMapper2.class);
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/score/result"));
		
		if (!job.waitForCompletion(true))
			return;
		
	}
}

2、OutputFormat

1.概述

    OutputFormat:输出格式化器。

    MapReduce结束阶段,OutputFormat类决定了Reducer如何产生输出。

    Hadoop本身提供了若干内置的OutputFormat,其中如果不明确指定默认使用TextOutputFormat。

2.常见的OutputFormat

1>FileOutputFormat

    (实现OutputFormat接口)- 所有输出到文件的OutputFormats的基类。

2>TextOutputFormat

    以行分隔、包含制表符定界的键值对的文本文件格式。

3>SequenceFileOutputFormat

    二进制键值数据的压缩格式。

4>SequenceFileAsBinaryOutputFormat

    原生二进制数据的压缩格式。

5>MapFileOutputFormat

    一种使用部分索引键的格式。

6>MultipleOutputFormat

    使用键值对参数写入文件的抽象类。

7>MultipleTextOutputFormat

    输出多个以标准行分割、制表符定界格式的文件。

8>MultipleSequenceFileOutputFormat

    输出多个压缩格式的文件。

    可以通过job.setOutputFormatClass(XxxoutputFormat.class);来设定选用哪种OutputFormat。

3.自定义OutputFormat

    如果以上OutputFormat不够用,同样也可以自己定义OutputFormat。

1>OutputFormat

    所有的OutputFormat都要直接或间接的继承OutputFormat抽象类

    OutputFormat抽象类中定义了如下的抽象方法:

getRecordWriter(TaskAttemptContext context)

public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
public abstract void checkOutputSpecs(JobContext context ) throws IOException,InterruptedException;
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;

2>FileOutputFormat

    我们可以直接继承OutputFormat,但更多的时候我们会选择继承他的一个实现子类,比如FileOutputFormat。- 此类是所有目的地为文件的OutputFormat的基类,例如默认的TextOutputFormat就继承自它。

    FileOutputFormat实现了OutputFormat接口,默认实现了checkOutputSpecs和getOutputCommitter方法,并将getRecordWriter()设置为抽象方法要求我们去实现。

    如果想要更精细的改变逻辑可以自己去编写getOutputCommitter和checkOutputSpecs方法。

    而更多的时候,我们直接使用父类中的方法而将精力放置在getRecordWriter上,决定如何产生输出。

案例

    编写wordcount案例,并将输出按照'#'进行分割,输出为一行。

    数据样例:

hello tom
hello joy
hello rose
hello joy
hello jerry
hello tom
hello rose
hello joy

    代码如下:

public class MyRecordWriter<K,V> extends RecordWriter<K,V> {
	protected DataOutputStream out = null;
	private final byte[] keyValueSeparator;
	//public static final String NEW_LINE = System.getProperty("line.separator");
	public static final String NEW_LINE = "#";
	
	public MyRecordWriter(DataOutputStream out,String keyValueSeparator) {
		this.out = out;
		this.keyValueSeparator = keyValueSeparator.getBytes();
	}
	
	@Override
	public void write(K key, V value) throws IOException, InterruptedException {
		 if(key!=null){  
			 out.write(key.toString().getBytes());  
			 out.write(keyValueSeparator);  
		 }  
		 out.write(value.toString().getBytes());  
		 out.write(NEW_LINE.getBytes()); 
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		out.close();
	}
}
public class MyFileOutputFormat<K,V> extends FileOutputFormat<K,V> {
	@Override
	public RecordWriter<K,V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();  
		Path file = getDefaultWorkFile(context, "");  
		FileSystem fs = file.getFileSystem(conf);  
		FSDataOutputStream fileOut = fs.create(file, false);  
		return new MyRecordWriter<K,V>(fileOut, " ");
	}
}
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String words [] = line.split(" ");
		for(String word : words){
			context.write(new Text(word), new LongWritable(1));
		}
	}
}
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
	public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		Iterator<LongWritable> it = values.iterator();
		long count = 0;
		while(it.hasNext()){
			long c = it.next().get();
			count += c;
		}
		context.write(key, new LongWritable(count));
	}
}
public class WCDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "WCJob");
		
		job.setJarByClass(cn.tedu.wc.WCDriver.class);
		job.setMapperClass(cn.tedu.wc.WCMapper.class);
		job.setReducerClass(cn.tedu.wc.WCReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		job.setOutputFormatClass(MyFileOutputFormat.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://yun01:9000/park/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://yun01:9000/park/result2"));

		if (!job.waitForCompletion(true))
			return;
	}

}

4.MultipleOutputs

    MultipleOutputs可以令一个Reducer产生多个输出文件。

    为特定kv打上指定标记

MultipleOutputs<Text,LongWritable> mos = new MultipleOutputs<Text,LongWritable>(context);
mos.write("flag", key, value);
/**
为指定标记内容指定输出方式
可以指定多个
*/
MultipleOutputs.addNamedOutput(job, "flag", XxxOutputFormat.class, Key.class, Value.class);

    案例

    改造上面的wordcount案例,将首字母为a-j的输出到"small"中。其他输出到"big"中。

    代码如下:

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String words [] = line.split(" ");
		for(String word : words){
			context.write(new Text(word), new LongWritable(1));
		}
	}
}
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
	MultipleOutputs<Text,LongWritable> mos = null;
	@Override
	protected void setup(Reducer<Text, LongWritable, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		this.mos = new MultipleOutputs<Text,LongWritable>(context);
	}
	
	public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		Iterator<LongWritable> it = values.iterator();
		long count = 0;
		while(it.hasNext()){
			long c = it.next().get();
			count += c;
		}
		if(key.toString().matches("[a-j][a-z]*")){
			mos.write("small", key, new LongWritable(count));
		}else{
			mos.write("big", key, new LongWritable(count));
		}
	}
}
public class WCDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "WCJob");
		
		job.setJarByClass(cn.tedu.wc.WCDriver.class);
		job.setMapperClass(cn.tedu.wc.WCMapper.class);
		job.setReducerClass(cn.tedu.wc.WCReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		MultipleOutputs.addNamedOutput(job, "small", MyFileOutputFormat.class, Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "big", MyFileOutputFormat.class, Text.class, IntWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.242.101:9000/park/words.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.242.101:9000/park/result2"));

		if (!job.waitForCompletion(true))
			return;
	}
}

3、GroupingComparator

    在reduce阶段进行,对mapper发送过来的数据会进行分组的操作,这个过程称为为Grouping。默认情况下会将k2相同的内容作为一组。

    我们可以通过job.setGroupingComparatorClass(MyGroupingComparator.class);方法自己指定Grouping规则。

1.WritableComparator

    案例

    改造WordCount案例,实现统计a-h 和 i-z开头的单词数量统计。

public class WCComparator extends WritableComparator {

	@Override
	public int compare(byte[] b1, int begin1, int len1, byte[] b2, int begin2, int len2) {
		try {
			DataInput in = new DataInputStream(new ByteArrayInputStream(b1,begin1,len1));
			Text ta = new Text();
			ta.readFields(in);
			
			in  = new DataInputStream(new ByteArrayInputStream(b2,begin2,len2));
			Text tb = new Text();
			tb.readFields(in);
			
			if(ta.toString().matches("^[a-n][a-z]*$") && tb.toString().matches("^[a-n][a-z]*$")){
				return 0;
			}else if(ta.toString().matches("^[o-z][a-z]*$") && tb.toString().matches("^[o-z][a-z]*$")){
				return 0;
			}else{
				return 1;
			}
		} catch (IOException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

}

4、二次排序

    所谓二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一个字段的排序顺序。

/**
 * 开发bean封装数据
 */
 public class NumBean implements WritableComparable<NumBean> {
	private int lnum;
	private int rnum;
	
	public NumBean() {
	}
	
	public NumBean(int lnum, int rnum) {
		this.lnum = lnum;
		this.rnum = rnum;
	}

	public int getLnum() {
		return lnum;
	}
	public void setLnum(int lnum) {
		this.lnum = lnum;
	}
	public int getRnum() {
		return rnum;
	}
	public void setRnum(int rnum) {
		this.rnum = rnum;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.lnum = in.readInt();
		this.rnum = in.readInt();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(lnum);
		out.writeInt(rnum);
	}

	@Override
	public int compareTo(NumBean o) {
		if(this.lnum != o.getLnum()){
			return this.lnum - o.getLnum();
		}else{
			return this.rnum - o.getRnum();
		}
	}
	
}
/**
 * 开发Mapper 用 NumBean作为k2 由于 NumBean覆盖了compareTo方法 可以实现自动二次排序 
 */
public class NumMapper extends Mapper<LongWritable, Text, NumBean, NullWritable> {
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NumBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		//1.读取行
		String line = value.toString();
		//2.切分出lnum 和  rnum
		String []  attrs = line.split(" ");
		int lnum = Integer.parseInt(attrs[0]);
		int rnum = Integer.parseInt(attrs[1]);
		//3.封装数据到bean
		NumBean nb = new NumBean(lnum,rnum);
		//4.发送数据
		context.write(nb, NullWritable.get());
	}
} 
/**
 * 开发Reducer输出结果 shuffle阶段已经完成了二次排序 此处直接输出即可
 */
public class NumReducer extends Reducer<NumBean, NullWritable, IntWritable, IntWritable> {
	@Override
	protected void reduce(NumBean key, Iterable<NullWritable> values,
			Reducer<NumBean, NullWritable, IntWritable, IntWritable>.Context context)
			throws IOException, InterruptedException {
		int lnum = key.getLnum();
		int rnum = key.getRnum();
		context.write(new IntWritable(lnum), new IntWritable(rnum));
	}
}
/**
 * 为了防止重复 数据被grouping成一条数据 造成结果丢失 自定义gourping过程 固定返回-1 表示无论什么情况都不合并数据
 */
public class NumWritableComparator extends WritableComparator {
	@Override
	public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
		return -1;
	}
}
/**
 * 开发Driver组装程序
	 */
public class NumDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "num_job");
		job.setJarByClass(cn.tedu.mr.grouping.num.NumDriver.class);
		job.setMapperClass(cn.tedu.mr.grouping.num.NumMapper.class);
		job.setMapOutputKeyClass(NumBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setReducerClass(cn.tedu.mr.grouping.num.NumReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		job.setGroupingComparatorClass(NumWritableComparator.class);
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/ndata"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/nresult"));
		if (!job.waitForCompletion(true))
			return;
	}
}

5、数据倾斜处理

    在开发MR程序时,可能遇到的数据分配不一致,造成程序性能下降的问题,这个问题称之为数据倾斜问题。

    解决办法:

    如果是因为shuffle分配数据不均匀造成数据倾斜,重写parition均匀分配数据即可。

    如果是数据本身带有倾斜的特点无法通过修改parition来解决倾斜问题,那么可以通过以下几个方法尝试解决:

    1.利用combiner减轻倾斜的情况。

    2.将造成倾斜的数据拿出单独处理。

    3.将一个mr拆分成多个mr降低倾斜造成的危害。

6、小文件处理

1.小文件在hadoop中会有什么问题?

    每个小文件无论多小都会对应一个block,而每一个block在NameNode中都要有元数据的记录,如果存在大量小文件,则NameNode中的大量空间都用来存放这些小文件的元数据信息,其实是相当浪费的,对于NameNode的性能有比较大的影响。

    当使用mapreduce处理大量小文件时,默认情况下mapreduce在进行切片操作时规则是和block切的规则一样,即一个block一个InputSplit,而一个InputSplit就对应一个Mapper,这样会造成开启大量的MapperTask,但是每个MapperTask处理的数据量都很有限。极端情况下开启大量Mapper耗费内存甚至可能造成程序的崩溃。

2.Hadoop Archive

    Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。

    HAR是在Hadoop file system之上的一个文件系统,因此所有fs shell命令对HAR文件均可用,只不过是文件路径格式不一样,HAR的访问路径可以是以下两种格式:

    har://scheme-hostname:port/archivepath/fileinarchive

    har:///archivepath/fileinarchive(本节点)

1>使用HAR时需要两点

    第一,对小文件进行存档后,原文件并不会自动被删除,需要用户自己删除;

    第二,创建HAR文件的过程实际上是在运行一个mapreduce作业,因而需要有一个hadoop集群运行此命令。

2>HAR还有一些缺陷

    第一,一旦创建,Archives便不可改变。要增加或移除里面的文件,必须重新创建归档文件。

    第二,要归档的文件名中不能有空格,否则会抛出异常,可以将空格用其他符号替换(使用-Dhar.space.replacement.enable=true 和-Dhar.space.replacement参数)。

3>命令

hadoop archive -archiveName <NAME>.har -p <parent path>[-r <replication factor>]<src>* <dest>

    案例:将hdfs:///small中的内容压缩成small.har

    将某个文件打成har:

hadoop archive -archiveName small.har -p /small/small1.txt /small

    将多个small开头的文件打成har:

hadoop archive -archiveName small.har -p /small/small* /small

    将某个文件夹下所有文件打成har:

hadoop archive -archiveName small.har -p /small /small

    查看HAR文件存档中的文件

hadoop fs -ls har:///small/small.har

    输出har文件内容到本地系统

hadoop fs -get har:///small/small.har/smallx

3.SequenceFile

    SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件的基础之上提出了一些HDFS中小文件存储的解决方案,他们的基本思路就是将小文件进行合并成一个大文件,同时对这些小文件的位置信息构建索引。

    文件不支持复写操作,不能向已存在的SequenceFile(MapFile)追加存储记录。

    当write流不关闭的时候,没有办法构造read流。也就是在执行文件写操作的时候,该文件是不可读取的。


@Test
/**
 * SequenceFile 写操作
 */
public void SequenceWriter() throws Exception{
	final String INPUT_PATH= "hdfs://192.168.242.101:9000/big";
	final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big2";
	
	//获取文件系统
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000");
	FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
	
	//创建seq的输出流
	Text key = new Text();
	Text value = new Text();
	SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path(OUTPUT_PATH), key.getClass(), value.getClass());

	//写新的数据
	System.out.println(writer.getLength());
	key.set("small4.txt".getBytes());
	value.set("ddddddd".getBytes());
	writer.append(key, value);
	
	//关闭流
	IOUtils.closeStream(writer);
}

@Test
/**
 * SequenceFile 读操作
 */
public void sequenceRead() throws Exception {
	final String INPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq";
	
	//获取文件系统
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000");
	FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
	
	//准备读取seq的流
	Path path = new Path(INPUT_PATH);
	SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path, conf);
	//通过seq流获得key和value准备承载数据
	Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
	Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
	//循环从流中读取key和value
	long position = reader.getPosition();
	while(reader.next(key, value)){
		//打印当前key value
		System.out.println(key+":"+value);
		//移动游标指向下一个key value
		position=reader.getPosition();
	}
	//关闭流
	IOUtils.closeStream(reader);
}

@Test
/**
 * 多个小文件合并成大seq文件
 * @throws Exception
 */
public void small2Big() throws Exception{
	final String INPUT_PATH= "hdfs://192.168.242.101:9000/small";
	final String OUTPUT_PATH= "hdfs://192.168.242.101:9000/big/big.seq";
	//获取文件系统
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", "hdfs://192.168.242.101:9000");
	FileSystem fs = FileSystem.get(conf);
	//通过文件系统获取所有要处理的文件
	FileStatus[] files = fs.listStatus(new Path(INPUT_PATH));
	//创建可以输出seq文件的输出流
	Text key = new Text();
	Text value = new Text();
	SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(OUTPUT_PATH), key.getClass(),value.getClass());
	//循环处理每个文件
	for (int i = 0; i < files.length; i++) {
		//key设置为文件名
		key.set(files[i].getPath().getName());
		//读取文件内容
		InputStream in = fs.open(files[i].getPath());
		byte[] buffer = new byte[(int) files[i].getLen()];
		IOUtils.readFully(in, buffer, 0, buffer.length);
		//值设置为文件内容
		value.set(buffer);
		//关闭输入流
		IOUtils.closeStream(in);
		//将key文件名value文件内容写入seq流中
		writer.append(key, value);
	}
	//关闭seq流
	IOUtils.closeStream(writer);

}

4.CompositeInputFormat

    用于多个数据源的join。

    此类可以解决多个小文件在进行mr操作时map创建过多的问题。

    此类的原理在于,它本质上市一个InputFormat,在其中的getSplits方法中,将他能读到的所有的文件生成一个InputSplit

    使用此类需要配合自定义的RecordReader,需要自己开发一个RecordReader指定如何从InputSplit中读取数据。

    也可以通过参数控制最大的InputSplit大小。

CombineTextInputFormat.setMaxInputSplitSize(job, 256*1024*1024);

上一篇:Hadoop-MapReduce基本原理及相关操作

猜你喜欢

转载自my.oschina.net/u/3754001/blog/1802979