Input Control in MapReduce (InputFormat and MultipleInputs)
InputFormat
- InputFormat (input formatter): at the start of a MapReduce job, the InputFormat produces the InputSplits and, via a RecordReader, parses each split into the records that form the Mapper's input.
- Built-in InputFormats in MR:
1) TextInputFormat: the default file input format, used for reading plain text files. The file is broken into lines terminated by LF or CR; the key is the byte offset of each line (LongWritable) and the value is the content of that line (Text).
2) KeyValueTextInputFormat: also reads text files. If a line is split by a separator (tab by default) into two parts, the first part is the key and the rest is the value; if there is no separator, the whole line is the key and the value is empty.
3) SequenceFileInputFormat: reads sequence files, Hadoop's custom binary format for storing data. It has two subclasses: SequenceFileAsBinaryInputFormat, which reads the key and value as BytesWritable, and SequenceFileAsTextInputFormat, which reads them as Text.
4) SequenceFileInputFilter: retrieves only those records from a sequence file that satisfy a filter, specified via setFilterClass. Three filters are built in: RegexFilter keeps records whose key matches a given regular expression; PercentFilter takes a parameter f and keeps records whose record number satisfies record# % f == 0; MD5Filter takes a parameter f and keeps records where MD5(key) % f == 0.
5) NLineInputFormat: added in 0.18.x; splits the file by lines, e.g. so that each line of the file corresponds to one mapper. The key is the byte offset of each line (LongWritable) and the value is the line's content (Text).
6) CompositeInputFormat: used for joining multiple data sources.
- To set a specific InputFormat for an MR job: job.setInputFormatClass(xxxInputFormat.class);
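For the built-in formats, most customization happens in the driver. The following is a minimal sketch (the job name and the commented alternative are illustrative; the property key and the `setNumLinesPerSplit` helper are standard Hadoop 2.x API) showing how KeyValueTextInputFormat's separator or NLineInputFormat's lines-per-split could be configured:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class BuiltinInputFormatDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // KeyValueTextInputFormat: use "," instead of the default tab as the key/value separator
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "demo");
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // NLineInputFormat alternative: send every 3 lines of input to one map task
        // job.setInputFormatClass(NLineInputFormat.class);
        // NLineInputFormat.setNumLinesPerSplit(job, 3);
    }
}
```

Note that the separator property must be set on the Configuration before the Job is created from it.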
- Custom InputFormats:
- The built-in input formats cover most needs, but when a requirement cannot be met by them we need to define our own input format.
- Every InputFormat must directly or indirectly extend the InputFormat abstract class. (Note: there is also an older InputFormat interface, whose getSplits method has a different return type.)
- The InputFormat abstract class mainly defines the following two methods:
/**
Returns a RecordReader object.
A RecordReader describes how to carve the InputSplit into the key-value pairs that are fed to the Mapper.
*/
@Override
public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return null;
}
/**
Produces the collection of InputSplits.
This method receives environment information through the JobContext, obtains the files to process, slices them logically, and returns the resulting list of InputSplits.
*/
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
return null;
}
- More often we do not extend InputFormat directly but rather one of its concrete subclasses,
- e.g. FileInputFormat: the base class of every InputFormat whose source is a file; the default TextInputFormat extends it.
- FileInputFormat extends the InputFormat abstract class: 1) it implements getSplits, slicing the input files logically according to the configuration and returning the list of InputSplits; 2) it provides an isSplitable() method whose boolean return value lets a subclass declare whether a file should be split at all; if it returns false the file is never split, no matter how far it exceeds one Block, and is returned as a single logical chunk; 3) it leaves createRecordReader unimplemented as an abstract method that subclasses must provide.
- To fine-tune the splitting rules you can override getSplits and write the logic yourself, but more often you keep the parent's implementation and concentrate on how each InputSplit is turned into individual records.
- Note which package FileInputFormat comes from: import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
Case
Read the file score1.txt, treating every four lines as one student's record: the first line is the name and the next three lines are individual subject scores. Compute each total and output a file in the format name: total.
张三
语文 97
数学 77
英语 69
李四
语文 87
数学 57
英语 63
王五
语文 47
数学 54
英语 39
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ScoreInputFormat extends FileInputFormat<Text, Text>{
//do not split the file logically
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
return new ScoreRecordReader();
}
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class ScoreRecordReader extends RecordReader<Text, Text>{
private BufferedReader bfreader = null;
private Text key=null;
private Text value=null;
private float progress=0f;
/**
 * Called before the RecordReader is closed; typically used to release resources
 */
@Override
public void close() throws IOException {
bfreader.close();
}
/**
 * Called to fetch the current key after nextKeyValue() has returned true
 */
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
/**
 * Called to fetch the current value after nextKeyValue() has returned true
 */
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
 * Returns the current progress
 */
@Override
public float getProgress() throws IOException, InterruptedException {
return progress;
}
/**
 * Initialization method
 * split: the current input split
 * context: the current task context
 */
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit fis = (FileSplit) split;
Path path = fis.getPath();
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
FSDataInputStream inputStream = fileSystem.open(path);
bfreader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
}
/**
 * Reads the next key-value pair; returns true if one was read, false otherwise
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
String line1 = bfreader.readLine();
if(line1!=null){
key = new Text(line1);
String line2 = bfreader.readLine();
String line3 = bfreader.readLine();
String line4 = bfreader.readLine();
value = new Text(line2+"\r\n"+line3+"\r\n"+line4);
return true;
}
progress=1.0f;
return false;
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ScoreMapper extends Mapper<Text, Text, Text, LongWritable> {
@Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String name = key.toString();
long score = 0;
String[] str = value.toString().split("\r\n");
for(String s:str){
score = Long.parseLong(s.split(" ")[1]);
context.write(new Text(name), new LongWritable(score));
}
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ScoreReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long score = 0;
for(LongWritable value : values){
score += value.get();
}
context.write(key, new LongWritable(score));
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Score_Job");
job.setJarByClass(cn.tedu.score.ScoreDriver.class);
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setInputFormatClass(ScoreInputFormat.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://10.9.157.179:9000/scoreResult"));
if (!job.waitForCompletion(true))
return;
}
}
MultipleInputs
- MultipleInputs can assemble multiple inputs to feed the Mapper at the same time; use it when data must be read from several sources.
- Moreover, each source can be given its own InputFormat and even its own Mapper, so differently formatted inputs can be handled side by side.
- Methods on the MultipleInputs class:
/**
Specify a data source and its InputFormat
*/
MultipleInputs.addInputPath(job, path, inputFormatClass);
/**
Specify a data source together with its InputFormat and Mapper
*/
MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"),
ScoreInputFormat.class, ScoreMapper.class);
Case
Extend the case above to also read and aggregate scores from another file, score2.txt, in which each line holds one student's scores.
赵六 56 47 69
陈七 73 84 91
刘八 45 56 66
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ScoreInputFormat02 extends FileInputFormat<Text, Text>{
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
return new ScoreRecordReader02();
}
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class ScoreRecordReader02 extends RecordReader<Text, Text>{
private BufferedReader bfreader = null;
private Text key=null;
private Text value=null;
private float progress=0f;
/**
 * Called before the RecordReader is closed; typically used to release resources
 */
@Override
public void close() throws IOException {
bfreader.close();
}
/**
 * Called to fetch the current key after nextKeyValue() has returned true
 */
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
/**
 * Called to fetch the current value after nextKeyValue() has returned true
 */
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
 * Returns the current progress
 */
@Override
public float getProgress() throws IOException, InterruptedException {
return progress;
}
/**
 * Initialization method
 * split: the current input split
 * context: the current task context
 */
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit fis = (FileSplit) split;
Path path = fis.getPath();
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
FSDataInputStream inputStream = fileSystem.open(path);
bfreader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
}
/**
 * Reads the next key-value pair; returns true if one was read, false otherwise
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
String line = bfreader.readLine();
if(line!=null){
String[] str = line.split(" ");
key = new Text(str[0]);
value = new Text(str[1]+" "+str[2]+" "+str[3]);
return true;
}
progress=1.0f;
return false;
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ScoreMapper02 extends Mapper<Text, Text, Text, LongWritable> {
@Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String[] str = value.toString().split(" ");
for(String s:str){
context.write(key, new LongWritable(Long.parseLong(s)));
}
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Score_Job");
job.setJarByClass(cn.tedu.score.ScoreDriver.class);
job.setReducerClass(ScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"),
ScoreInputFormat.class, ScoreMapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score2.txt"),
ScoreInputFormat02.class, ScoreMapper02.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://10.9.157.179:9000/score02Result"));
if (!job.waitForCompletion(true))
return;
}
}
Output Control in MapReduce (OutputFormat and MultipleOutputs)
OutputFormat
- OutputFormat (output formatter): at the end of a MapReduce job, the OutputFormat class determines how the Reducer produces its output.
- Built-in OutputFormats in MR:
1) TextOutputFormat
2) SequenceFileOutputFormat
3) SequenceFileAsBinaryOutputFormat
4) MapFileOutputFormat
- To set a specific OutputFormat for an MR job: job.setOutputFormatClass(xxxOutputFormat.class);
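As a driver-side sketch of configuring one of these built-in formats (the job itself is hypothetical; the codec, compression type, and helper methods are standard Hadoop API), SequenceFileOutputFormat is commonly combined with output compression:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqOutputDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "demo");
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // enable output compression and compress the sequence file in blocks with gzip
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
    }
}
```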
- Custom OutputFormats:
- The built-in output formats cover most needs, but when a requirement cannot be met by them we need to define our own output format.
- Every OutputFormat must directly or indirectly extend the OutputFormat abstract class.
- The OutputFormat abstract class mainly defines the following three methods:
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
return null;
}
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext conetext) throws IOException, InterruptedException {
return null;
}
- More often we do not extend OutputFormat directly but rather one of its concrete subclasses,
- e.g. FileOutputFormat: the base class of every OutputFormat whose destination is a file; the default TextOutputFormat extends it.
- FileOutputFormat extends the OutputFormat abstract class, implements checkOutputSpecs and getOutputCommitter, and leaves getRecordWriter as an abstract method that subclasses must implement.
Case
Format the output so that the records are separated by "#" and written on a single line.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreOutputFormat extends FileOutputFormat<Text, LongWritable>{
@Override
public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
Path path = getDefaultWorkFile(context, "");
FileSystem fs = path.getFileSystem(context.getConfiguration());
FSDataOutputStream out = fs.create(path, false);
return new ScoreRecordWriter(out);
}
}
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class ScoreRecordWriter extends RecordWriter<Text, LongWritable>{
private DataOutputStream out = null;
public ScoreRecordWriter(DataOutputStream out){
this.out=out;
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
out.close();
}
@Override
public void write(Text key, LongWritable value) throws IOException, InterruptedException {
out.write((key.toString()+"~"+value.get()+"#").getBytes("utf-8"));
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Score_Job");
job.setJarByClass(cn.tedu.score.ScoreDriver.class);
job.setReducerClass(ScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(ScoreOutputFormat.class);
MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score1.txt"),
ScoreInputFormat.class, ScoreMapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://10.9.157.179:9000/node01/score2.txt"),
ScoreInputFormat02.class, ScoreMapper02.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://10.9.157.179:9000/score03Result"));
if (!job.waitForCompletion(true))
return;
}
}
MultipleOutputs
- MultipleOutputs lets a single Reducer produce multiple output files
- Main methods:
/**
Write a kv pair under the given named-output tag
*/
MultipleOutputs<Text,LongWritable> mos = new MultipleOutputs<Text,LongWritable>(context);
mos.write("flag", key, value);
/**
Bind an output format to a named tag;
multiple named outputs can be registered
*/
MultipleOutputs.addNamedOutput(job, "flag", XxxOutputFormat.class, Key.class, Value.class);
Case
Extend the word-count case: words whose first letter is a-j go to the "small" output, all others to the "big" output.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class Wc2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private MultipleOutputs<Text, IntWritable> mos = null;
@Override
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
mos = new MultipleOutputs<>(context);
}
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3s,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable v3 : v3s){
count += v3.get();
}
String word = k3.toString();
if(word.matches("^[a-j].*$")){
mos.write("small", new Text(word), new IntWritable(count));
}else{
mos.write("big", new Text(word), new IntWritable(count));
}
}
/**
 * MultipleOutputs must be closed in cleanup(), otherwise its output may never be flushed
 */
@Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
mos.close();
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Wc2Driver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Wc2_Job");
job.setJarByClass(cn.tedu.mr.outputformat.wc2.Wc2Driver.class);
job.setMapperClass(cn.tedu.mr.outputformat.wc2.Wc2Mapper.class);
job.setReducerClass(cn.tedu.mr.outputformat.wc2.Wc2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/wcdata"));
MultipleOutputs.addNamedOutput(job, "small", Wc2OutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "big", TextOutputFormat.class, Text.class, IntWritable.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/wcresult"));
if (!job.waitForCompletion(true))
return;
}
}
GroupingComparator
- The MR shuffle includes sort and group operations, which by default rely on the key's compareTo method; an alternative grouping rule can be configured with job.setGroupingComparatorClass.
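Besides a raw-bytes comparator like the one in the case below, a grouping comparator can also work on deserialized keys by extending WritableComparator. This is a sketch under the same a-h / i-z grouping idea (the class name is hypothetical; it would be registered with job.setGroupingComparatorClass(RangeGroupingComparator.class)):

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class RangeGroupingComparator extends WritableComparator {
    public RangeGroupingComparator() {
        // true => instantiate key objects, so the object-level compare() below is used
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // two keys belong to the same reduce group when they fall in the same letter range
        return Integer.compare(range((Text) a), range((Text) b));
    }

    // 0 for words starting with a-h, 1 for everything else;
    // consistent with the default lexicographic sort order, as grouping requires
    private static int range(Text t) {
        String s = t.toString();
        return (!s.isEmpty() && s.charAt(0) >= 'a' && s.charAt(0) <= 'h') ? 0 : 1;
    }
}
```

Letting WritableComparator deserialize the keys is slower than comparing raw bytes, but it avoids hand-written byte-stream parsing.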
Case
Extend the word-count case to count the number of words beginning with a-h and the number beginning with i-z.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class Wc3Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String [] words = v1.toString().split(" ");
for(String word : words){
context.write(new Text(word), new IntWritable(1));
}
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class Wc3Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3s,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable i : v3s){
count += i.get();
}
if(k3.toString().matches("^[a-h].*$")){
context.write(new Text("a-h"), new IntWritable(count));
}else{
context.write(new Text("i-z"), new IntWritable(count));
}
}
}
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text.Comparator;
public class Wc3Comparator extends Comparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
DataInput in = new DataInputStream(new ByteArrayInputStream(b1,s1,l1));
Text ta = new Text();
ta.readFields(in);
DataInput in2 = new DataInputStream(new ByteArrayInputStream(b2,s2,l2));
Text tb = new Text();
tb.readFields(in2);
if(ta.toString().matches("^[a-h].*$") && tb.toString().matches("^[a-h].*$")){
return 0;
}else if(ta.toString().matches("^[i-z].*$") && tb.toString().matches("^[i-z].*$")){
return 0;
}else{
return ta.compareTo(tb);
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Wc3Driver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wc3_job");
job.setJarByClass(cn.tedu.mr.gc.wc3.Wc3Driver.class);
job.setMapperClass(cn.tedu.mr.gc.wc3.Wc3Mapper.class);
job.setReducerClass(cn.tedu.mr.gc.wc3.Wc3Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//--set the grouping comparator
job.setGroupingComparatorClass(Wc3Comparator.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/wcdata"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/wcresult"));
if (!job.waitForCompletion(true))
return;
}
}