文章目录
三-下-0, MapReduce Join 浅析及案例实操
- MapReduce能执行大型数据集间的"连接"(join)操作.
- 连接操作的具体实现技术取决于
数据集的规模及区分方式.
如果一个数据集很大(例如天气记录), 而另外一个集合很小(例如气象站元数据), 小到以至于可以分发到集群中的每一个节点之中.则可以执行一个MapReduce作业,将各个气象站的天气记录放到一块(例如, 根据气象站ID执行部分排序), 从而实现连接. Mapper或Reducer根据各气象站ID从较小的数据集合中找到气象站元数据,使元数据能够被写到各条记录之中. - 连接操作如果由Mapper执行, 则称为"Map Join"; 如果由Reduce执行, 则称为"Reduce Join"; 如果两个数据集的规模均很大, 以至于没有哪个数据及可以被完全复制到集群的每个节点, 我们仍然可以使用MapReduce来进行连接, 至于到底采用Map Join 还是Reduce Join ze取决于数据的组织方式.
1. Reduce Join
[原理说明]
- Map端的主要工作: 为来自不同表或文件的k-v对, 打标签以区别不同来源的记录, 然后
用连接字段作为key
,其余部分和新加的标志作为value
, 最后进行输出. - Reduce端的主要工作: 在Reduce端以连接字段作为key的分组已经完成, 我们只需要
在每一个分组中将那些来源于不同文件的记录(在Map阶段已经打标志)分开
, 最后进行合并就ok了.
1.1 Reduce Join 案例实操
Ctrl + alt + F 把当前变量–>全局变量
[需求分析]
主要实现思路:
- 我们有两张表, order表(id, pid, amount) 和 pd表(pid, pname), 这两张表连接后的表(id, pid, amount, pname, flag), 其中flag是用来标记此行有效数据属于哪一张表, 如下图所示
- 用bean类封装两张表的所有属性(去重), 这个类的toString()方法定义了最终表连接后的输出(id, panme, amount)
- 分析两张表的结构, 我们可以用属于pd表的pname, 去填充属于order表的pname, 然后输出order表中的字段即可完成Join.
[实现步骤和具体代码]
- 先写实体类Bean, 新建TableBean.java, 要注意这个Bean类是对所有表字段的汇总, 除了get,set方法外, 还要有定义最终输出格式的toString, 当然了还要进行序列化操作(必要的无参构造, 继承Writable类, 重写write和read方法).
public class TableBean implements Writable {
//所有连接表的属性
private int id;
private int pid;
private int amount;
private String pname;
private String flag; //用来标记数据属于哪个表
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public TableBean() {
}
@Override
public int compareTo(TableBean o) {
if(this.id > o.id){
return 1;
}else{
return -1;
}
}
//序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeInt(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.pid = in.readInt();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
}
- 再写Mapper类, 新建TableMapper.java, 这个类完成的任务主要是:
- 根据不同文件去处理每个切片中的每一行数据
- 为什么要根据不同的文件分开处理, 因为各个文件中的表字段啊, 列数啊都不相同的, 当然要分开处理了啊
- 如何处理? 切割每行数据, 把每一条属性都封装到Bean类中
- 此图是对下面setUp()中的说明;
/**
* KEYIN-->LongWritable, 偏移量
* VALUEIN--> Text, 每个表中的一行数据
* KEYOUT-->Text, pid
* VALUEOUT--> TableBean, 封装所有表属性的bean类
*/
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
private String fileName;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//读取文件名
//1. 为什么用setUP, 因为一个文件中的一个切片对应于一个mapTask, setup方法每次在maptask刚启动时进行加载并且仅加载一次
//2. 为什么要读取文件名? 因为多个文件中的多个表进行合并, 不同表字段数目, 种类不一样, 必须根据俄不同的文件进行不同的map操作
//3. 如何读取文件名, 先读取切片对象, 利用切片的getPath().getName()即可获取到切片所属于的文件名
FileSplit split = (FileSplit) context.getInputSplit();
fileName = split.getPath().getName();
}
//全局变量
Text outK = new Text();
TableBean outV = new TableBean();
//对于每个文件, 每一行数据就要调用一次map()进行处理
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//map方法的任务:
//1. 切割每一行, 分出表中各个列的数据
//2. 把分出的数据放到相应的Bean对象中写出去
//3. 要注意, 不同文件要分开进行上述的处理噢
//1️⃣ 获取一行的数据
String line = value.toString();
//分文件(表)进行处理
if(fileName.contains("order")){
//对 order.text中字段的处理
//2️⃣. 切割放入数组(如果每个文件分隔符不一样, 就可以像这样把下面这行切割代码写入到每个判断分支中)
String[] words1 = line.split("\t");
//3️⃣. 数据处理(把各属性的值放入到Bean中)
outK.set(words1[1]);
outV.setId(Integer.parseInt(words1[0]));
outV.setPid(Integer.parseInt(words1[1]));
outV.setAmount(Integer.parseInt(words1[2]));
outV.setPname("");
outV.setFlag("order");
context.write(outK, outV);
}else{
//对 pd.txt中字段的处理
//2️⃣. 切割放入数组
String[] words2 = line.split("\t");
//3️⃣. 数据处理(把各属性的值放入到Bean中)
outK.set(words2[0]);
outV.setId(0); //不能为null, 否则序列化时候会报空指针异常
outV.setPid(0);
outV.setAmount(0);
outV.setPname(words2[1]);
outV.setFlag("pd");
context.write(outK, outV);
}
}
}
- 再写Reducer类, 新建TableReducer.java, 在Reducer类中我们要完成以下任务:
-
每个ReduceTask处理的是每一个key(pid)对应的多个数据, 如下图, 我们可以把这张表成为合并后的主表
-
通过分析上图, 我们把这张表中每个key下的属与order表(大表)的内容放入到集合orderBeans中, 把属于pd表(小表)的复制到一个临时对象pdBeans中;
-
拿小表的属性去补全大表,再写出大表, reduce的工作就完成啦.
-
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
//每一个ReduceTask都是处理一个key(这里是Pid)下的所有数据
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//集合存放大表的数据, pdBean对象存储小表的数据
ArrayList<TableBean> orderBeans = new ArrayList<>();
TableBean pdBean = new TableBean();
//values是两表连接后的数据集合, 我们在foreach循环中定义的变量value 代指其中的一行
for (TableBean value : values) {
if(value.getFlag().equals("order")){
TableBean tmpOrderBean = new TableBean();
//把这一行数据暂存到tempBean, Java Bean对象的复制
try {
BeanUtils.copyProperties(tmpOrderBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
//把Order表的数据放到集合中
orderBeans.add(tmpOrderBean);
}else{
try {
BeanUtils.copyProperties(pdBean, value);///注意噢, pid为key, 产品表的数据只有一行,所以我们才只用一个变量赋值
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
for (TableBean orderBean : orderBeans) {
//把order大表集合中的pid换成pname, 再写出就大功告成
orderBean.setPname(pdBean.getPname());
context.write(orderBean, NullWritable.get());
}
}
}
补充说明: Java Bean对象的复制方法
- 最后写Driver类, 七个步骤老生常谈了,不说了.
public class TableDriver {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException, IOException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\oem\\mapjoin\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\oem\\mapjoin\\output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
最终输出的结果为:
对ReduceJoin 总结如下:
- ReduceJoin的缺点: 在ReduceJoin方式中, 合并的操作是在Reduce阶段完成, Reduce端的处理压力太大, 而Map结点的运算负载则很低(一个MapTask最多处理128MB的数据), 资源利用率不高, 且在Reduce阶段极易产生数据倾斜.
- 解决方法: MapJoin
2. Map Join
[MapJoin 适用场景]
Map Join 适用于一张小表, 一张大表的场景.
[问题引入]
- 既然我们说ReduceJoin方式在Reduce端连接表, 会使得Reduce端处理压力太大, 而且非常容易发生数据倾斜.
- 那么我们采用MapJoin, 一个MapTask一次性处理数据大小最大也就128, 256MB, 处理压力不会很大.
- 那么问题来了: 我们说的是多表(多文件)进行连接(join)的操作, 而一个MapTask仅处理一个切片(每个文件是单独进行切片的), 我们该如何进行多个表(多个表来自多个文件)的连接呢?
[解决办法]
使用缓存(DistributedCache)存储小表文件
- 在Mapper的setup阶段, 将小表文件读取到缓存集合.
- 在驱动类中添加下面代码以进行加载文件到缓存的操作。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置 HDFS 路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
2.1 Map Join 案例实操
[需求]
[需求分析]
[实现步骤和代码]
- Driver类
public class TableDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//加载缓存数据
job.addCacheFile(new URI("file:///D:/user/mapjoin/cache/pd.txt"));
//设置ReduceTask数量为0
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path("D:\\user\\mapjoin\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\user\\mapjoin\\output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
- Mapper类
public class TableMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
private HashMap<String, String> pdMap = new HashMap<>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//读取缓存文件, 把缓存文件的数据经过切割处理放入HashMap中
// context.getCacheFiles() 获取缓存文件的路径数据, 即cacheFiles存储多个缓存文件的路径
URI[] cacheFiles = context.getCacheFiles();
//拿到pd.txt缓存文件的路径
Path pdPath = new Path(cacheFiles[0]);
//获取文件系统对象. 打开文件输入流进行读取
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(pdPath);
//通过包装流转换为 reader,方便按行读取
BufferedReader reader = new BufferedReader(new
InputStreamReader(fis, "UTF-8"));
//逐行读取, 按行处理
String line;
while(StringUtils.isNotEmpty(line = reader.readLine())){
//切割这一行, 放入hashMap
String[] splits = line.split("\t");
pdMap.put(splits[0], splits[1]);
}
//关闭流
IOUtils.closeStream(reader);
}
Text outK = new Text();
// order表字段: id, pid, amount
// pd表字段: pid, pname
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取大表的一行(order.text)
String line = value.toString();
//2. 切割
String[] splits = line.split("\t");
//3. 输出的key为id, 输出的Text为pname , amount
//根据大表的pid获取已经存储在hashmap中的pd表中的pname字段
String pname = pdMap.get(splits[1]);
//4. 拼装输出的行
outK.set(splits[0]+"\t"+ pname+"\t" + splits[2]);
context.write(outK, NullWritable.get());
}
}
. 读取大表的一行(order.text)
String line = value.toString();
//2. 切割
String[] splits = line.split("\t");
//3. 输出的key为id, 输出的Text为pname , amount
//根据大表的pid获取已经存储在hashmap中的pd表中的pname字段
String pname = pdMap.get(splits[1]);
//4. 拼装输出的行
outK.set(splits[0]+"\t"+ pname+"\t" + splits[2]);
context.write(outK, NullWritable.get());
}
}