版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/m0_37786447/article/details/79522756
从一堆单词中找出由完全相同的字符组成的单词(即变位词,anagram)。比如:
输入(每行一个单词):
aap
paa
acle
basdf
sabdf
apa
输出(每行:排序后的字符作为键,后面是逗号分隔的单词列表;同组内单词的先后顺序不保证):
aap	aap,apa,paa
abdfs	basdf,sabdf
(acle 没有与之字符相同的其他单词,因此被过滤掉。)
代码 Anagram.java:
package com.linewell.mapreduce;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.collections.IterableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce job that groups words made of exactly the same characters (anagrams).
 *
 * <p>The input is read line by line, one word per line. The mapper keys every word by its
 * characters in sorted order, so all anagrams of one another share a key. The reducer joins
 * the words of each key with commas and drops keys that have only a single word.
 *
 * <p>Usage: {@code Anagram <input path> <output path>}. An existing output path is deleted
 * before the job is submitted.
 */
public class Anagram implements Tool {

    /** Input path used by {@link #main(String[])} when no arguments are given. */
    private static final String DEFAULT_INPUT = "hdfs://192.168.72.129:9000/local/in";

    /** Output path used by {@link #main(String[])} when no arguments are given. */
    private static final String DEFAULT_OUTPUT = "hdfs://192.168.72.129:9000/local/out3";

    /** Exit status returned by {@link #run(String[])} when the arguments are invalid. */
    private static final int EXIT_USAGE = 2;

    /** Configuration handed over through {@link #setConf(Configuration)}; may be null. */
    private Configuration conf;

    /**
     * Command-line entry point.
     *
     * @param args input and output paths (optionally preceded by generic Hadoop options);
     *             when empty, the built-in default paths are used
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args == null || args.length == 0)
                ? new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT}
                : args;
        int status = ToolRunner.run(new Configuration(), new Anagram(), jobArgs);
        System.exit(status);
    }

    /** Emits each non-blank input line keyed by its characters in sorted order. */
    public static class AnagramMapper extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * Maps one input line (one word) to a (sorted characters, word) pair.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input, expected to hold a single word
         * @param context sink for the emitted pair
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim so that surrounding whitespace (e.g. a trailing '\r') does not end up
            // in the key or in the output.
            String word = value.toString().trim();
            if (word.isEmpty()) {
                // Blank lines would otherwise all be grouped together under the empty key.
                return;
            }
            char[] letters = word.toCharArray();
            Arrays.sort(letters); // anagrams yield the same sorted character sequence
            context.write(new Text(String.valueOf(letters)), new Text(word));
        }
    }

    /** Joins the words of one key with commas and drops keys that have a single word. */
    public static class AnagramReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes (key, comma-separated words) when at least two words share the key.
         *
         * @param key     the sorted character sequence
         * @param values  all words that map to {@code key}
         * @param context sink for the emitted pair
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            int count = 0; // number of words sharing this key
            for (Text word : values) {
                if (count > 0) {
                    joined.append(','); // separator between words
                }
                joined.append(word.toString());
                count++;
            }
            if (count > 1) { // a word on its own has no anagram in the input
                context.write(key, new Text(joined.toString()));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param arg0 {@code arg0[0]} is the input path, {@code arg0[1]} the output path
     * @return 0 if the job completed successfully, 1 if it failed, 2 if fewer than two
     *         paths were supplied
     * @throws Exception if the file system cannot be accessed or job submission fails
     */
    @Override
    public int run(String[] arg0) throws Exception {
        if (arg0 == null || arg0.length < 2) {
            System.err.println("Usage: Anagram <input path> <output path>");
            return EXIT_USAGE;
        }

        // Use the configuration injected via setConf (ToolRunner does this) so that
        // generic options are honoured; fall back to a default one when run directly.
        Configuration jobConf = (conf != null) ? conf : new Configuration();
        Path inputPath = new Path(arg0[0]);
        Path outputPath = new Path(arg0[1]);

        // Remove an existing output path; the job refuses to start if it is present.
        FileSystem fs = outputPath.getFileSystem(jobConf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(jobConf, "anagram");
        job.setJarByClass(Anagram.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Mapper and reducer implementations.
        job.setMapperClass(AnagramMapper.class);
        job.setReducerClass(AnagramReducer.class);

        // Key/value types; the mapper and the reducer both emit (Text, Text).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Submit and wait; 0 is the conventional process exit status for success.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Returns the configuration previously passed to {@link #setConf(Configuration)}.
     *
     * @return the stored configuration, or {@code null} if none has been set
     */
    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Stores the configuration that {@link #run(String[])} will use for the job.
     *
     * @param arg0 the configuration to use
     */
    @Override
    public void setConf(Configuration arg0) {
        this.conf = arg0;
    }
}