大数据学习之路16-倒排索引创建,输入切片FileSplit应用

我们需要将文本使用mapreduce整理成需求的样子。也就是单词加文件名的样子,那我们又怎么能知道map方法现在调用的这行数据属于哪个文件呢?调用这个方法的是mapworker,而且调用这个map方法的还不止一个worker。虽然调用map方法的有很多worker但是,每个worker再调用方法的时候都会传一个context而这个context就包含了这个worker正在读的数据属于哪个文件。因为这个worker肯定知道自己读的是哪一部分数据。这样worker所读取的数据就有一个专业的术语:切片。当文件很小时,一个切片很可能就是一个文件,可是当文件很大时,一个文件就会分成几个切片,分别交给几个worker去处理。

所以再捋一下思路:worker调用map方法时传递的context包含这一行数据所属的切片。通过这个切片,我们就可以找到所属的文件。

我们可以通过context.getInputSplit();拿到对切片信息的描述。InputSplit这个类是一个抽象类,而我们要拿到的是对这个抽象类的实现,这里我们要用的是FileSplit,我们现在写的代码基本上都是针对文件,但是mapreduce不仅仅能处理文本文件,它还可以处理比如数据库文件之类的。FileSplit导包的时候导包名长的那个。拿到FileSplit切片信息之后我们就可以通过fileSplit.getPath().getName()拿到文件名。

一个切片对应一个worker,一个切片中的一行数据对应一个map方法,所以这两行代码我们不需要写在map方法里,我们我们应该写在map方法外。因为一个worker只需要拿一次文件名就行。之前我们用过cleanup方法,这个方法是在一个worker结束的时候才会去做cleanup方法里面的操作。同样的,worker在正式进行操作之前也会调用setup方法做一些初始化的操作。然后才真正的去拿数据处理。所以我们可以将这两行代码写在这个方法中。

以下贴出代码:

package com.test.wordcount4;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount1 {
	     
        public static class fileMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        	 Text k = new Text();
   	         IntWritable v = new IntWritable();
   	         String fileName = null;
   	         @Override
   	        protected void setup(Context context)
   	        		throws IOException, InterruptedException {
   	        	FileSplit inputSplit = (FileSplit) context.getInputSplit();
   	        	fileName = inputSplit.getPath().getName();
   	        }
        	@Override
        	protected void map(LongWritable key, Text value,Context context)
        			throws IOException, InterruptedException {
        		String line = value.toString();
        		String[] split = line.split(" ");
        		for (String str : split) {
					k.set(str+"-"+fileName);
					v.set(1);
					context.write(k,v);
					
				}
        	}
        }
        public static class fileReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        	 IntWritable v = new IntWritable();
        	   @Override
        	protected void reduce(Text key, Iterable<IntWritable> values,Context context)
        			throws IOException, InterruptedException {
        		   int count = 0;
        		for (IntWritable value : values) {
					count+=value.get();
				}
        		v.set(count);
        		context.write(key, v);
        	}
        }
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
			  Configuration conf = new Configuration();
			  Job job = Job.getInstance(conf);
			  job.setJarByClass(WordCount1.class);
			  job.setMapperClass(fileMapper.class);
			  job.setReducerClass(fileReducer.class);
			  job.setMapOutputKeyClass(Text.class);
			  job.setMapOutputValueClass(IntWritable.class);
			  job.setOutputKeyClass(Text.class);
			  job.setOutputValueClass(IntWritable.class);
			  FileInputFormat.setInputPaths(job, new Path(args[0]));
			  FileSystem fs = FileSystem.get(conf);
			  Path p = new Path(args[1]);
			  if(fs.exists(p)){
				  fs.delete(p, true);
			  }
			  FileOutputFormat.setOutputPath(job, p);
			  job.setNumReduceTasks(2);
			  boolean res = job.waitForCompletion(true);
			  System.out.println(res?"mr程序成功执行":"mr程序好像被外星人抓走了");
		}
}
package com.test.wordcount4;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount2 {
      public static class fileMapper extends Mapper<LongWritable, Text, Text, Text>{
    	       Text k = new Text();
    	       Text v = new Text();
    	       @Override
    	    protected void map(LongWritable key, Text value, Context context)
    	    		throws IOException, InterruptedException {
    	    	String line = value.toString();
    	    	String[] split = line.split("-");
    	    	k.set(split[0]);
    	    	v.set(split[1]);
    	    	context.write(k, v);
    	    }
      }
      
      public static class fileReducer extends Reducer<Text, Text, Text, Text>{
    	     Text v = new Text();
    	      @Override
    	    protected void reduce(Text key, Iterable<Text> values, Context context)
    	    		throws IOException, InterruptedException {
    	    	  StringBuilder sb  = new StringBuilder();
    	    	for (Text value : values) {
					sb.append(value.toString()).append(",");
				}
    	    	v.set(sb.toString());
    	    	context.write(key, v);
    	    }
      }
      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		        Configuration conf = new Configuration();
		        Job job = Job.getInstance(conf);
		        job.setJarByClass(WordCount2.class);
		        job.setMapperClass(fileMapper.class);
		        job.setReducerClass(fileReducer.class);
		        job.setMapOutputKeyClass(Text.class);
		        job.setMapOutputValueClass(Text.class);
		        job.setOutputKeyClass(Text.class);
		        job.setOutputValueClass(Text.class);
		        FileInputFormat.setInputPaths(job, new Path(args[0]));
		        FileSystem fs = FileSystem.get(conf);
		        Path p = new Path(args[1]);
		        if(fs.exists(p)){
		        	fs.delete(p, true);
		        }
		        FileOutputFormat.setOutputPath(job, p);
		        job.setNumReduceTasks(2);
		        boolean res = job.waitForCompletion(true);
		        System.out.println(res?"mr程序成功执行":"mr程序好像被外星人抓走了");
	  }
}

第一段代码的运行结果:

第二段代码的运行结果:

猜你喜欢

转载自blog.csdn.net/qq_37050372/article/details/81739188