版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xianchanggui8674/article/details/78652865
在学习伪分布式hadoop集群的时候,使用了官方的wordcount程序,当时也只是直接执行官方jar包,不知道其工作原理,用过自己写一遍这个程序,加深对mapreduce工作原理的理解。
MapReduce工作原理
input -> map -> shuffle -> reduce -> output
基于MapReduce计算模型编写分布式并行程序非常简单,主要编码工作就是实现Map函数和Reduce函数
其他的并行编程中的种种复杂问题,如分布式存储,工作调度,负载平衡,容错处理,网络通讯等,均由YARN框架负责处理。
MapReduce的一般固定模式
CLASS MR{
static public Class MyMapper…{
//map代码块
}
static public Class MyReducer…{
//reduce代码块
}
main(){
//Driver区代码
}
}
自己编写的MapReduce程序代码
使用mvaen管理jar包,pom.xml如下
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.0</version>
<scope>test</scope>
</dependency>
源码如下,注释写的比较详细了:
package com.FileHadoop.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 统计单词出现个数的mapreduce
* @author xiaopang
*/
public class MyWordCount {
//自定义map处理类
public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text outKey = new Text();
private static final IntWritable outValue = new IntWritable(1);
//重写父类的map方法吗,自定义处理
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//获取到每一行的值,以制表符或空格隔开
String inValue = value.toString();
//将每一个单词取出,获得是是个数组
StringTokenizer lineValue = new StringTokenizer(inValue);
//遍历这个数组,得到每一个值
while (lineValue.hasMoreTokens()) {
//设置传递给reduce的值
outKey.set(lineValue.nextToken());
//使用上下文context传递参数
context.write(outKey, outValue);
}
}
}
//自定义reduce合并类
public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable outValue = new IntWritable();
//重写父类reduce方法,自动以处理
@Override
public void reduce(Text key, Iterable<IntWritable> value,
Context context) throws IOException, InterruptedException {
//map传递的值经过shuffle排序和重组,将相同的键合并,值为一个数组即:
// <hadoop,1>
// -> <hadoop,{1,1}>,所以reduce方法的第二个值为一个迭代器
// <hadoop,1>
int sum = 0;
//遍历迭代器,取数值的和
for(IntWritable inValue : value){
sum += inValue.get();
}
//设置输入和输出
outValue.set(sum);
context.write(key, outValue);
}
}
//Driver
public int run(String[] args) throws Exception {
//加载hdfs的配置文件
Configuration config = new Configuration();
//获取到job的对象
Job job = Job.getInstance(config,MyWordCount.class.getSimpleName());
//运行的类
job.setJarByClass(MyWordCount.class);
//处理流程,首先是Input
//输入路径接收第一个参数
Path inPath = new Path(args[0]);
//添加输出路径
FileInputFormat.addInputPath(job, inPath);
//设置map
job.setMapperClass(MyMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输出路径,接收输入的第二个参数
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
//提交
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int state = new MyWordCount().run(args);
//执行成功就退出程序
System.exit(state);
}
}
使用自己编写的jar包测试
使用eclipse将程序打包成jar包,上传到linux系统上,命令和原来类似:
bin/yarn jar my-wordcount.jar /user/xiaoshi/wcinput /user/xiaoshi/myoutput
测试成功,查看一下自己编写的程序和自带的示例执行之后结果是一模一样的。