1-Map
package com.sdg.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Created by gao on 2018/6/25
 *
 * This is the map stage.
 * How do I know what my input types are? The default input format is TextInputFormat.
 * It reads the data one line at a time, using the byte offset at which each line starts
 * as the key and the content of that line as the value. For example, the input
 *     hello hadoop
 *     hello
 * becomes <0, hello hadoop> and <14, hello> (the offset 14 assumes the line ends with \r\n).
 * So by default the input key is the line offset (a long) and the input value is the line text (a String).
 * For serialization Hadoop considers the plain JDK types (int, String, long) unsuitable,
 * so it wraps them in its own types: LongWritable, Text, and so on.
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println(value);
        // split the line into words on spaces
        String[] split = value.toString().split(" ");
        for (String word : split) {
            // the map-stage output is written through the job context
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
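The Javadoc above mentions that Hadoop prefers its own serializable wrappers (LongWritable, Text) over the plain JDK types when shipping data between the map and reduce stages. A minimal standalone sketch of that serialization round trip (the WritableDemo class name is made up for illustration and is not part of the word count job):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class, not part of the original word count job.
public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // write a key/value pair the same way the framework serializes map output
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new Text("hello").write(out);
        new LongWritable(1).write(out);

        // read the pair back from the raw bytes
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        Text word = new Text();
        LongWritable count = new LongWritable();
        word.readFields(in);
        count.readFields(in);
        System.out.println(word + " -> " + count.get()); // prints: hello -> 1
    }
}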
2-Reduce
package com.sdg.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Created by gao on 2018/6/25
 *
 * Getting the four generic types right is important: the reduce input key/value types
 * must match the map output types (Text, LongWritable).
 */
public class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // sum up the 1s emitted by the map stage for this word
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        System.out.println(key);
        context.write(key, new LongWritable(count));
    }
}
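To see what the reduce call above actually receives: after the map stage, the shuffle sorts and groups all values emitted under the same key, so for the sample input the reducer is invoked with <hadoop, [1]> and <hello, [1, 1]> and sums each group. A small plain-Java sketch of that grouping step (the ShuffleSketch class is a made-up illustration, not part of the job):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the shuffle/group step, not part of the MapReduce job.
public class ShuffleSketch {
    public static void main(String[] args) {
        // what the map stage emits for the sample input "hello hadoop" / "hello"
        List<String> mapOutput = Arrays.asList("hello", "hadoop", "hello");

        // the shuffle groups identical keys (sorted); the reducer then sums each group
        Map<String, Long> grouped = new TreeMap<>();
        for (String word : mapOutput) {
            grouped.merge(word, 1L, Long::sum);
        }
        grouped.forEach((word, count) -> System.out.println(word + "\t" + count));
        // prints: hadoop 1, hello 2
    }
}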
3-Job submission
package com.sdg.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Created by gao on 2018/6/25
 */
public class Job01 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // without a cluster configuration the job runs in local mode;
        // with one it is submitted to the cluster
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://vm2:9000");

        // the Job object assembles the information relevant to this run
        Job job = Job.getInstance(conf);
        // specify the main class of this job
        job.setJarByClass(Job01.class);
        // specify the mapper and reducer classes for this job
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // specify the key/value types of the map-stage output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // specify the final output key/value types of the MR job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // specify where this MR program reads its input and writes its results
        // (on a cluster these would simply be HDFS paths)
        FileInputFormat.setInputPaths(job, new Path("D:/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/wordcount/output"));

        boolean b = job.waitForCompletion(false);
        // exit the JVM with a status that reflects whether the job succeeded
        System.exit(b ? 0 : 1);
    }
}
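The comments in Job01 note that without extra configuration the job runs locally, while a cluster run would point at HDFS. A hedged sketch of what a cluster-oriented variant might look like (the Job02 class name and the command-line argument handling are assumptions; the hdfs://vm2:9000 address comes from the commented-out line above). It also deletes a pre-existing output directory first, since MapReduce refuses to start if the output path already exists:

package com.sdg.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical cluster-oriented variant, not part of the original post.
public class Job02 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // point at the cluster's namenode (address assumed from the commented-out line above)
        conf.set("fs.defaultFS", "hdfs://vm2:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Job02.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // take input and output paths from the command line, e.g.
        //   hadoop jar wordcount.jar com.sdg.wordcount.Job02 /wordcount/input /wordcount/output
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // MapReduce fails if the output directory already exists, so remove it first
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}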