数据:
child parent
张 张爸
张 张妈
王 王爸
王 王妈
张爸 张爷爷
张爸 张奶奶
张妈 张姥爷
张妈 张姥姥
王爸 王爷爷
王爸 王奶奶
王妈 王姥爷
王妈 王姥姥
输出:
grandchild grandparent
张 张爷爷
张 张奶奶
张 张姥爷
张 张姥姥
分析:
map端:
表一 key:child value: 1-parent
张 1-张爸
张 1-张妈
张爸 1-张爷爷
张爸 1-张奶奶
表二 key:parent value: 2-child
张爸 2-张
张妈 2-张
张爷爷 2-张爸
张奶奶 2-张爸
reduce 端:
张 list(1-张爸,1-张妈)
张爸 list(1-张爷爷,1-张奶奶,2-张)
张爷爷 list(2-张爸)
java代码:
package com.mxm.day26_1;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//主类
public class Single3 extends Configured implements Tool {
// Mapper实现类
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
//
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一行数据
String line = value.toString();
// 按格式分割
String[] split = line.split("\t");
// 区分parent与child
if (line.contains("parent")) {
return;
}
// 获取分割后的数据元素
String child = split[0];
String parent = split[1];
// 分成两个表 添加标记 为reduce计算做准备
// 表一
context.write(new Text(parent), new Text("1-" + child));
// 表二
context.write(new Text(child), new Text("2-" + parent));
}
}
// Reducer实现类
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
// 文件头判断
boolean flag = false;
protected void reduce(Text key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
//
if (!flag) {
context.write(new Text("grandChild"), new Text("grandParent"));
flag = true;
}
// 把数据放到List容器中
ArrayList<String> grandChildList = new ArrayList<>();
ArrayList<String> grandParentList = new ArrayList<>();
for (Text v : value) {
// 按-分割 区分Parent与child
String[] split = v.toString().split("-");
if (split[0].equals("1")) {
// 把分割后的数据放到容器中
grandChildList.add(split[1]);
} else if (split[0].equals("2")) {
// 把分割后的数据放到容器中
grandParentList.add(split[1]);
}
}
// 先判断 再做笛卡尔积-->双重for循环
if (grandChildList.size() > 0 && grandParentList.size() > 0) {
for (String grandChild : grandChildList) {
for (String grandParent : grandParentList) {
// 输出
context.write(new Text(grandChild), new Text(grandParent));
}
}
}
}
}
@Override
public int run(String[] args) throws Exception {
// 获取配置对象
Configuration conf = getConf();
// 实例对象
Job job = Job.getInstance(conf);
// 主类
job.setJarByClass(Single3.class);
// mapper类
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Reducer类
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 输入输出
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
int status = job.waitForCompletion(true) ? 0 : -1;
return status;
}
public static void main(String[] args) throws Exception {
int status = ToolRunner.run(new Configuration(), new Single3(), args);
System.exit(status);
}
}