Max computation
Requirement: for each day, output the date and its highest temperature.
Data:
20170931 20.1
20170930 30.6
20170931 30.6
20170929 30.02
20170928 10.3
20170928 30.3
20170927 28.3
20170931 28.1
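For reference, the max job should reduce this input to one line per date (keys come out sorted, with key and value separated by a tab):
20170927	28.3
20170928	30.3
20170929	30.02
20170930	30.6
20170931	30.6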
Java code:
max
package com.beicai.wc1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Main class
public class Demo1Max {

    // Mapper implementation: one input line -> (date, temperature)
    public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on whitespace (tolerates runs of spaces between the fields)
            String[] split = value.toString().split("\\s+");
            String date = split[0];
            double temp = Double.parseDouble(split[1]);
            context.write(new Text(date), new DoubleWritable(temp));
        }
    }

    // Reducer implementation: keep the largest temperature seen for each date
    public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // key = date
            // values = list(10.1, 30.6, 12.5)
            // Start at negative infinity rather than Double.MIN_VALUE, which is
            // the smallest *positive* double and would break for sub-zero readings
            double max = Double.NEGATIVE_INFINITY;
            for (DoubleWritable v : values) {
                double temp = v.get();
                if (temp > max) {
                    max = temp;
                }
            }
            context.write(key, new DoubleWritable(max));
        }
    }

    // Driver
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo1Max.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        // Must match the reducer's output value type
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        int status = job.waitForCompletion(true) ? 0 : -1;
        System.exit(status);
    }
}
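Since taking a maximum is associative and commutative, the same MyReducer can safely double as a combiner, so each map task pre-aggregates its own partial maxima before the shuffle. A minimal sketch of the extra driver line (add it anywhere before waitForCompletion):

// Safe for max: combining partial maxima with the reducer logic
// does not change the final result
job.setCombinerClass(MyReducer.class);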
avg
package com.beicai.wc1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo1Avg {

    // Mapper implementation: one input line -> (date, temperature)
    public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\\s+");
            String date = split[0];
            double temp = Double.parseDouble(split[1]);
            context.write(new Text(date), new DoubleWritable(temp));
        }
    }

    // Reducer implementation: average all temperatures for each date
    public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // key = date
            // values = list(10.1, 30.6, 12.5)
            double sum = 0.0;
            int count = 0;
            for (DoubleWritable v : values) {
                sum += v.get();
                count++;
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    // Driver
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo1Avg.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        // Must match the reducer's output value type
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        int status = job.waitForCompletion(true) ? 0 : -1;
        System.exit(status);
    }
}
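Note that the combiner trick from the max job does not carry over: an average of averages is not the overall average, so MyReducer must not be registered as a combiner here. A combiner-friendly variant carries (sum, count) pairs through the shuffle instead. The sketch below assumes the mapper is changed to emit a Text value of the form "temp,1" and that the final reducer parses the pair and divides sum by count; the class name SumCountCombiner is hypothetical:

// Hypothetical combiner for the averaging job: merges partial
// (sum,count) pairs without ever dividing, so the final reducer
// can still compute an exact average
public static class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0.0;
        long count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Double.parseDouble(parts[0]);  // partial sum
            count += Long.parseLong(parts[1]);    // partial record count
        }
        // Re-emit a partial pair, not an average
        context.write(key, new Text(sum + "," + count));
    }
}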
Upload max.jar to the Linux machine.
Upload the test file (max.txt) into in: hdfs dfs -put max.txt /usr/hello/in
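If /usr/hello/in does not exist yet, create it first:
hdfs dfs -mkdir -p /usr/hello/in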
Run with input and output paths: hadoop jar ./max.jar /usr/hello/in/max.txt /usr/hello/in/201  // the output path must not already exist
Or: yarn jar ./max.jar /usr/hello/in/max.txt /usr/hello/in/201  // the output path must not already exist
(If the jar's manifest does not name a main class, pass it after the jar name, e.g. hadoop jar ./max.jar com.beicai.wc1.Demo1Max <in> <out>.)
View in the NameNode web UI (port 50070): /usr/hello/in/201/part-r-00000
View from the Linux shell: hdfs dfs -cat /usr/hello/in/201/part-r-00000
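To re-run a job with the same output path, remove the previous output first, since MapReduce refuses to start if the output directory already exists:
hdfs dfs -rm -r /usr/hello/in/201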