结合案例讲解 MapReduce 重要知识点 —— 倒排索引(Inverted Index;文中代码沿用"倒排序"/DescSort 的命名)

文章及其内容:

- index.html:hadoop is good hadoop hadoop is ok
- page.html:hadoop has hbase hbase is good hbase and hive
- content.html:hadoop spark hbase are good ok

输出(节选;每行格式为"单词 文件名:出现次数",多个文件之间用";"分隔):

and page.html:1
are content.html:1
hadoop index.html:3;page.html:1;content.html:1
hbase page.html:3;content.html:1


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DescSortCombiner extends Reducer<Text, Text, Text, Text>{

	 * index.html_hadoop list(1,1,1)
	 * index.html_is list(1,1)
	 * index.html_good list(1)
	 * index.html_ok list(1)
	 * page.html_hadoop list(1)
	 * hadoop index.html:3
	 * hadoop page.html:1
	protected void reduce(Text key, Iterable<Text> value,Context context) throws IOException,
			InterruptedException {
		 int counter = 0;
		 Text k = new Text();
		 Text v = new Text();
		String s [] = key.toString().split("_");
		for (Text t : value) {
			counter += Integer.parseInt(t.toString());
		context.write(k, v);


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** @author lyd */
public class DescSort  extends Configured implements Tool{
	 * 自定义的myMapper
	 * @author lyd
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{

		protected void setup(Context context)throws IOException, InterruptedException {

		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			InputSplit is = context.getInputSplit();
			String fileName = ((FileSplit)is).getPath().getName();
			String lines [] = line.split(" ");
			for (String s: lines) {
				context.write(new Text(fileName+"_"+s), new Text(1+""));
			 * index.html_hadoop 1
			 * index.html_is 1
			 * index.html_good 1
			 * index.html_hadoop 1
			 * index.html_hadoop 1
			 * index.html_is 1
			 * index.html_ok 1
			 * page.html_hadoop 1

		protected void cleanup(Context context)throws IOException, InterruptedException {
	 * 自定义MyReducer
	 * @author lyd
	static class MyReducer extends Reducer<Text, Text, Text, Text>{

		protected void setup(Context context)throws IOException, InterruptedException {
		List<String> li = new ArrayList<String>();
		protected void reduce(Text key, Iterable<Text> value,Context context)
				throws IOException, InterruptedException {
			 * index.html_hadoop list(1,1,1)
			 * index.html_is list(1,1)
			 * index.html_good list(1)
			 * index.html_ok list(1)
			 * page.html_hadoop list(1)
			 *hadoop list(index.html:3,page.html:1)
			 int counter = 0;
			 for (Text t : value) {
				counter += Integer.parseInt(t.toString());
			String s [] = key.toString().split("_");
			li.add(s[1]+" "+s[0]+":"+counter);*/
			String v = "";
			for (Text t : value) {
				v += t.toString() +";";
			context.write(key, new Text(v.substring(0, v.length()-1)));
		protected void cleanup(Context context)throws IOException, InterruptedException {
			/*for (String s : li) {
				String ss [] = s.split(" ");
	public int run(String[] args) throws Exception {
		Configuration conf = super.getConf();
		Job job = Job.getInstance(conf, "model03");
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]), true);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		int isok = job.waitForCompletion(true) ? 0 : 1;
		return isok;
	 * job的主入口
	 * @param args
	public static void main(String[] args) {
		try {
			String [] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
			System.exit(ToolRunner.run(new DescSort(), argss));
		} catch (Exception e) {

