Main program code
package com.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PartitonerDemo implements Tool {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1. Get the current line of input
            String line = value.toString();
            // 2. Split the line into words (adjust the delimiter to the input format as needed)
            String[] words = line.split(" ");
            // 3. Emit <word, "1"> for every word on the line
            for (String word : words) {
                context.write(new Text(word), new Text("1"));
            }
        }
    }
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum the "1" values emitted by the mapper for this word
            int count = 0;
            for (Text value : values) {
                count += Integer.parseInt(value.toString());
            }
            context.write(key, new Text(String.valueOf(count)));
        }
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PartitonerDemo(), args);
        // Exit with the job's status code
        System.exit(exitCode);
    }
    private Configuration conf;

    public void setConf(Configuration conf) {
        // HDFS HA client settings for the "zwj" nameservice; keep a reference
        // so that getConf() hands this same configuration to run()
        conf.set("fs.defaultFS", "hdfs://zwj");
        conf.set("dfs.nameservices", "zwj");
        conf.set("dfs.ha.namenodes.zwj", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.zwj.nn1", "hadoop01:9000");
        conf.set("dfs.namenode.rpc-address.zwj.nn2", "hadoop02:9000");
        conf.set("dfs.client.failover.proxy.provider.zwj",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        this.conf = conf;
    }

    public Configuration getConf() {
        // Return the configuration prepared in setConf(); fall back to a fresh one if it was never set
        return (conf != null) ? conf : new Configuration();
    }
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "job");
        job.setJarByClass(PartitonerDemo.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the partitioner class to the custom MyPartitioner defined below,
        // which must extend Partitioner<Text, Text>
        job.setPartitionerClass(MyPartitioner.class);
        // One reduce task per partition, so four output files
        job.setNumReduceTasks(4);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        setInputAndOutput(job, conf, args);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    private void setInputAndOutput(Job job, Configuration conf, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: PartitonerDemo <input path> <output path>");
            return;
        }
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Delete the output path first if it already exists, otherwise the job would fail
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
    }
}
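Because the job runs with four reduce tasks and the custom partitioner below, a run such as hadoop jar <your-jar> com.mapreduce.PartitonerDemo <input path> <output path> (the jar name depends on how you package the project) should produce four output files, part-r-00000 through part-r-00003, one per partition.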
Partitioner class code
package com.mapreduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, Text> {
    /**
     * key and value are the key/value pair emitted by the map stage;
     * the return value decides which reduce task (partition) receives the pair.
     */
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        if (key.toString().length() > 0) {
            String firstChar = key.toString().substring(0, 1);
            if (firstChar.matches("^[A-Z]")) {
                // Words starting with an upper-case letter go to partition 0
                return 0 % numPartitions;
            } else if (firstChar.matches("^[a-z]")) {
                // Words starting with a lower-case letter go to partition 1
                return 1 % numPartitions;
            } else if (firstChar.matches("^[0-9]")) {
                // Words starting with a digit go to partition 2
                return 2 % numPartitions;
            } else {
                // Everything else goes to the last partition
                return 3 % numPartitions;
            }
        } else {
            // Empty keys also go to the last partition
            return 3 % numPartitions;
        }
    }
}
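To check the partitioner's behavior in isolation, a minimal standalone sketch like the following can call getPartition directly with numPartitions = 4, matching job.setNumReduceTasks(4). The class name MyPartitionerCheck and the sample words are illustrative additions, not part of the original job.

package com.mapreduce;

import org.apache.hadoop.io.Text;

// Local check of MyPartitioner: prints which partition each sample key
// would be routed to when the job runs with 4 reduce tasks.
public class MyPartitionerCheck {
    public static void main(String[] args) {
        MyPartitioner partitioner = new MyPartitioner();
        Text one = new Text("1");
        String[] samples = {"Hadoop", "hadoop", "2023", "@foo", ""};
        for (String s : samples) {
            int p = partitioner.getPartition(new Text(s), one, 4);
            System.out.println("\"" + s + "\" -> partition " + p);
        }
    }
}

This prints partition 0 for "Hadoop", 1 for "hadoop", 2 for "2023", and 3 for both "@foo" and the empty string.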