MapJoin案例详解
1.MapJoin的重要知识点
MapJoin适用于有一张十分小的表和一张甚至多张非常小的表的场景,这样的话就可以在MapTask阶段将非常小的那几张表加载进内存,提前处理业务从而减少Reduce端的压力,以减少数据倾斜。
2.案例操作
2.1需求
order.txt
pid | id | amount |
---|---|---|
pd.txt
id | pname |
---|---|
要求输出以下形式
id | pname | amount |
---|---|---|
2.2编程思路
1)创建一个驱动类,类中写明加载缓存数据的代码,设置reduce的数量为0,因为用不到reduce端,直接用map端输出就行
2)因为要把小的表作为缓存文件加载到内存的数据结构中,所以在Mapper的setup方法中就要执行这个操作来获取缓存文件并加载到内存
3)map方法用来封装输出的KV
2.3代码实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 1 获取job信息 【需要获取hadoop的配置文件才能运行】
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置加载jar包路径
job.setJarByClass(MapJoinDriver.class);
// 3 关联mapper
job.setMapperClass(MapJoinMapper.class);
// 4 设置Map输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///E:/input/inputtablecache/pd.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("E:\\input\\inputtable2"));
//此处输入的文件关系到Mapper里面map()中对这个文件的处理,所以要注意输入的文件内容的问题
FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\mapjoin668888"));
// 7 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> pdMap = new HashMap<>();
private Text outK = new Text();
//setup()方法拿到缓存文件并将其加载到内存
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//在setup方法里面获取到Driver添加的缓存文件,然后将小表的数据保存到一个内存的数据结构中
URI[] cacheFiles = context.getCacheFiles();//getCacheFiles()获取缓存文件,因为是数组,所以可以添加多个文件,一个文件就是一个值
//URI类型的数组,说明存进去之后就不是文件了,需要转类型
//需要获取一个fs的客户端
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream hdfsfis = fs.open(new Path(cacheFiles[0]));
//通过fs客户端读取缓存,所以用open()创建一个输入流
//open()方法返回的就是一个FSDataInputStream对象
//如果用FSDataInputStream的redLine()方法即hdfsfis.readLine()读取会出现中文乱码,所以官方建议用
//BufferedReader包装一下,得到一个reader
BufferedReader reader = new BufferedReader(new InputStreamReader(hdfsfis, "UTF-8"));
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {//先判断一下不为空
//因为要存到内存中,在内存中数据是以数据集
//04\t小米
String[] split = line.split("\t");
//封装数据到一个HashMap的集合,因为有键值对
pdMap.put(split[0], split[1]);
}
//关闭流
IOUtils.closeStream(reader);
IOUtils.closeStream(hdfsfis);//FSDataInputStream也是流,也在IOUtils里面
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1. 获取一行数据 // 1001\t01\t1
String orderLine = value.toString();
// 2. 切割一行数据
String[] orderString = orderLine.split("\t");
// 3. 通过orderString的第二列数据pid,去pdMap里面获取相应的pname
String pname = pdMap.get(orderString[1]);
// 4. 封装outK outV
/* outK.set(orderString[1]);
outK.set(pname);
outK.set(orderString[2]);*///会被覆盖
outK.set(orderString[0] + "\t" + pname + "\t" + orderString[2]);
// 5. 写出 outK outV
context.write(outK, NullWritable.get());
}
}