案例要求
订单数据orders(保存在输入目录下的a.txt文件中,字段之间以制表符分隔)
订单号 | 商品编号 | 数量 |
---|---|---|
100001 | 03 | 3 |
100002 | 02 | 1 |
100001 | 04 | 4 |
100003 | 01 | 1 |
100004 | 01 | 2 |
商品数据produce(保存在输入目录下除a.txt以外的文件中,字段之间以制表符分隔)
商品编号 | 商品名称 |
---|---|
01 | 小米 |
02 | 华为 |
03 | 中兴 |
04 | 联想 |
要求:对produce和orders按照商品编号进行连接
程序代码
package com.zhiyou100.hadoop.mr.join.reduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for a reduce-side join job: wires {@link MyMapper} and {@link MyReducer}
 * together and runs the job with the local job runner.
 */
public class ReduceJoin {
    /** Input directory used when no paths are supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "E:\\input\\order\\";

    /** Output directory used when no paths are supplied on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "E:\\output\\order\\";

    /**
     * Configures the job, runs it to completion and terminates the JVM with
     * exit code 0 when the job succeeded or 1 when it failed.
     *
     * @param args optional; when exactly two arguments are given they are used as
     *             the input path and the output path, otherwise the built-in
     *             default paths are used
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Only an exact <input> <output> pair overrides the defaults, so any other
        // invocation behaves as before.
        boolean pathsGiven = args != null && args.length == 2;
        String inputPath = pathsGiven ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = pathsGiven ? args[1] : DEFAULT_OUTPUT_PATH;

        Configuration conf = new Configuration();
        // Run with the local job runner.
        conf.set("mapreduce.framework.name", "local");
        // For a cluster run, set the submitting user instead, e.g.:
        //System.setProperty("HADOOP_USER_NAME", "root");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableWritable.class);

        // Reduce (final) output types.
        job.setOutputKeyClass(TableWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
/**
 * Map side of the reduce-side join. Each tab-separated input line is turned into a
 * {@code TableWritable} tagged with the name of the file it came from and emitted
 * under its product id, so that orders and products with the same id meet in one
 * reduce call.
 */
class MyMapper extends Mapper<LongWritable, Text, Text, TableWritable> {
    /** Name of the input file holding order records; any other file is read as product records. */
    private static final String ORDER_FILE_NAME = "a.txt";

    /** Separator between the fields of an input line. */
    private static final String FIELD_SEPARATOR = "\t";

    // Reused for every record; context.write is called once per populated record.
    private final TableWritable tw = new TableWritable();
    private final Text k = new Text();

    /**
     * Parses one input line and emits it keyed by product id.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one line of an input file
     * @param context task context used to look up the input split and to emit output
     * @throws IOException          if a non-blank line has too few fields or an order
     *                              amount is not an integer
     * @throws InterruptedException if the framework interrupts the write
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Blank lines carry no record; skip them instead of failing on strs[1].
        if (line.trim().isEmpty()) {
            return;
        }

        // 1. Name of the file the current split belongs to.
        FileSplit fs = (FileSplit) context.getInputSplit();
        String fileName = fs.getPath().getName();

        // 2. Split the line and fill in the fields that exist for this record type.
        String[] strs = line.split(FIELD_SEPARATOR);
        if (ORDER_FILE_NAME.equals(fileName)) {
            // Order line: order id, product id, amount.
            requireFields(strs, 3, fileName, line);
            tw.setOid(strs[0]);
            tw.setPid(strs[1]);
            try {
                tw.setAmount(Integer.parseInt(strs[2]));
            } catch (NumberFormatException e) {
                throw new IOException(
                        "Invalid amount in " + fileName + ": \"" + line + "\"", e);
            }
            k.set(strs[1]);
        } else {
            // Product line: product id, product name.
            requireFields(strs, 2, fileName, line);
            tw.setPid(strs[0]);
            tw.setPname(strs[1]);
            k.set(strs[0]);
        }
        // The file name is the tag the reducer uses to tell orders from products.
        tw.setFname(fileName);
        context.write(k, tw);
    }

    /** Fails with a descriptive message when a line has fewer fields than its record type needs. */
    private static void requireFields(String[] fields, int expected, String fileName, String line)
            throws IOException {
        if (fields.length < expected) {
            throw new IOException("Expected at least " + expected + " tab-separated fields in "
                    + fileName + " but found " + fields.length + ": \"" + line + "\"");
        }
    }
}
/**
 * Reduce side of the join. All records sharing one product id arrive in a single
 * call; the product name is copied onto every order with that id and the orders
 * are written out.
 */
class MyReducer extends Reducer<Text, TableWritable, TableWritable, NullWritable> {
    /** File-name tag that marks a value as an order record; anything else is treated as a product record. */
    private static final String ORDER_FILE_NAME = "a.txt";

    /**
     * Joins the orders and the product record that share one product id.
     *
     * @param key     the product id
     * @param values  order and product records carrying that product id
     * @param context task context used to emit the joined orders
     * @throws IOException          if a record cannot be copied or written
     * @throws InterruptedException if the framework interrupts the write
     */
    @Override
    protected void reduce(Text key, Iterable<TableWritable> values, Context context)
            throws IOException, InterruptedException {
        // Orders are buffered because the product record may arrive after them.
        List<TableWritable> orders = new ArrayList<>();
        TableWritable produce = new TableWritable();

        for (TableWritable value : values) {
            // Hadoop reuses the value object between iterations, so each record
            // that must outlive the loop is copied into its own instance.
            if (ORDER_FILE_NAME.equals(value.getFname())) {
                TableWritable order = new TableWritable();
                copyRecord(order, value, key);
                orders.add(order);
            } else {
                copyRecord(produce, value, key);
            }
        }

        // When no product record was seen, the name is whatever a freshly
        // constructed TableWritable reports.
        String productName = produce.getPname();
        for (TableWritable order : orders) {
            order.setPname(productName);
            context.write(order, NullWritable.get());
        }
    }

    /**
     * Copies all bean properties from {@code source} into {@code target}. A failed
     * copy aborts the task rather than letting an empty record into the output.
     */
    private static void copyRecord(TableWritable target, TableWritable source, Text productId)
            throws IOException {
        try {
            BeanUtils.copyProperties(target, source);
        } catch (ReflectiveOperationException e) {
            throw new IOException("Failed to copy a record for product id " + productId, e);
        }
    }
}
输出文件显示:
100004,小米,2
100003,小米,1
100002,华为,1
100001,中兴,3
100001,联想,4