Data:
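The original post's sample files are not reproduced here. From the code below, the job reads plain text files of whitespace-separated words from d:/more/; a hypothetical stand-in (file names and contents invented for illustration):

d:/more/a.txt    hello world hello
d:/more/b.txt    hello hadoop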
Code:
package nuc.edu.ls.readmore;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom Writable value type: records which file (addr) a word came from
 * and how many times it occurred there (count).
 */
public class Word implements Writable {

    private String addr;
    private int count;

    public Word() {
        // Hadoop requires a no-arg constructor for deserialization.
    }

    public Word(String addr, int count) {
        this.addr = addr;
        this.count = count;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return addr + ":" + count + "\t"; // e.g. "a.txt:2"
    }

    // readFields() must consume fields in the same order write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        count = in.readInt();
        addr = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeUTF(addr);
    }
}
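As a quick local sanity check of the Writable contract, write() and readFields() can be round-tripped through plain Java streams without a cluster. This test is not part of the original post; the file name "a.txt" and the count are made up:

package nuc.edu.ls.readmore;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WordRoundTripTest {
    public static void main(String[] args) throws IOException {
        Word original = new Word("a.txt", 3); // hypothetical file name and count

        // Serialize with the same method Hadoop calls during the shuffle.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and verify the fields survive.
        Word copy = new Word();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getAddr() + " " + copy.getCount()); // expect: a.txt 3
    }
}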
package nuc.edu.ls.readmore;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReadMore {

    public static class MapTask extends Mapper<LongWritable, Text, Text, Word> {

        String pathname = null;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // setup() runs once per map task, so the name of the file backing
            // this input split is resolved once rather than per record.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            pathname = fileSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on whitespace and tag each word with its source file.
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                if (word.isEmpty()) {
                    continue; // a line with leading whitespace yields an empty first token
                }
                context.write(new Text(word), new Word(pathname, 1));
            }
        }
    }
    public static class ReduceTask extends Reducer<Text, Word, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Word> values, Context context)
                throws IOException, InterruptedException {
            // Tally this word's occurrences per source file.
            Map<String, Integer> perFile = new HashMap<>();
            for (Word word : values) {
                perFile.put(word.getAddr(), perFile.getOrDefault(word.getAddr(), 0) + word.getCount());
            }

            // Emit "file:count" pairs plus the number of files containing the word.
            StringBuilder sb = new StringBuilder();
            for (String file : perFile.keySet()) {
                sb.append(file).append(":").append(perFile.get(file)).append("\t");
            }
            sb.append("total ").append(perFile.size()).append(" file(s)");
            context.write(key, new Text(sb.toString()));
        }
    }
    public static void main(String[] args) throws Exception {
        // Submit as the HDFS superuser when running from a workstation.
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "eclipseToCluster");

        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(ReadMore.class);
        // job.setJar("C:\\Users\\LENOVO\\Desktop\\WordCount.jar");

        // The map output types (Text, Word) differ from the final output
        // types (Text, Text), so both pairs must be declared explicitly.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Word.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("d:/more/"));
        FileOutputFormat.setOutputPath(job, new Path("d:/moreout/"));

        boolean completion = job.waitForCompletion(true);
        System.out.println(completion ? 0 : 1);
    }
}
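The mapper tags every token with the name of the file it came from, which is what lets the reducer build its per-file tally. The tokenizer is a bare whitespace split, whose one quirk motivates the isEmpty() guard in the mapper; a standalone check (not from the original post):

import java.util.Arrays;

public class SplitCheck {
    public static void main(String[] args) {
        // A line with leading whitespace produces an empty first element.
        System.out.println(Arrays.toString(" hello\tworld  hello".split("\\s+")));
        // prints: [, hello, world, hello]
    }
}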
Experiment results:
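The original result screenshot is not preserved. Based on the reducer's output format and the hypothetical inputs above, the lines of part-r-00000 would look like this (an illustration only, not an actual run; HashMap iteration order is unspecified):

hadoop	b.txt:1	total 1 file(s)
hello	a.txt:2	b.txt:1	total 2 file(s)
world	a.txt:1	total 1 file(s)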