MapReduce应用编程开发

  • MapReduce的编程
  • 在集群上的运作
  • MapReduce类型与格式

MapReduce的编程

  1. 设计思路
    MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
    map: (k1; v1) → [(k2; v2)]
    输入:键值对(k1; v1)表示的数据
    处理:文档数据记录(如文本文件中的行,或数据表格中的行)将以“键值对”形式传入map函数;map函数将处理这些键值对,并以另一种键值对形式输出处理的一组键值对中间结果[(k2; v2)]
    输出:键值对[(k2; v2)]表示的一组中间数据
    reduce: (k2; [v2]) → [(k3; v3)]
    输入: 由map输出的一组键值对[(k2; v2)] 将被进行合并处理将同样主键下的不同数值合并到一个列表[v2]中,故reduce的输入为(k2; [v2])
    处理:对传入的中间结果列表数据进行某种整理或进一步的处理,并产生最终的某种形式的结果输出[(k3; v3)]
    输出:最终输出结果[(k3; v3)]
    在这里插入图片描述
    各个map函数对所划分的数据并行处理,从不同的输入数据产生不同的中间结果输出
    各个reduce也各自并行计算,各自负责处理不同的中间结果
    数据集合进行reduce处理之前,必须等到所有的map函数做完
    在进入reduce前需要有一个同步障(barrier)
    这个阶段也负责对map的中间结果数据进行收集整理(aggregation & shuffle)处理,以便reduce更有效地计算最终结果, — 最 终汇总所有reduce的输出结果即可获得最终结果
    2、编写Mapper类(以统计sogou500w的Uid为例 – 每位用户搜索的次数)
public class UidMapper extends Mapper<Object, Text, Text, IntWritable> {
	public static final IntWritable ONE = new IntWritable(1);
	private Text uidText = new Text();
	public void map(Object key, Text value, Context context)
			throws InterruptedException, IOException {
		String line = value.toString();
		String[] arr = line.split("\t");
		if (null != arr && arr.length == 6) {
			String uid = arr[1];
			if (null != uid && !"".equals(uid.trim())) {
				uidText.set(uid);
				context.write(uidText, ONE);
			}
		}
	}
}

3、编写Reducer类

public class UidReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	private IntWritable result = new IntWritable();
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws InterruptedException, IOException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		result.set(sum);
		context.write(key, result);
	}
}

4、编写main类

 public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		if (null == args || args.length != 2) {
			System.err.println("<Usage>: UidCollectot <input> <output>");
			System.exit(1);
		}
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);

		Job job = new Job(new Configuration(), "UidCollector");
		job.setJarByClass(UidCollector.class);

		job.setMapperClass(UidMapper.class);
		job.setReducerClass(UidReducer.class);
                                     job.setMapperClass(UidMapper.class);
		job.setReducerClass(UidReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

在集群上的运作

1、打包作业
单机上运行的程序不需要任何修改就可以直接在集群上运行,但是需要把程序打包为JAR文件发给集群
如果每个JAR文件都有一个作业,可以在JAR文件的manifest中指定要运行的main类。如果main类不在manifest中,则必须在命令行指定。

2、启动作业
为了启动作业,需要运行驱动程序,使用-conf选项来指定想要运行作业的集群(同样,也可以使用-fs和-jt选项)
如:hadoop jar uid.jar day0909.UidCollector /0909-sogou /out
JobClient的runjob()方法启动作业并检查进程,有任何变化,就输出一行map和reduce进度总结。输入如下:
在这里插入图片描述
通过web UI查看Job状态
Hadoop的Web界面用来浏览作业信息,对于跟踪作业运行进度、查找作业完成后的统计信息和日志非常有用。
关于集群的概要信息,包括集群的负载情况和使用情况。这表明当前正在集群上运行的map和reduce的数量,作业提交的数量,可用的tasktracker节点数和集群的负载能力,集群中可用map和reduce的任务槽数(“Map Task Capacity”和“Reduce Task CapaciLy”),每个节点平均可用的任务槽数等信息。
获取结果
一旦作业完成,有许多方法可以获取结果。每个reducer产生一个输出文件,因此,在输出的目录中会有part file,命名为part-00000。

在这里插入图片描述
查看part-r-00000文件结果如下:
在这里插入图片描述

作业调试

最经典的方法通过打印语句来调试程序,这在Hadoop中同样适用。然而,需要考虑复杂的情况:当程序运行在几十台、几百台甚至几千台节点上时,如何找到并检测调试语句分散在这些节点中的输出呢?为了处理这种情况,要查找一个特殊情况,用一个调试语句记录到一个标准错误中,它将发送一个信息来更新任务的状态信息以提示查看错误日志。Web UI简化了这个操作。

MapReduce类型与格式

1、MapReduce中的键值对
Hadoop的MapReduce中,map和reduce函数遵循如下常规格式:
map: (K1, V1) list(k2, v2)
reduce: (K2,list(v2)) list(k3, v3)
一般来说,map函数输入的键/值的类型(K1和V1)类型不同于输出类型(K2和V2)。虽然,reduce函数的输入类型必须与map函数的输出类型相同,但reduce函数的输出类型(K3和V3)可以不同于输入类型。

combiner函数,它与reduce函数的形式相同(它是Reducer的一个实现),不同之处是它的输出类型是中间的键/值对类型(K2和V2),这些中间值可以输入reduce函数:
map: (K1, V1) list(k2, v2)
combine: (K2,list(v2)) list(k2, v2)
reduce: (K2,list(v2)) list(k3, v3)
example:
map reduce
car 1 — car {1,1}
car 1 —
river 1 — river {1}
map combine(map端) reduce
car 1 ------- car {2} ------ car {2}
car 1
river 1 ------- river {1} ------ river {1}
combine与reduce函数通常是一样的,在这种情况下,K3与K2类型相同.V3与V2类型相同。
partition函数将中间的键/值对(K2和V2)进行处理,并且返回一个分区索引(partition index)。实际上,分区单独由键决定(值是被忽略的)。
panition: (K2, V2) integer

Combiner & Partitioner 编程

WordMap.java

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	private static IntWritable one = new IntWritable(1);
	public void map(
			LongWritable key,
			Text value,
			org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws java.io.IOException, InterruptedException {
		String keyword = value.toString(); 
		String[] words = keyword.split(" ");
		for(String word : words){
			context.write(new Text(word), one);
		}
		
	}
}

WordReduce.java

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(
			Text key,
			java.lang.Iterable<IntWritable> values,
			org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws java.io.IOException, InterruptedException {
		int sum = 0;
		for(IntWritable count : values){
			sum+=count.get();
		}
		context.write(key, new IntWritable(sum));
	}
}

WordCountPartioner.java

public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
	@Override
	public int getPartition(Text key, IntWritable value, int numPartitions) {
		// TODO Auto-generated method stub
		int a = key.hashCode()%numPartitions;
		if(a>=0)
			return a;
		else 
			return 0;
	}
}

WordCount.java

public class WordCount {
	public static void main(String[] args)throws Exception {
		input = args[0];
		output = args[1];
		Configuration conf = new Configuration();
		Job job = new Job(conf, "WordCount");
		job.setNumReduceTasks(2);
		job.setJarByClass(WordCount.class);
		job.setCombinerClass(Reducer.class);
		job.setPartitionerClass(WordCountPartitioner.class);
		job.setMapperClass(WordMapper.class);
		job.setReducerClass(WordReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		System.exit(job.waitForCompletion(true)?0:1);
	}
}

输入格式
1.输入分片与记录
层次关系是:输入分片(split)与map对应,是每个map处理的唯一单位。每个分片包括多条记录,每个记录都有对应键值对。
输入切片的接口:InputSplit接口(在org.interface InputSplit extends Writable){
long getLength() throws IOException;
String[] getLocations() throws IOException;
}
一个分片并不包含数据本身,而是指向数据的引用。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而长度用来排序分片,以便优化处理最大的分片,从而最小化作业运行时间。InputSplit不需要开发人员直接处理,由InputFormat创建。
A a = new A();
A b = a;
a 是对象的引用(地址)

FileInputFormat类:
FileInputFormat 是所有使用文件作为其数据源的 InputFormat 实现的基类。它 提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现。把分片分割成记录的作业由其子类来完成。
FileInputFormat.add(job, new Path(a.txt))

FilelnputFormat 类的输入路径:
作业的输入被设定为一组路径, 这对限定作业输入提供了很大的灵活性。FileInputFormat 提供四种静态方法来设定 JobConf 的输入路径:
Public static void addInputPath(JobConf, Path path)
Public static void addInputPaths(JobConf,String commaSeparatedPaths)
Public static void setInputPaths(JobConf, Path… inputPaths)
Public static void setInputPaths(JobConf, String commaSeparatedPaths)

2、文本输入
TextlnputFormat
TextInputFormat 是默认的 InputFormat。每条记录是一行输入。键是 LongWritable 类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符),它是 Text 类型的。所以,包含如下文本的文件:

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
被切分为每个分片 4 条记录:
(0,On the top of the Crumpetty Tree)
(33,The Quangle Wangle sat,)
(57,But his face you could not see,)
(89,On account of his Beaver Hat.)
输入分片和HDFS块之间可能不能很好的匹配,出现跨块的情况
KeyValueTextlnputFormat
TextInputFormat 的键,即每一行在文件中的字节偏移量,通常并不是特别有用。通常情况下,文件中的每一行是一个键/值对,使用某个分界符进行分隔,比如制表符。例如 以下由 Hadoop 默认 OutputFormat(即 TextOutputFormat)产生的输出。如果要正确处理这类 文件,KeyValueTextInputFormat 比较合适。
可以通过 key.value.separator.in.input.line 属性来指定分隔符。它的默认值是一个制表符。
以下是一个示例,其中一表示一个(水平方向的)制表符:
linel->On the top of the Crumpetty Tree
line2->The Quangle Wangle sat,
line3->But his face you could not see.
line4->On account of his Beaver Hat.

与 TextInputFormat 类似,输入是一个包含 4 条记录的分片,不过此时的键是每行排在Tab 之前的 Text 序列:
(linel, On the top of the Crumpetty Tree)
(line2, The Quangle Wangle sat,)
(line3, But his face you could not see,)
(line4, On account of his Beaver Hat.)
NLineInputFormat:与TextInputFormat一样,键是文件中行的字节偏移量,值是行本身。主要是希望mapper收到固定行数的输入。
3、多种输入
MultipleInputs类处理多种格式的输入,允许为每个输入路径指定InputFormat和Mapper。
两个mapper的输出类型是一样的,所以reducer看到的是聚集后的map输出,并不知道输入是不同的mapper产生的。
重载版本:addInputPath(),没有mapper参数,主要支持多种输入格式只有一个mapper。
案例
CityMapJoinDemo.java

总结

1、掌握MapReduce的编程,编写Mapper类,Reducer类和main类。
2、知道作业是如何在集群上运作的,打包、启动、查看结果。
3、知道如何从web UI查看Job状态。
4、掌握MapReduce的出入输出格式。
5、理解MapReduce中键值对的概念

猜你喜欢

转载自blog.csdn.net/qq_39429714/article/details/84868814