package com.bigdata;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class hadoop_MapReduce2 extends Configured implements Tool {
    // Step 1: the Mapper class
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        // Each occurrence of a word is counted once
        private IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line of the file, converting the Text value to a String
            String lineValue = value.toString();
            // Split the line into words on spaces
            String[] strs = lineValue.split(" ");
            // Take each word out of the array and emit a <key, value> pair, e.g. <hadoop, 1>
            for (String str : strs) {
                // Set the output key
                mapOutputKey.set(str);
                // Emit the map output
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }
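    // Illustration (not from the original post): for the input line
    // "hadoop hive hadoop", map() emits ("hadoop", 1), ("hive", 1), ("hadoop", 1);
    // the framework then groups these pairs by key before they reach the reducer.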
    // Step 2: the Reducer class
    public static class MapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // Iterate over the values and accumulate the total
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Set the output value
            outputValue.set(sum);
            // Emit the final result
            context.write(key, outputValue);
        }
    }
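    // Illustration (not from the original post): continuing the example above, the
    // shuffle phase delivers ("hadoop", [1, 1]) and ("hive", [1]) to reduce(),
    // which then writes ("hadoop", 2) and ("hive", 1).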
    // Step 3: the Driver
    public int run(String[] args) throws Exception {
        // Get the cluster configuration
        Configuration configuration = this.getConf();
        // Create a job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        // Entry point of the whole MapReduce program: tells the jar which class to run
        job.setJarByClass(this.getClass());
        // Configure the job
        // Input path
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // Output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);
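        // Note (not in the original code): the output directory must not already exist
        // when the job is submitted, or Hadoop aborts with FileAlreadyExistsException.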
        // Configure the Mapper
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the Reducer
        job.setReducerClass(MapReduceReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
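        // Optional (not in the original code): because per-word sums are associative,
        // the same reducer class can also run as a combiner to cut shuffle traffic:
        // job.setCombinerClass(MapReduceReducer.class);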
        // Submit the job to YARN and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Run the job through ToolRunner, which also parses generic Hadoop options
        int status = ToolRunner.run(configuration, new hadoop_MapReduce2(), args);
        // Exit the program
        System.exit(status);
        /* Alternative: hard-code the HDFS paths and call the driver directly
        args = new String[] {
            "hdfs://hadoop3:9000/wc.input",
            "hdfs://hadoop3:9000/output"
        };
        int status = new hadoop_MapReduce2().run(args);
        // Exit
        System.exit(status);
        */
    }
}
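To run the job on a cluster, package the class into a jar and submit it with the hadoop command. A sketch of the invocation, assuming the jar is named wordcount.jar and reusing the HDFS paths from the commented-out block above:

hadoop jar wordcount.jar com.bigdata.hadoop_MapReduce2 hdfs://hadoop3:9000/wc.input hdfs://hadoop3:9000/output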
The above is the skeleton code for a MapReduce program.
Reposted from blog.csdn.net/weixin_38676040/article/details/79170918