MapReduce工作流程图
流程步骤:
- 生成驱动Jar包,上传到Yarn集群;
- 执行 hadoop jar <jar文件> 命令启动客户端,Yarn集群根据输入切片数计算MapTask数,并分配NodeManager资源;
- 通过默认TextInputFormat方式传输数据到MapTask节点,进入Map阶段;
- 经逻辑运算后,通过OutputCollector把<k,v>数据写入环形缓冲区;
- 达到默认80%阈值后,溢写到磁盘。生成溢写文件前,需要先在内存中分区,并在分区内按键排序。如果设置了Combiner,还需要进行Combine合并操作。
- 溢写后的文件生成分区文件,然后下载到ReduceTask所有主机磁盘中。
- 把下载的Map阶段的输出文件进行合并并经Reduce归并排序,最后通过默认TextOutputFormat方式生成结果文件。
Shuffle机制
Shuffle意指Map方法之后到进入Reduce方法之前的过程。此过程经过不断的读写,分区排序,形如Shuffle(洗牌)。
流程图如下:
1,Map()方法把数据写入环形缓冲区,缓冲区默认大小100M,达到80%阈值后溢写出来。
2,溢写会经过分区,排序,再combine合并(可选操作)。然后把多次溢写的进行归并,最终写入到磁盘。
3,把多个MapTask的输出文件拷贝到Reduce机器的内存缓冲中,内存不够时,会溢写到磁盘,然后进行归并排序,按键分组,最终输出结果文件。
shuffle例子:
需求:订单id按升序排序,同一订单内价格按从高到低排序;输出三个文件,每个文件只有一条数据(即每个订单中价格最高的那条记录)。
1001 Tmall_01 998
1001 Tmall_06 88.8
1001 Tmall_03 522.8
1002 Tmall_03 522.8
1002 Tmall_04 132.4
1002 Tmall_05 372.4
1003 Tmall_01 998
1003 Tmall_02 8.5
1003 Tmall_04 132.4
自定义OrderBean,实现org.apache.hadoop.io.WritableComparable,重写compareTo方法:
package com.even.order;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite MapReduce key made of an order id and a price.
 *
 * <p>Natural ordering is ascending {@code order_id}, then descending {@code price}.
 * Per the original author's note, {@code compareTo()} is what orders the keys inside each
 * partition during the map-side sort (step 4 of the shuffle diagram this code accompanies).
 *
 * <p>Wire format: one {@code int} (order id) followed by one {@code double} (price).
 *
 * author: Even
 * create date:2019/1/2
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int order_id;
    private double price;

    /**
     * Creates an empty bean; fields keep their Java defaults until set or filled by
     * {@link #readFields(DataInput)}.
     * NOTE(review): presumably required so the framework can instantiate the key
     * reflectively — confirm against the Hadoop version in use.
     */
    public OrderBean() {
    }

    /**
     * @param order_id order identifier
     * @param price    price of the order line
     */
    public OrderBean(int order_id, double price) {
        this.order_id = order_id;
        this.price = price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    /**
     * Orders by ascending order id, then by descending price.
     *
     * <p>{@code Double.compare} is used instead of {@code <}/{@code >} so that the ordering is
     * total: with the raw operators a NaN price compared as "equal" to every other price,
     * which breaks the transitivity that sorting relies on.
     *
     * @param o the bean to compare against
     * @return negative, zero or positive as this bean sorts before, with, or after {@code o}
     */
    @Override
    public int compareTo(OrderBean o) {
        int byId = Integer.compare(order_id, o.getOrder_id());
        if (byId != 0) {
            return byId;
        }
        // Arguments are swapped on purpose: higher price sorts first.
        return Double.compare(o.getPrice(), price);
    }

    /**
     * Two beans are equal when both order id and price match; consistent with
     * {@link #compareTo(OrderBean)} returning zero.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof OrderBean)) {
            return false;
        }
        OrderBean other = (OrderBean) obj;
        return order_id == other.order_id && Double.compare(price, other.price) == 0;
    }

    /** Hash derived only from field values, so it is stable across JVM instances. */
    @Override
    public int hashCode() {
        long priceBits = Double.doubleToLongBits(price);
        return 31 * order_id + (int) (priceBits ^ (priceBits >>> 32));
    }

    /** Serializes the order id, then the price. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(order_id);
        dataOutput.writeDouble(price);
    }

    /** Restores the fields in the same order {@link #write(DataOutput)} emitted them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        order_id = dataInput.readInt();
        price = dataInput.readDouble();
    }

    /** @return the order id and the price separated by a tab */
    @Override
    public String toString() {
        return order_id +
                "\t" + price;
    }
}
OrderMapper,map阶段的逻辑运算,继承org.apache.hadoop.mapreduce.Mapper,重写map():
package com.even.order;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Splits each tab-separated input line and emits an {@code OrderBean} key built from
 * field 0 (order id) and field 2 (price), paired with a {@code NullWritable} value.
 * author: Even
 * create date:2019/1/2
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    /**
     * Parses one input line into an {@code OrderBean} and writes it to the context.
     *
     * <p>Blank lines (for example a trailing empty line at the end of the input file) are
     * skipped instead of failing the task on {@code Integer.parseInt("")}. Non-blank lines
     * that are malformed still fail, so bad data is not silently dropped.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one line of input text
     * @param context sink for the emitted key/value pair
     * @throws NumberFormatException          if field 0 is not an int or field 2 is not a double
     * @throws ArrayIndexOutOfBoundsException if a non-blank line has fewer than three fields
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (line.trim().isEmpty()) {
            // Nothing to parse on an empty or whitespace-only line.
            return;
        }
        String[] fields = line.split("\t");
        int order_id = Integer.parseInt(fields[0]);
        double price = Double.parseDouble(fields[2]);
        context.write(new OrderBean(order_id, price), NullWritable.get());
    }
}
OrderReducer,继承org.apache.hadoop.mapreduce.Reducer,输出最终结果文件:
package com.even.order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Writes the final result file: each {@code reduce()} call emits its key exactly once.
 * author: Even
 * create date:2019/1/2
 */
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    /**
     * Emits the group's key with an empty value.
     *
     * <p>{@code values} is deliberately not iterated, so only one record is written per call.
     * NOTE(review): combined with the job's grouping by order id and the price-descending
     * key order, this is expected to be the highest-priced record of each order — confirm
     * against the job configuration.
     *
     * @param key     the key handed to this reduce call
     * @param values  the grouped values; unused
     * @param context sink for the output record
     */
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        final NullWritable emptyValue = NullWritable.get();
        context.write(key, emptyValue);
    }
}
OrderPartitioner,继承org.apache.hadoop.mapreduce.Partitioner,设置分区规则:
package com.even.order;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioning rule for {@code OrderBean} keys: the partition is derived from the order id.
 * Per the original author's note, partitioning happens when the map output is spilled from
 * the ring buffer (step 3 of the shuffle diagram this code accompanies).
 * author: Even
 * create date:2019/1/2
 */
public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    /**
     * @param orderBean    the map output key
     * @param nullWritable the map output value; unused
     * @param i            number of partitions; per the original note this is the count set
     *                     through {@code job.setNumReduceTasks}
     * @return the order id, with its sign bit cleared, modulo {@code i}
     */
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        // Clearing the sign bit keeps the operand of % non-negative, so the result is too.
        final int nonNegativeId = orderBean.getOrder_id() & Integer.MAX_VALUE;
        // With three reduce tasks this is "id mod 3", giving three partitions.
        final int partition = nonNegativeId % i;
        return partition;
    }
}
OrderGroupingComparator,按键分组:
package com.even.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator that treats {@code OrderBean} keys with the same order id as one group.
 * Per the original author's note, this runs at the "group by key" stage (step 11 of the
 * shuffle diagram this code accompanies).
 * author: Even
 * create date:2019/1/2
 */
public class OrderGroupingComparator extends WritableComparator {
    /** Registers {@code OrderBean} as the key class and asks the parent to create key instances. */
    public OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    /**
     * Compares two keys by order id only; price is ignored.
     *
     * @param a first key; must be an {@code OrderBean}
     * @param b second key; must be an {@code OrderBean}
     * @return negative, zero or positive as {@code a}'s id is less than, equal to, or greater
     *         than {@code b}'s id
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Different ids mean different groups (larger ids sort later); equal ids stay together.
        final int leftId = ((OrderBean) a).getOrder_id();
        final int rightId = ((OrderBean) b).getOrder_id();
        return Integer.compare(leftId, rightId);
    }
}
OrderDriver,MapReduce的驱动类:
package com.even.order;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the order job: order ids sorted ascending, prices sorted high to low, three
 * output files with a single record each.
 *
 * <p>Usage: {@code OrderDriver [inputPath outputPath]}. When fewer than two arguments are
 * given, the original hard-coded local paths are used.
 *
 * author: Even
 * create date:2019/1/2
 */
public class OrderDriver {
    /** Input directory used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "E:\\ideaworkspace\\bigdataByEven\\evenhadoop\\src\\main\\resources\\file\\orderin";
    /** Output directory used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "E:\\ideaworkspace\\bigdataByEven\\evenhadoop\\src\\main\\resources\\file\\orderout";

    /**
     * Configures and runs the job, then prints {@code 1} if it completed successfully and
     * {@code 0} otherwise.
     *
     * @param args optional {@code [inputPath, outputPath]}; both must be present to take effect
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Paths come from the command line only when both are supplied; otherwise fall back
        // to the defaults so running without arguments behaves as before.
        boolean pathsGiven = args != null && args.length >= 2;
        String inputPath = pathsGiven ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = pathsGiven ? args[1] : DEFAULT_OUTPUT_PATH;

        /* 1. Create the job */
        Job job = Job.getInstance(new Configuration());
        /* 2. Set the driver class used to locate the jar */
        job.setJarByClass(OrderDriver.class);
        /* 3. Set the Mapper and Reducer classes */
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        /* 4. Key and value types of the map output */
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        /* 5. Key and value types of the reduce output */
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        /* 6. Partitioning rule and number of reduce tasks */
        job.setPartitionerClass(OrderPartitioner.class);
        job.setNumReduceTasks(3);
        /* 7. Grouping, applied at the group-by-key stage of the shuffle */
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        /* 8. Input and output paths */
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        /* 9. Submit and wait for completion */
        System.out.println(job.waitForCompletion(true) ? 1 : 0);
    }
}