Test data (two input files; the third comma-separated field of each line is the payment amount being ranked):
file1.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
file2.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39
105,2345,880,40
106,1234,700,40
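For orientation: each row carries four comma-separated fields, and it is the third field (index 2), the payment amount, that the job ranks. A minimal plain-Java sketch of the parse (the class name ParseDemo is purely illustrative):

public class ParseDemo {
    public static void main(String[] args) {
        String[] arr = "8,1818,9000,20".split(",");   // one line from file1.txt
        int payment = Integer.parseInt(arr[2]);       // index 2: the payment field
        System.out.println(payment);                  // prints 9000
    }
}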
Mapper code:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
public class TOPNMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private int len;    // N, read from the job configuration
    private int[] top;  // after each sort, top[1..len] holds the len largest values seen so far
    private static final Text KEY = new Text("K");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];  // one extra slot (index 0) for the incoming value
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        String[] arr = line.split(",");
        if (arr.length == 4) {  // skip malformed lines; length == 4 already implies non-empty
            int payment = Integer.parseInt(arr[2]);  // third field: the payment value to rank
            add(payment);
        }
    }

    // Overwrite slot 0 (the smallest element after the previous sort, i.e. the one
    // being discarded anyway) with the new value, then re-sort so that top[1..len]
    // always holds the len largest values seen so far.
    private void add(int payment) {
        top[0] = payment;
        Arrays.sort(top);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's local top N under one constant key so that all
        // candidates arrive at a single reduce() call.
        for (int i = 1; i < top.length; i++) {
            context.write(KEY, new IntWritable(top[i]));
        }
    }
}
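The add() method is the core of the whole job: an array of length N + 1 where slot 0 always receives the incoming value, and a re-sort decides whether it displaces one of the current top N. A minimal standalone sketch tracing this with N = 3 over the first five payment values of file1.txt (the class name TopNTrace is hypothetical):

import java.util.Arrays;

public class TopNTrace {
    public static void main(String[] args) {
        int n = 3;
        int[] top = new int[n + 1];  // top[1..n] holds the n largest seen so far
        for (int payment : new int[]{100, 2000, 1234, 10, 490}) {
            top[0] = payment;        // overwrite the smallest (discardable) slot
            Arrays.sort(top);        // ascending sort pushes the value into place
            System.out.println(payment + " -> " + Arrays.toString(top));
        }
        // Last line prints: 490 -> [100, 490, 1234, 2000], so top[1..3] = {490, 1234, 2000}.
    }
}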
Reducer code:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Arrays;
public class TOPNReducer extends Reducer<Text, IntWritable, IntWritable, IntWritable> {
    private int len;    // N, read from the job configuration
    private int[] top;  // top[1..len] holds the len largest values seen so far

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Every mapper emitted the same key "K", so this single call sees every candidate.
        for (IntWritable val : values) {
            add(val.get());
        }
    }

    // Same trick as in the mapper: replace the smallest slot, then re-sort.
    private void add(int payment) {
        top[0] = payment;
        Arrays.sort(top);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Walk the array from largest to smallest, writing rank (1..len) and value.
        for (int i = len; i >= 1; i--) {
            context.write(new IntWritable(len - i + 1), new IntWritable(top[i]));
        }
    }
}
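A note on the design: because every mapper emits its local candidates under the same constant key "K", they are all grouped into a single reduce() call, where the same array trick then picks the global top N. This is also why the driver below pins the job to exactly one reducer.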
Job (driver) code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * This example extracts the top N largest values from the input data,
 * listed in descending order.
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: JobMain <input path> <output path>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        conf.setInt("N", 5);  // top-N size, read by mapper and reducer in setup()
        Job job = Job.getInstance(conf, "TOP N job");
        job.setJarByClass(JobMain.class);  // use the driver class so Hadoop ships the right jar
        job.setMapperClass(TOPNMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(TOPNReducer.class);
        job.setNumReduceTasks(1);  // a global top-N requires exactly one reducer
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Remove the output directory if it already exists, so reruns don't fail.
        Path outDirPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outDirPath)) {
            fs.delete(outDirPath, true);
        }
        FileOutputFormat.setOutputPath(job, outDirPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
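To try it, package the three classes into a jar and submit it through the hadoop launcher, e.g. hadoop jar topn.jar JobMain <input path> <output path> (the jar name topn.jar is a placeholder). N is hard-coded to 5 in the driver, which yields the five rows below.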
Result (rank and payment; these are indeed the five largest payment values across both input files):
1 9000
2 2000
3 1234
4 1000
5 910