版权声明:原创文章,转载请注明出处! https://blog.csdn.net/L_15156024189/article/details/86688044
基于compareTo方法
数据
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
第3列和第8列分别是job和deptno,下面去掉重复的job和deptno,代码:
Mapper
package distinct;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Emits (Emp, NullWritable) pairs keyed on (job, deptno) so the shuffle
 * phase de-duplicates them via Emp's compareTo.
 */
public class DistinctMapper extends Mapper<LongWritable, Text, Emp, NullWritable> {

    // Reused across map() calls to avoid a per-record allocation.
    // Every field must be reassigned for each record (see deptno note below).
    private final Emp emp = new Emp();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // Expect exactly 8 comma-separated columns. Skip malformed lines
        // instead of calling System.exit(1), which would kill the whole
        // task JVM because of a single bad record.
        if (fields.length != 8) {
            System.err.println("Skipping malformed record: " + value);
            return;
        }
        emp.setJob(fields[2]);
        try {
            // Always reassign deptno: because the Emp instance is reused,
            // leaving it unset on an empty field would silently carry over
            // the previous record's department number. 0 marks "unknown".
            emp.setDeptno(fields[7].isEmpty() ? 0 : Integer.parseInt(fields[7]));
        } catch (NumberFormatException e) {
            System.err.println("Skipping record with bad deptno: " + value);
            return;
        }
        // Let IOException/InterruptedException propagate so the framework
        // can retry or fail the task, rather than swallowing them.
        context.write(emp, NullWritable.get());
    }
}
Reducer
package distinct;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Writes each distinct Emp key exactly once; the grouped NullWritable
 * values are ignored.
 */
public class DistinctReducer extends Reducer<Emp, NullWritable, Emp, NullWritable> {

    @Override
    protected void reduce(Emp emp, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Propagate exceptions instead of printStackTrace(): swallowing a
        // write failure here would silently drop output while the job
        // still reports success.
        context.write(emp, NullWritable.get());
    }
}
Main(注:原文此处误贴了与上文相同的 Mapper 代码,下面展示的并不是驱动主类/Driver 的代码)
package distinct;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Emits (Emp, NullWritable) pairs keyed on (job, deptno) so the shuffle
 * phase de-duplicates them via Emp's compareTo.
 * NOTE(review): this block is a duplicate paste of the Mapper under the
 * "Main" heading — the actual driver class is missing from the post.
 */
public class DistinctMapper extends Mapper<LongWritable, Text, Emp, NullWritable> {

    // Reused across map() calls to avoid a per-record allocation.
    // Every field must be reassigned for each record (see deptno note below).
    private final Emp emp = new Emp();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // Expect exactly 8 comma-separated columns. Skip malformed lines
        // instead of calling System.exit(1), which would kill the whole
        // task JVM because of a single bad record.
        if (fields.length != 8) {
            System.err.println("Skipping malformed record: " + value);
            return;
        }
        emp.setJob(fields[2]);
        try {
            // Always reassign deptno: because the Emp instance is reused,
            // leaving it unset on an empty field would silently carry over
            // the previous record's department number. 0 marks "unknown".
            emp.setDeptno(fields[7].isEmpty() ? 0 : Integer.parseInt(fields[7]));
        } catch (NumberFormatException e) {
            System.err.println("Skipping record with bad deptno: " + value);
            return;
        }
        // Let IOException/InterruptedException propagate so the framework
        // can retry or fail the task, rather than swallowing them.
        context.write(emp, NullWritable.get());
    }
}
Emp
package distinct;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* Hadoop序列化员工类
*
* @author leboop
*
*/
// 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
/**
 * Hadoop-serializable employee key holding (job, deptno).
 * Ordering is by deptno first, then job, so identical (job, deptno)
 * pairs group together during the shuffle and duplicates collapse.
 * Both fields are assumed non-null by write()/compareTo().
 */
public class Emp implements WritableComparable<Emp> {

    private String job;
    private Integer deptno;

    @Override
    public int compareTo(Emp o) {
        // BUG FIX: the original used 'this.deptno == o.getDeptno()', which
        // compares Integer box identity (only reliable in the [-128,127]
        // cache range), and 'this.deptno - o.getDeptno()', which can
        // overflow. Integer.compare is correct for all values.
        int byDept = Integer.compare(this.deptno, o.deptno);
        return byDept != 0 ? byDept : this.job.compareTo(o.job);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialization: field order must match readFields().
        out.writeUTF(this.job);
        out.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialization: read fields in the same order write() emitted them.
        this.job = in.readUTF();
        this.deptno = in.readInt();
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public Integer getDeptno() {
        return deptno;
    }

    public void setDeptno(Integer deptno) {
        this.deptno = deptno;
    }

    // equals/hashCode added to stay consistent with compareTo: objects that
    // compare equal must also be equals() with equal hash codes.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof Emp)) {
            return false;
        }
        Emp other = (Emp) obj;
        return (job == null ? other.job == null : job.equals(other.job))
                && (deptno == null ? other.deptno == null : deptno.equals(other.deptno));
    }

    @Override
    public int hashCode() {
        int h = (job == null) ? 0 : job.hashCode();
        return 31 * h + ((deptno == null) ? 0 : deptno.hashCode());
    }

    @Override
    public String toString() {
        return "Emp [job=" + job + ", deptno=" + deptno + "]";
    }
}
Emp作为Mapper的KeyOut,必须实现WritableComparable接口,不然出现Emp类型无法转换成WritableComparable的异常,如下:
[LocalJobRunner Map Task Executor #0] WARN org.apache.hadoop.mapred.MapTask - Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.ClassCastException: class distinct.Emp
at java.lang.Class.asSubclass(Class.java:3404)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Thread-4] INFO org.apache.hadoop.mapred.LocalJobRunner - map task executor complete.
[Thread-4] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local1697455586_0001
java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class distinct.Emp
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class distinct.Emp
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: class distinct.Emp
at java.lang.Class.asSubclass(Class.java:3404)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
... 10 more
[main] INFO org.apache.hadoop.mapreduce.Job - Job job_local1697455586_0001 running in uber mode : false
[main] INFO org.apache.hadoop.mapreduce.Job - map 0% reduce 0%
[main] INFO org.apache.hadoop.mapreduce.Job - Job job_local1697455586_0001 failed with state FAILED due to: NA
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 0
结果
如图:
基于重写的equals方法
略。
基于拼接Mapper的KeyOut方法
略。