版权声明:个人原创,转载请标注! https://blog.csdn.net/Z_Date/article/details/83926197
重写Writable接口
如下代码就是自定义mr数据类型,在wordcount类使用它。
WordCountWritable
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* 自定义wc的数据类型:
* @author lyd
*/
/**
 * Custom MapReduce data type pairing a word with its occurrence count.
 * Implements Hadoop's {@link Writable} so instances can be serialized as
 * job output keys (used by the word-count reducer in this example).
 *
 * @author lyd
 */
public class WordCountWritable implements Writable {

    public String word;
    public int counter;

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public WordCountWritable() {
    }

    public WordCountWritable(String word, int counter) {
        this.word = word;
        this.counter = counter;
    }

    /**
     * Serializes this record to the given stream.
     * A null {@code word} is written as the empty string so that a
     * default-constructed instance can be serialized without throwing a
     * NullPointerException from {@code writeUTF(null)}.
     *
     * @param out the stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word == null ? "" : word);
        out.writeInt(counter);
    }

    /**
     * Deserializes a record previously written by {@link #write(DataOutput)}.
     * Fields must be read in exactly the order they were written.
     *
     * @param in the stream to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.counter = in.readInt();
    }

    /**
     * @return the word
     */
    public String getWord() {
        return word;
    }

    /**
     * @param word the word to set
     */
    public void setWord(String word) {
        this.word = word;
    }

    /**
     * @return the counter
     */
    public int getCounter() {
        return counter;
    }

    /**
     * @param counter the counter to set
     */
    public void setCounter(int counter) {
        this.counter = counter;
    }

    /** Renders as {@code word:counter}, which is the on-disk output format of the job. */
    @Override
    public String toString() {
        return word + ":" + counter;
    }
}
WordCount
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.qianfeng.mr.day01.WordCountWritable;
/**
 * Classic word-count MapReduce job that emits its results through the custom
 * {@code WordCountWritable} key type (value side is {@link NullWritable}).
 *
 * Usage: {@code WordCount <input path> <output path>}
 */
public class WordCount {

    /**
     * Mapper: splits each input line into words and emits (word, 1)
     * for every token found.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reused across map() calls to avoid per-record allocations,
        // as is conventional for Hadoop Writables.
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // One line of input, e.g. "hello qianfeng hi qianfeng"
            String line = value.toString();
            // Split on runs of whitespace: the original split(" ") produced
            // empty tokens for consecutive spaces and counted "" as a word.
            String[] words = line.split("\\s+");
            for (String s : words) {
                if (s.isEmpty()) {
                    continue; // leading whitespace yields a single empty token
                }
                word.set(s);
                context.write(word, one);
            }
        }
    }

    /**
     * Reducer: sums the per-word counts and writes a
     * {@code WordCountWritable} key with a null-marker value.
     *
     * @author lyd
     */
    public static class MyReducer extends Reducer<Text, IntWritable, WordCountWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate the total occurrences of this word.
            int counter = 0;
            for (IntWritable i : values) {
                counter += i.get();
            }
            WordCountWritable wc = new WordCountWritable(key.toString(), counter);
            // Must write the NullWritable singleton, not a raw null:
            // a null value NPEs inside Hadoop's output serialization.
            context.write(wc, NullWritable.get());
        }
    }

    /**
     * Job entry point: wires up mapper/reducer, I/O paths, and key/value
     * classes, then submits the job and exits with its status code.
     *
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(2);
        }
        try {
            Configuration conf = new Configuration();
            // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
            Job job = Job.getInstance(conf, "wordcount");
            job.setJarByClass(WordCount.class);
            // Map phase configuration.
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // Reduce phase configuration.
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(WordCountWritable.class);
            job.setOutputValueClass(NullWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Submit, print progress, and propagate the job's success/failure.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
            // Previously fell through here, so a failed submission still
            // terminated with exit code 0; report failure explicitly.
            System.exit(1);
        }
    }
}