Each Mapper corresponds to one input split (one split per map task).
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

/**
 * author: test
 * date: 2015/1/25.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Input:
     *   key:   the byte offset of the line within the file (LongWritable)
     *   value: the content of the line (Text)
     *
     * Output:
     *   key:   Text
     *   value: IntWritable
     */
    // Called repeatedly: once for each line read from the file split, with the
    // line's offset as the key and the line's content as the value.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
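A common refinement, added here as an optional sketch rather than part of the original example, is to reuse the Writable objects across map() calls instead of allocating a new Text and IntWritable per word; Hadoop serializes the contents at write() time, so reuse is safe and cuts garbage-collection pressure on large inputs. The class name WordCountMapperReuse is my own:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class WordCountMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused across calls; the framework copies the contents on write().
    private final Text outKey = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : StringUtils.split(value.toString(), ' ')) {
            outKey.set(word);
            context.write(outKey, ONE);
        }
    }
}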
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author: test
 * date: 2015/1/25.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Called repeatedly: once for each group.
     * A group is defined by a shared key: the key is the same,
     * and there may be multiple values for it.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * author: test
 * date: 2015/1/25.
 */
public class RunJob {

    public static void main(String[] args) {
        // Loads all configuration files found under src / the classpath
        Configuration conf = new Configuration();
        try {
            // Pass conf here so the loaded configuration is actually used
            Job job = Job.getInstance(conf);
            job.setJarByClass(RunJob.class);
            job.setJobName("WordCount");

            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileSystem fs = FileSystem.get(conf);
            FileInputFormat.addInputPath(job, new Path("D:/hadoop/input/input"));
            Path output = new Path("D:/hadoop/output/wc");
            if (fs.exists(output)) {
                fs.delete(output, true); // recursive delete
            }
            FileOutputFormat.setOutputPath(job, output);

            if (job.waitForCompletion(true)) {
                System.out.println("Job Done!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
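Because this reducer only sums, the operation is associative and commutative, so the same class can also serve as a combiner to shrink the map-side output before the shuffle. This is an optional optimization, not part of the original example; it is a single line added to the job setup above, right after job.setReducerClass(...):

job.setCombinerClass(WordCountReducer.class);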
To run it:
1. Package the classes into a jar named wc.jar.
2. hadoop jar wc.jar com.xxx.RunJob (com.xxx.RunJob is the entry class)
How to kill a MapReduce job
Depending on the version, do the following:
Version < 2.3.0
Kill a Hadoop job:
hadoop job -kill $jobId
You can get a list of all jobIds by running:
hadoop job -list
Version >= 2.3.0
Kill a Hadoop job:
yarn application -kill $ApplicationId
You can get a list of all ApplicationIds by running:
yarn application -list
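The same list/kill operations can also be done programmatically through the YARN client API. Below is a minimal sketch, assuming Hadoop 2.8+ (where ApplicationId.fromString is available); the class name and argument handling are illustrative, not from the original:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnKill {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();
        try {
            // Equivalent of `yarn application -list`
            for (ApplicationReport report : yarnClient.getApplications()) {
                System.out.println(report.getApplicationId() + "\t" + report.getName());
            }
            // Equivalent of `yarn application -kill $ApplicationId`;
            // args[0] is expected to look like application_1422XXXXXXXXX_0001
            yarnClient.killApplication(ApplicationId.fromString(args[0]));
        } finally {
            yarnClient.stop();
        }
    }
}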
Hadoop commands related to jobs:
1. List job information:
hadoop job -list
2. Kill a job:
hadoop job -kill job_id
3. View the aggregated history logs under the given path:
hadoop job -history output-dir
4. Show more details about the job:
hadoop job -history all output-dir
5. Print the map and reduce completion percentages and all job counters:
hadoop job -status job_id
6. Kill a task. Killed tasks are NOT counted against failed attempts:
hadoop job -kill-task <task-attempt-id>
7. Fail a task. Failed tasks ARE counted against failed attempts:
hadoop job -fail-task <task-attempt-id>
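Commands 6 and 7 map directly onto the classic mapred JobClient API: RunningJob.killTask(attemptId, shouldFail) kills the attempt when shouldFail is false and fails it when true. A minimal sketch under that assumption (class name and argument handling are my own):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptID;

public class TaskKill {
    public static void main(String[] args) throws Exception {
        JobClient client = new JobClient(new JobConf(new Configuration()));
        // args[0] is expected to look like attempt_1422XXXXXXXXX_0001_m_000000_0
        TaskAttemptID attemptId = TaskAttemptID.forName(args[0]);
        RunningJob job = client.getJob(attemptId.getJobID());
        // false = kill the attempt (NOT counted against failed attempts),
        // true  = fail the attempt (counted against failed attempts)
        job.killTask(attemptId, false);
        client.close();
    }
}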