1、什么是 MapReduce
首先让我们来重温一下 hadoop 的四大组件:
HDFS:分布式存储系统
MapReduce:分布式计算系统
YARN:hadoop 的资源调度系统
Common:以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等。
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
Hadoop 当中的 MapReduce 就是这样的一个分布式程序运算框架,它把大量分布式程序都会 涉及的到的内容都封装进了,让用户只用专注自己的业务逻辑代码的开发。它对应以上问题 的整体结构如下:
MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行 。
MapTask:阶段并发任,负责mapper 阶段的任务处理 YARNChild 。ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理YARNChild。
2、MapReduce 示例程序编写及编码规范
1、 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要信息,比如指定 Mapper 类和 Reducer 类
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
2、 该程序中的 TokenizerMapper 类继承了 Mapper 类
3、 该程序中的 IntSumReducer 类继承了 Reducer 类
总结:MapReduce 程序的业务编码分为两个大部分,一部分配置程序的运行信息,一部分编写该 MapReduce 程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继承 Mapper 类和 Reducer 类。
MapReduce 程序编写规范:
1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
4、Mapper 中的业务逻辑写在 map()方法中
5、map()方法(maptask 进程)对每一个<K,V>调用一次
6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
7、Reducer 的业务逻辑写在 reduce()方法中
8、Reducetask 进程对每一组相同 k 的<K,V>组调用一次 reduce()方法 >9、用户自定义的Mapper 和 Reducer 都要继承各自的父类
10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的job 对象
WordCount 的业务逻辑:
1、 maptask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成 一个 key-value 对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask 去汇总 。
2、 reducetask 阶段将接受 maptask 的结果,来做汇总计数 。下面是具体实现,首先看 Map:
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* 统计每个小文件
* 1、继承Mapper类
* 2、重写map方法
* Mapper类中4个泛型
* KEYIN, 输入的键的类型,这里指的是每一行 的起始字节偏移量 (long)
* VALUEIN,输入的值得类型,这里指的是每一行的内容(String)
* KEYOUT, 输出的键的类型,这里指的是每一个单词(String)
* VALUEOUT,输出的值的类型,这里指的是单词的次数(int)
* map的数据需要经过网络传输到reduce端,所有的数据类型必须具备序列化和反序列化的能力
* 序列化:字符串--->二进制
* 反序列化:二进制--->字符串
* java中的的序列化和反序列化接口: Serializable
* hadoop弃用了java中的,java中的序列化和反序列化太重,过于繁杂
* Long a=5
* Long java中的序列化和反序列会将类结构一并序列化和反序列化
* hadoop中重新定义了 一套序列化和反序列化的接口:Writable 轻量级
* 常用的一些类型实现对应的类型
* long --->LongWritable
* int--->IntWritable
* Null--->NullWritable
* String--->Text
* */
/*
* 获取每一行的单词内容并切分发送
* */
public class myMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/*
* LongWritable key,输入的每一行偏移量,框架读取的
* Text value,输入每一行的内容
* Context context,上下文对象,用于向reduce发送数据,同时用于读取框架读取的东西
* 调用频率:一行调用一次
* */
@Override
protected void map(LongWritable key,
Text value,
Context context)
throws IOException, InterruptedException {
//获取每一行内容,转化为String并切分
String[] datas = value.toString().split("\t");
/*如果要统计,只统计一行的,并不能统计一个文件的
* 全部发送到reduce端进行统计
* key: 就是单词
* value: 1
*/
//循环遍历每一个单词发送给reduce
for (String data : datas) {
Text k = new Text(data);
IntWritable v = new IntWritable(1);
//把单词1发送给reduce
context.write(k, v);
}
}
}
其次看 Reduce:
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 统计分析
* 继承reducer类
* 重写reducer方法
* 输入:从map过来的,对应map的输出类型
* KEYIN,reduce输入的key类型 是map输出的key的类型
* VALUEIN, reduce输入的value类型 是map输出的value的类型
* 输出:最终的输出结果
* KEYOUT,输出的key的类型 这里指的是最终的单词 ,Text
* VALUEOUT,输出的value的类型,这里指的每一个单词的最终的词频,IntWritable
* */
public class myReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
/*
* map到reduce之间会有一个shuffle(洗牌的过程,将map输出的数据全部打乱,
* 重洗(有一步叫分组,默认的分组是按照map端输出的key分组))的过程,目前使用默认的
* 这里map输出的key是单词,就是按照单词分组
* 会把单词相同的所有数据分为一组
* Text key, 一组中的任意一个key
* Iterable<IntWritable> values, 一组中相同的key对应的所有的value
* Context context,上下文对象,向上承接map,向下输出结果 hdfs/本地
*
* 这个函数调用频率:一组调用一次,有多少组调用几次
* */
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//循环遍历values求和即可
int sum=0;
for (IntWritable value : values) {
//将value转换为int类型 :使用get方法 hadoop--->java
sum+=value.get();
}
//写出
IntWritable rv= new IntWritable(sum);
context.write(key, rv);
}
}
在看 Job :
package mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
* 提交运行MapReduce程序
*
* */
public class myDriver {
public static void main(String[] args) throws IOException {
/*
* 加载配置文件
1、优先加载jar包中的 hdfs-defalut.xml
*2、加载src下的 hdfs-site.xml
*3、加载代码中的
*最终加载的最终生效
* */
//1、加载配置文件
Configuration conf = new Configuration();
//启动一个job:指的是一个mapreduce程序,封装的是map或reduce的相关配置项
//2、创建Job
Job job=Job.getInstance(conf);
//封装上面的job,将自定义的mapper,reducer类封装到job上
//指定当前job的入口:main函数的入口
//参数class类型,1)类名.class,2)对象.getClass,3)class.forname
job.setJarByClass(myDriver.class);
//指定job对应的mapper类
job.setMapperClass(myMapper.class);
//指定job对应的reduce类
job.setReducerClass(myReduce.class);
//指定mapper输出的key和value类型
/*
* 泛型的作用范围:在编译时起作用,检查数据类型,运行时泛型自动擦除
*
* */
/*
* job.setMapOutputKeyClass(Text.class);
* job.setMapOutputValueClass(IntWritable.class);
*/
//指定reduce输出的key和value类型
/*这个方法设定的输出的key和value类型
* 如果map指定了,这个指定的就是reduce
* 如果map没有指定,这个就是map和reduce的
* 当map和reduce的输出key、value类型一致的时候,只需要指定下面的就可以
* */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
/*
*指定输入,需要处理的文件,添加输入路径
* 参数1:job 参数2:输入路径
*/
FileInputFormat.addInputPath(job, new Path(args[0]));
//指定输出路径,输出路径不能存在,因为是覆盖式的写
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交job
//这个方法基本不用,不打印日志
//job.submit();
//可以打印日志,参数代表是否打印运行的进程
try {
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:1);
} catch (ClassNotFoundException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
实现完成 运行
mapreduce程序运行的3种方式:
1)将程序打jar提交到集群运行 生产
2)本地运行 windows下运行 本地调试
3)本地运行 提交到集群上
1)打jar包
2)将jar包上传到一台服务器
3)运行
jar包传到哪一个节点 在哪一个节点运行
hadoop jar jar包路径 主类的全限定名 程序运行的参数(按照参数的顺序放在args中)
hadoop jar /home/hadoop/jars/wc.jar com.lee.wordcount.myDriver
4)输出结果:
_SUCCESS 成功标志文件 大小为0
part-r-00000 结果文件
-r 代表的是结果是reduce输出的
part-m-00000
-m 结果是map输出的
打包的时候需要传参 输入 输出
hadoop jar /home/hadoop/jars/wc.jar com.lee.wordcount.myDriver /in /out
报错:
Caused by: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
导包错误 Text **Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop112:9000/output already exists 输出路径已存在**
mapreduce运行过程:
shuffle中排序 分组 根据输出的key进行的
mapreduce的编程中map输出的key的设计很重要
在没有排序的要求的时候
map的key的设计的依据是分组 按照哪一个字段进行分组 将这个字段设置为mapkey
分组:标志词语 每一 、每、相同