java mapreduce
寻找每一年全球的最高气温
输入值的key是文件中的行偏移量,map函数不需要该信息,所以将其忽略。value是一行文本信息
- map的功能是从中找出每年的温度,统计到一个对应的数组中。
- reduce的功能是遍历每年的列表,并从其中找到最高温度。
Mapper类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
- Mapper类是一个泛型类型,形参有四个,分别指定map函数的输入键(行偏移量,长整数),输入值(一行文本),输出键(年份),输出值(气温,整数)
- map()方法:输入一个键和一个值
- Context实例用于输出内容的写入,将年份数据按照Text对象进行读写,气温对象封装在IntWritable类型中
Reducer类
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
- Reducer类也有四个函数指定输入输出类型
- reduce函数的输入类型必须匹配map函数的输出类型
第三部分代码负责运行MapReduce作业
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定map类和reduce类
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
//设置map函数和reduce函数的输出类型,这两个函数的输出类型一般相同。如果不同就分别用setMapOutputKeyClass(),setMapOutputValueClass来设置map函数的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//输入的类型通过InputFormat类控制,默认为文本输入格式
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
- Job对象:用来指定作业执行规范。Job.setJarByClass(**.class),向集群中传递一个类名,hadoop利用这个类来查找包含它的jar文件,从而把jar文件上传到集群时,不用指定jar文件的名称
- FileInputFormat中的静态方法addInputPath()定义入数据路径,这个路径可以是单个文件、一个目录(目录下的所有文件当做输入)或者符合特定文件模式的一系列文件
- FileOutputFormat 中的静态方法setOutputPath()用来指定输出文件的路径(只能有一个路径),在运行作业前该路径应该是不存在的,否则会报错。
测试运行
windows下测试未完成
问题:重新启动后,hadoop命令无法识别,只能进入hadoop-2.9.1/bin中启动;并且不知道如何恰当的编译java文件为jar包。。