Implementing the following query with a reduce-side join (ReduceJoin):

SQL: select a.id, a.name, b.movie from a join b on a.id = b.id;
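For the sample a.txt and b.txt shown in the code comments below, only id 1 has matching rows in both tables, so the expected join output is:

1	huangbo	1
1	huangbo	2
1	huangbo	3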
package mapreduce.join.reduceJoin;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Description: a concrete implementation of a reduce-side join (ReduceJoin).
 *
 * Input data:
 *
 * a.txt
 *   id  name
 *   1   huangbo
 *   2   xuzheng
 *   3   wangbaoqiang
 *
 * b.txt
 *   id  movie
 *   1   1
 *   1   2
 *   1   3
 *
 * SQL: select a.id, a.name, b.movie from a join b on a.id = b.id;
 *
 * Implemented with a reduce-side join: the mapper tags each record with its
 * source table, and the reducer joins records that share the same id.
 */
public class ReduceJoinMR {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoinMR.class);

        job.setMapperClass(ReduceJoinMRMapper.class);
        job.setReducerClass(ReduceJoinMRReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // If the reducer's output key-value types are the same as the mapper's,
        // the two setMapOutput*Class() calls above can be omitted.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        /**
         * Set up the input and output paths.
         */
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists, so the job can be rerun.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        /**
         * Submit the job and wait for it to finish.
         */
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }
    /**
     * Business logic for the Mapper phase.
     */
    private static class ReduceJoinMRMapper extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * Mapping between the tag and the source file:
         *   0 : a.txt
         *   1 : b.txt
         */
        private int flag = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            /**
             * Find out which file the input split processed by this map task
             * comes from.
             */
            InputSplit inputSplit = context.getInputSplit();
            FileSplit fileSplit = (FileSplit) inputSplit;
            String fileName = fileSplit.getPath().getName();
            /**
             * Use the file name to decide which tag this map task attaches to
             * the key-value pairs it emits.
             */
            if (fileName.equals("b.txt")) {
                flag = 1;
            }
        }

        private Text keyOut = new Text();
        private Text valueOut = new Text();

        /**
         * 1. Determine which file the current key-value pair comes from.
         * 2. Parse the join key and the projected field out of the value.
         * 3. Emit the join key as the output key and the tagged projected
         *    field as the output value.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");
            String id = split[0];
            String idOrName = split[1];

            // Tag the value so the reducer can tell which table it came from.
            if (flag == 1) {
                idOrName = "1-" + idOrName;
            } else {
                idOrName = "0-" + idOrName;
            }

            keyOut.set(id);
            valueOut.set(idOrName);
            context.write(keyOut, valueOut);
        }
    }
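    /*
     * For the sample input above, the mapper emits:
     *   ("1", "0-huangbo"), ("2", "0-xuzheng"), ("3", "0-wangbaoqiang")  // from a.txt
     *   ("1", "1-1"), ("1", "1-2"), ("1", "1-3")                        // from b.txt
     * After the shuffle, the reducer receives, for key "1", the grouped values
     *   ["0-huangbo", "1-1", "1-2", "1-3"] (in no guaranteed order).
     */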
    /**
     * Business logic for the Reducer phase.
     */
    private static class ReduceJoinMRReducer extends Reducer<Text, Text, Text, Text> {

        private Text valueOut = new Text();

        /**
         * These two lists buffer the rows of the two tables for the current
         * join key. The key step: decide which table each value came from,
         * then store it in the matching list.
         */
        // rows from a.txt (tag 0)
        private List<String> nameList = new ArrayList<String>();
        // rows from b.txt (tag 1)
        private List<String> movieList = new ArrayList<String>();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            /**
             * Separate the tagged values into the two lists. Split on the
             * first '-' only, in case the payload itself contains a '-'.
             */
            for (Text t : values) {
                String[] split = t.toString().split("-", 2);
                String flag = split[0];
                if (flag.equals("0")) {
                    nameList.add(split[1]);
                } else {
                    movieList.add(split[1]);
                }
            }

            /**
             * Now perform the join: the cartesian product of the two lists
             * for this key.
             */
            for (String name : nameList) {
                for (String movie : movieList) {
                    valueOut.set(name + "\t" + movie);
                    context.write(key, valueOut);
                }
            }

            /**
             * Clear both lists so the values buffered for this key do not
             * leak into the join for the next key.
             */
            nameList.clear();
            movieList.clear();
        }
    }
}
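To run the job, package the class into a jar and submit it with hadoop jar, for example (the jar name and HDFS paths below are placeholders; this assumes a.txt and b.txt have already been uploaded into the input directory):

hadoop jar join.jar mapreduce.join.reduceJoin.ReduceJoinMR /join/input /join/output

args[0] and args[1] supply the input and output paths, and the driver deletes the output directory first if it already exists.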