Custom Writable class
package groupby;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Custom Writable that carries both employee and department records through the shuffle.
// The flage field marks where a record came from: "e" = employee table, "d" = department table.
public class join implements Writable {
    private int empno;
    private String ename;
    private String deptno;
    private String deptname;
    private String flage;

    public join(int empno, String ename, String deptno, String deptname, String flage) {
        this.ename = ename;
        this.deptname = deptname;
        this.deptno = deptno;
        this.empno = empno;
        this.flage = flage;
    }

    // Hadoop needs a no-argument constructor to create instances via reflection during deserialization.
    public join() {}
    public int getEmpno() {
        return empno;
    }
    public void setEmpno(int empno) {
        this.empno = empno;
    }
    public String getEname() {
        return ename;
    }
    public void setEname(String ename) {
        this.ename = ename;
    }
    public String getDeptno() {
        return deptno;
    }
    public void setDeptno(String deptno) {
        this.deptno = deptno;
    }
    public String getDeptname() {
        return deptname;
    }
    public void setDeptname(String deptname) {
        this.deptname = deptname;
    }
    public String getFlage() {
        return flage;
    }
    public void setFlage(String flage) {
        this.flage = flage;
    }
    // Only the business fields are printed; the flage marker is internal and not part of the final output.
    @Override
    public String toString() {
        return empno + "\t" + ename + "\t" + deptno + "\t" + deptname;
    }

    // Serialization: the fields must be written and read back in exactly the same order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empno);
        out.writeUTF(ename);
        out.writeUTF(deptno);
        out.writeUTF(deptname);
        out.writeUTF(flage);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.deptno = in.readUTF();
        this.deptname = in.readUTF();
        this.flage = in.readUTF();
    }
}
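The write/readFields pair above is what Hadoop calls whenever a join object is shuffled across the network or spilled to disk. The snippet below is a minimal round-trip sketch using plain Java streams, no cluster needed; the class name JoinRoundTrip and the sample field values are made up for illustration and are not part of the original code.

package groupby;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class JoinRoundTrip {
    public static void main(String[] args) throws Exception {
        // Illustrative values only; flage "e" marks the record as coming from the employee table
        join original = new join(7369, "SMITH", "20", "RESEARCH", "e");

        // write() appends the fields to the stream in a fixed order: empno, ename, deptno, deptname, flage
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields() reads them back in the same order into a fresh instance
        join copy = new join();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // 7369  SMITH  20  RESEARCH (tab-separated, per toString())
    }
}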
package groupby;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath;
public class mapreduce {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = "data1";
        String output = "out1";
        final Configuration co = new Configuration();
        // Get the Job object
        final Job job = Job.getInstance(co);
        // Set the driver class so Hadoop can locate the jar
        job.setJarByClass(groupby.mapreduce.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Key and value types emitted by the Mapper
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(join.class);
        // Key and value types emitted by the Reducer
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(join.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Delete the output path if it already exists, otherwise the job refuses to start.
        // Going through the Hadoop FileSystem API works for both the local filesystem and HDFS.
        Path outDir = getOutputPath(job);
        FileSystem fs = outDir.getFileSystem(job.getConfiguration());
        if (fs.exists(outDir)) {
            fs.delete(outDir, true);
        }
        // Submit the job and wait for it to finish
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
    /**
     * Mapper<LongWritable, Text, IntWritable, join>
     * Input: byte offset in the file and one line of text.
     * Output: the department number as the join key, and the custom join object as the value.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, join> {
        private String name = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Remember which input file this split comes from (department vs. employee table)
            FileSplit in = (FileSplit) context.getInputSplit();
            name = in.getPath().getName();
        }

        // join(int empno, String ename, String deptno, String deptname, String flage)
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] js = value.toString().split("\t");
            if (name.contains("dept")) {
                // Department record (3 columns): js[0] = deptno, js[1] = deptname; tag it with "d"
                if (js.length == 3) {
                    context.write(new IntWritable(Integer.valueOf(js[0].trim())), new join(0, "", js[0].trim(), js[1].trim(), "d"));
                }
            } else {
                // Employee record (8 columns): js[0] = empno, js[1] = ename, js[7] = deptno; tag it with "e"
                if (js.length == 8) {
                    int deptno = Integer.valueOf(js[7].trim());
                    join jj = new join(Integer.valueOf(js[0].trim()), js[1].trim(), js[7].trim(), "", "e");
                    context.write(new IntWritable(deptno), jj);
                }
            }
        }
    }
    /**
     * Reducer<IntWritable, join, NullWritable, join>
     * All records with the same department number arrive together; the department record
     * supplies the department name, which is copied onto every employee record.
     */
    public static class MyReducer extends Reducer<IntWritable, join, NullWritable, join> {
        @Override
        protected void reduce(IntWritable key, Iterable<join> values, Context context) throws IOException, InterruptedException {
            String name = "";
            // The values iterable can only be traversed once and Hadoop reuses the value object,
            // so copy the employee records into a list while looking for the department record.
            List<join> emps = new ArrayList<>();
            for (join value : values) {
                if (value.getFlage().equalsIgnoreCase("d")) {
                    name = value.getDeptname();
                } else if (value.getFlage().equalsIgnoreCase("e")) {
                    emps.add(new join(value.getEmpno(), value.getEname(), value.getDeptno(), value.getDeptname(), value.getFlage()));
                }
            }
            // Fill in the department name on each buffered employee record and emit it
            for (join emp : emps) {
                emp.setDeptname(name);
                context.write(NullWritable.get(), emp);
            }
        }
    }
}
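For context, the mapper routes each line by input file name: a file whose name contains "dept" is parsed as a 3-column tab-separated department table (js[0] = deptno, js[1] = deptname), and any other file as an 8-column tab-separated employee table (js[0] = empno, js[1] = ename, js[7] = deptno). The sample below is purely illustrative, assuming the classic EMP/DEPT layout; the file names, the middle employee columns, and the values are assumptions and not part of the original code.

data1/dept.txt
10    ACCOUNTING    NEW YORK
20    RESEARCH      DALLAS

data1/emp.txt
7369    SMITH     CLERK    7902    1980-12-17    800     0    20
7934    MILLER    CLERK    7782    1982-01-23    1300    0    10

With that input, the reducer would write something like the following to out1/part-r-00000 (empno, ename, deptno, deptname, as formatted by join.toString()):

7934    MILLER    10    ACCOUNTING
7369    SMITH     20    RESEARCH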