Test data (the mapper splits on tabs, so the actual files are tab-delimited; a missing comm value is an empty column, which keeps deptno at a fixed index):
dept.txt
#deptno dname loc
30 sales chicago
20 research dallas
10 accounting newyork
employee.txt
#empno ename job mgr hiredate sal comm deptno loc
7499 allen salesman 7698 1981-02-20 1600 300 30
7782 clark managers 7639 1981-06-09 2450 10
7654 martin salesman 7698 1981-03-20 1250 1400 30 boston
7900 james clerk 7698 1981-01-09 950 30
7788 scott analyst 7566 1981-09-01 3000 100 20
Emplyee:
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Emplyee implements WritableComparable<Emplyee> {
    private String empno = "";
    private String ename = "";
    private String deptno = "";
    private String dname = "";
    private int flag = 0; // 0 = department record, 1 = employee record

    public Emplyee() {
    }

    public Emplyee(String empno, String ename, String deptno, String dname, int flag) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.dname = dname;
        this.flag = flag;
    }

    // Copy constructor; the reducer needs real copies because Hadoop reuses value objects.
    public Emplyee(Emplyee e) {
        this.empno = e.getEmpno();
        this.ename = e.getEname();
        this.deptno = e.getDeptno();
        this.dname = e.getDname();
        this.flag = e.getFlag();
    }

    @Override
    public int compareTo(Emplyee o) {
        // Emplyee is only used as a map output value, never as a key, so no ordering is needed.
        return 0;
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(empno);
        dataOutput.writeUTF(ename);
        dataOutput.writeUTF(deptno);
        dataOutput.writeUTF(dname);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read the fields in exactly the order they were written.
        this.empno = dataInput.readUTF();
        this.ename = dataInput.readUTF();
        this.deptno = dataInput.readUTF();
        this.dname = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }

    @Override
    public String toString() {
        return this.empno + " " + this.ename + " " + this.dname + " " + this.deptno;
    }
    public String getEmpno() {
        return empno;
    }

    public String getEname() {
        return ename;
    }

    public String getDeptno() {
        return deptno;
    }

    public String getDname() {
        return dname;
    }

    public int getFlag() {
        return flag;
    }

    public void setEmpno(String empno) {
        this.empno = empno;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public void setDeptno(String deptno) {
        this.deptno = deptno;
    }

    public void setDname(String dname) {
        this.dname = dname;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }
}
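Before wiring the bean into a job, the write/readFields pair can be sanity-checked with a plain serialization round trip. The snippet below is a minimal sketch, not part of the original post; the class name EmplyeeRoundTrip is hypothetical and it assumes only the Emplyee class above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EmplyeeRoundTrip {
    public static void main(String[] args) throws IOException {
        Emplyee in = new Emplyee("7499", "allen", "30", "sales", 1);

        // Serialize through the same DataOutput path Hadoop uses.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));

        // Deserialize into a fresh instance; both lines should print "7499 allen sales 30".
        Emplyee out = new Emplyee();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(in);
        System.out.println(out);
    }
}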
Mapper:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class JoinOneMapper extends Mapper<LongWritable, Text, IntWritable, Emplyee> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty() || line.startsWith("#")) { // skip blank lines and the header rows
            return;
        }
        String[] arr = line.split("\t");
        if (arr.length == 3) { // department record: deptno dname loc
            Emplyee e = new Emplyee();
            e.setDeptno(arr[0]);
            e.setDname(arr[1]);
            e.setFlag(0);
            context.write(new IntWritable(Integer.valueOf(arr[0])), e);
        } else { // employee record: deptno sits at index 7 (comm may be an empty column)
            Emplyee e = new Emplyee();
            e.setEmpno(arr[0]);
            e.setEname(arr[1]);
            // deptno/dname are filled in by the reducer from the matching department record
            e.setFlag(1);
            context.write(new IntWritable(Integer.valueOf(arr[7])), e);
        }
    }
}
Reducer:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class JoinOneReducer extends Reducer<IntWritable, Emplyee, NullWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Emplyee> values, Context context) throws IOException, InterruptedException {
        Emplyee dept = null;
        List<Emplyee> list = new ArrayList<Emplyee>();
        for (Emplyee val : values) {
            if (val.getFlag() == 0) { // department record
                dept = new Emplyee(val);
            } else if (val.getFlag() == 1) { // employee record
                list.add(new Emplyee(val));
            }
        }
        if (dept == null) { // no matching department for this deptno; nothing to join
            return;
        }
        // Walk the employee list and emit one joined row per employee.
        for (Emplyee emp : list) {
            emp.setDname(dept.getDname());
            emp.setDeptno(dept.getDeptno());
            context.write(NullWritable.get(), new Text(emp.toString()));
        }
    }
}
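One detail worth flagging: the copies via new Emplyee(val) are essential. Hadoop reuses a single Writable instance across the values iterator, so adding val to the list directly would leave it holding several references to the same object, all carrying the last record's fields.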
Job:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Joins the two input data sets (a reduce-side join).
 * The output is equivalent to:
 * select e.empno, e.ename, d.dname, d.deptno from emp e join dept d on e.deptno = d.deptno;
 *
 * Idea:
 * key: deptno
 * Approach 1: value (Text): "empno_ename_0" for employees / "deptno_dname_1" for departments
 *             (a sketch of this variant appears after the results below)
 * Approach 2: a custom bean (fields: empno, ename, deptno, dname, flag); this is the approach implemented here
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: Join <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Join job1");
        job.setJarByClass(JobMain.class);
        job.setMapperClass(JoinOneMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Emplyee.class);
        job.setReducerClass(JoinOneReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Delete the output directory if it already exists so reruns don't fail.
        Path outDirPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outDirPath)) {
            fs.delete(outDirPath, true);
        }
        FileOutputFormat.setOutputPath(job, outDirPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
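Packaged into a jar, the driver runs like any other MapReduce job. The jar name and HDFS paths below are placeholders, not from the original post:

hadoop jar join-demo.jar JobMain /input/join /output/join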
Result:
7782 clark accounting 10
7788 scott research 20
7900 james sales 30
7654 martin sales 30
7499 allen sales 30
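For comparison, the javadoc in JobMain also mentions Approach 1, which tags records inside a plain Text value instead of a custom bean. The mapper below is a hypothetical sketch of that variant (JoinTextMapper is not from the original post); the matching reducer would split each value on "_" and branch on the trailing flag, 0 for employees and 1 for departments:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class JoinTextMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty() || line.startsWith("#")) {
            return;
        }
        String[] arr = line.split("\t");
        if (arr.length == 3) { // department record: emit "deptno_dname_1"
            context.write(new IntWritable(Integer.parseInt(arr[0])),
                    new Text(arr[0] + "_" + arr[1] + "_1"));
        } else { // employee record: emit "empno_ename_0"
            context.write(new IntWritable(Integer.parseInt(arr[7])),
                    new Text(arr[0] + "_" + arr[1] + "_0"));
        }
    }
}

The bean version implemented above trades this string packing and parsing for type-safe fields, at the cost of implementing Writable by hand.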