package counter;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* 实现自定义计数器功能
*
* @author Administrator
*
*/
public class WordCountApp {
private static String INPUT_PATH = "hdfs://hadoop:9000/in/hello";
private static String OUT_PATH = "hdfs://hadoop:9000/out";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
fileSystem.delete(new Path(OUT_PATH), true);
Job job = new Job(conf);
job.setJarByClass(WordCountApp.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
List<String> wordscount = new ArrayList<String>();
protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
//自定义计数器
Counter hellocounter = context.getCounter("diff words", "hellocounter");
final String[] splited = value.toString().split("\t");
for (String word : splited) {
final Text key2 = new Text(word);
final LongWritable value2 = new LongWritable(1L);
context.write(key2, value2);
//碰到不同单词就增加1
if(!wordscount.contains(word)){
wordscount.add(word);
hellocounter.increment(1L); }
}
};
}
public static class MyReducer extends Reducer<Text,LongWritable, Text,LongWritable>{
protected void reduce(Text key2, java.lang.Iterable<LongWritable> value2s, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
Long sum = 0L;
for (LongWritable value2 : value2s) {
sum += value2.get();
}
context.write(key2, new LongWritable(sum));
};
}
}
// MapReduce code example -- custom counter.
// Adapted from: jsh0401.iteye.com/blog/2111553