前言
MapReduce框架让我们可以专注于算法逻辑,而不需要去关注代码实现
但如果有需求,我们几乎可以自定义MR流程中的全部组件,如下
- 大框架
Mapper
,Reducer
,Runner
,还有常用到的自定义类Bean
- 分组逻辑
GroupingComparator
,分区逻辑Partitioner
,Map端预聚合逻辑Combiner
- 输入流
InputFormat
,输出流OutputFormat
自定义这些类,可以加深对MR的理解
此篇文章主要介绍第1点中提到的组件,也是最常用到的自定义组件
第2点:https://blog.csdn.net/IAmListening/article/details/89792033
第3点:https://blog.csdn.net/IAmListening/article/details/89792588
数据以及处理思路
测试数据如下
2019-03-01,张三,语文,80
2019-03-01,张三,数学,99
2019-03-01,李四,语文,70
2019-03-01,李四,数学,60
2019-03-01,王五,语文,80
2019-03-01,王五,数学,80
2019-03-01,赵六,语文,50
2019-03-01,赵六,数学,30
2019-05-03,张三,语文,90
2019-05-03,张三,数学,76
2019-05-03,李四,语文,80
2019-05-03,李四,数学,31
2019-05-03,王五,语文,80
2019-05-03,王五,数学,87
2019-05-03,赵六,语文,20
2019-05-03,赵六,数学,89
处理结果如下
# hdfs dfs -cat /dataOut/demo1
赵六 yaya 语文均分:35, 数学均分:59
# hdfs dfs -cat /dataOut/demo2
张三 yaya 语文均分:85, 数学均分:87
李四 yaya 语文均分:75, 数学均分:45
王五 yaya 语文均分:80, 数学均分:83
流程
- 自定义输入流读取文本数据, Map端将文本处理为key-value对
- 自定义key,compareTo方法只判断名字和科目是否相同
- 在combiner预聚合,得到单一科目的总分
这里即使combiner未运行,也不会影响整体逻辑 - 按名字分组,按名字的hash值分区
- Reduce端累加各科目的总分,进而得到平均数
- 自定义输出流,在key和value间加了"yaya"
每个类的具体实现
Runner
重点归纳
- 数据进入Map端后,先进行分区;map方法运行结束之后,才开始排序
排序结束后,键值相同的元素会被聚合到一起,可用combiner对其进行处理
然后根据分组规则,在Reduce端中进一步聚合
所以自定义键的compereTo方法,其实就是默认的分组(或预分组)方法 - 因为Configuration全局一致,所以可以写入键值对,来传递信息
- 用于设置输入输出路径的两个方法(
addInputPath
和setOutputPath
),实际上就是将路径添加到了Configuration中
输入路径的key是"mapreduce.input.fileinputformat.inputdir"
输出路径的key是"mapreduce.output.fileoutputformat.outputdir"
自定义输入输出流时,可以通过这两个key来获取Path
// 可以选择继承Configured类,就不用手动实现set和get了
public class MooMoo extends Configured implements Tool {
// 主函数
public static void main(String[] args) {
Configuration configuration = new Configuration();
// 设置hdfs集群的连接方式,这里使用的是一个非高可用的集群
configuration.set("fs.defaultFS", "hdfs://cdh1:8020");
// 因为configuration全局一致,所以可以设置键值对来传递信息
configuration.set("master","carney");
try {
ToolRunner.run(configuration, new MooMoo(), args);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public int run(String[] strings) throws Exception {
Configuration configuration = this.getConf();
Job job = Job.getInstance(configuration, "review");
/* 这段代码用于设置输入输出路径
* 用于设置路径这两个的方法,实际上就是将路径添加到了Configuration中
* 输入路径的key是"mapreduce.input.fileinputformat.inputdir"
* 输出路径的key是"mapreduce.output.fileoutputformat.outputdir"
*/
FileInputFormat.addInputPath(job, new Path("/inputdemo.txt"));
Path outPath = new Path("/dataOut");
// 以下这段代码,会检测目标文件是否存在.如果存在,就删除
// 因为提交事务之后,会尝试建立输出目录.如果目录已存在,就会抛出异常
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}
/* 调试模式中可以看到下面这行代码会发生异常(Method threw 'java.lang.IllegalStateException' exception. Cannot evaluate org.apache.hadoop.mapreduce.Job.toString())
* 这是因为job还未提交,但是该方法试图调用job.toString().
* 不过没有关系,系统会自动处理这个异常
*/
FileOutputFormat.setOutputPath(job, outPath);
// 基本设置
job.setMapperClass(MooMapper.class);
job.setReducerClass(MooReducer.class);
job.setMapOutputKeyClass(MooBean.class);
job.setMapOutputValueClass(MooBean.class);
// 设置其它自定义类
job.setNumReduceTasks(2);
job.setInputFormatClass(MooIn.class);
job.setOutputFormatClass(MooOut.class);
job.setGroupingComparatorClass(MooGrouping.class);
job.setPartitionerClass(MooPartitioner.class);
job.setCombinerClass(MooCombiner.class);
// 提交事务
boolean b = job.waitForCompletion(true);
return b ? 0 : -1;
}
}
Bean
MR流程就是在获取, 传递, 整合key-value对.但如果要传递大量信息, 仅使用已有的Text.class
,IntWritable.class
等类是不够的,于是就需要自定义类.
自定义类大多是实现Writable
接口的POJO(Plain Ordinary Java Object)类,但如果要将自定义类作为key,就需要实现WritableComparable
接口(因为key键会被自动排序)
在我们的示例中,(为了方便,)自定义类既是key,又是value
重点归纳
将自定义类作为key时,并不是每个key都必须互异
理想的key应该是分组规则或预分组规则
只有这样,GroupingComparator才能配合Combiner完成分段聚合
public class MooBean implements WritableComparable<MooBean> {
// 这里的date在当前job逻辑中是无效信息,但仍建议保留
private String date;
private String name;
private String course;
private Integer score;
public MooBean() { }
// 阿里代码手册建议分离构造函数和初始化方法
public MooBean(String info){
init(info);
}
// 初始化方法,切分字符串并分别赋值
public void init(String info){
String[] split = info.split(",");
date = split[0];
name = split[1];
course = split[2];
score = new Integer(split[3]);
}
/* 将自定义类作为key时,并不是每个key都必须互异
* 理想的key应该是分组规则或预分组规则
* 只有这样,GroupingComparator才能配合Combiner完成分段聚合
*/
@Override
public int compareTo(MooBean o) {
// 只要名字和课程相同,就视为相同的key,聚合在一起
System.out.println("开始排序");
// 先比较名字
int temp = name.compareTo(o.name);
// 再比较课程
return temp == 0 ? course.compareTo(o.course) : temp;
}
@Override
public String toString() {
// 使用StringBuilder拼接字符串
StringBuilder stringBuilder = new StringBuilder(name);
return stringBuilder.append(":{").append(course).append("-").append(score).append("}").toString();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeUTF(course);
dataOutput.writeInt(score);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
name = dataInput.readUTF();
course = dataInput.readUTF();
score = dataInput.readInt();
}
// setter和getter放到最后,建议阅读时忽略以下代码
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCourse() {
return course;
}
public void setCourse(String course) {
this.course = course;
}
public Integer getScore() {
return score;
}
public void setScore(Integer score) {
this.score = score;
}
}
Mapper
重点归纳
- 测试发现,Map端写入Configuration的信息,在Reduce端无法读取
- new对象是一个很耗资源的过程
MR源代码有一个很常见的写法:通过对同一对象的重新赋值来近似实现多对象
其实很容易被它这种写法坑到,但是这种处理方式确实可以显著的节省资源 - 只有相同的键,才可能在combiner聚合.因为分组逻辑在combiner之后
public class MooMapper extends Mapper<LongWritable, Text, MooBean, MooBean> {
// 用于测试在Map端和Reduce端传递属性
String masterName;
/* new对象是一个很耗资源的过程
* MR源代码有一个很常见的写法:通过对同一对象的重新赋值来近似实现多对象
* 其实很容易被坑它这种写法坑到,但是这种处理方式可以显著的节省资源
*/
// 定义一个全局变量,不断对它重新赋值,而不是重新new
MooBean mooBean;
/* setup方法只会在Mapper对象创建的时候运行一次,常用于初始化一些资源
* 我们可以在此方法中连接jdbc,或者建立输入流来获取额外的信息.
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
mooBean = new MooBean();
// 以下代码用于测试:这里在尝试借助configuration,从Map端传递信息给Reduce端
Configuration configuration = context.getConfiguration();
masterName = configuration.get("master");
// 此处输出的masterName是在Runner方法中设定的"carney"
System.out.println("Mapper初始化.master=" + masterName);
configuration.set("master", "willson");
}
// 只会在结束的时候运行一次,常用于释放一些资源(比如关闭流)
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 用于测试,此处输出的masterName为"willson"
masterName = context.getConfiguration().get("master");
System.out.println("Mapper运行结束.master="+masterName);
}
// 核心方法,输入流每读取一行,就调用一次map方法,用来将该行处理为key-value对
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String string = value.toString();
// 给对象重新赋值
mooBean.init(string);
// 被判定为相同的键,会在combiner直接聚合
context.write(mooBean, mooBean);
}
}
Reducer
重点归纳
Reducer的逻辑不能依赖于Combiner
Combiner是调优中相对复杂的一个点,因为combiner不一定会发生,不一定在map的哪个阶段发生.
这样的不确定性会显著增加设计Combiner的难度
public class MooReducer extends Reducer<MooBean, MooBean, Text, Text> {
// 这三个全局变量都是为了避免频繁的new对象
private StringBuilder stringBuilder;
private Text outKey;
private Text outValue;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// setup方法用于初始化.
stringBuilder = new StringBuilder();
outKey = new Text();
outValue = new Text();
// 测试代码.此处输出的master是"carney",也就是说,Map端的更改无效
String masetr = context.getConfiguration().get("master");
System.out.println("Reducer开始初始化...master=" + masetr);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
System.out.println("Reducer运行结束...");
}
// 核心方法
@Override
protected void reduce(MooBean key, Iterable<MooBean> values, Context context) throws IOException, InterruptedException {
// 可以将这两个变量也做成全局变量
int chineseScore = 0;
int mathScore = 0;
for (MooBean tempBean : values) {
// 分别统计语文和数学的分数,做一个加和
if (tempBean.getCourse().equals("语文")){
chineseScore = chineseScore + tempBean.getScore();
} else if (tempBean.getCourse().equals("数学")){
mathScore = mathScore + tempBean.getScore();
}
}
// 拼接字符串
stringBuilder.append("语文均分:").append(chineseScore / 2).append(", 数学均分:").append(mathScore / 2);
// 不new对象,而是重新赋值
outKey.set(key.getName());
outValue.set(stringBuilder.toString());
context.write(outKey, outValue);
// 清空stringBuilder
stringBuilder.delete(0, stringBuilder.length());
}
}