今天我们想要做的就是把域名相同的分在同一个区。可是如果按照我们以前默认的分区规则是达不到我们的要求的。
默认的分区规则是:key.hashcode()%reduceworker数,这个默认的分区规则的所属类为HashPartitioner
以下为HashPartitioner的源码:
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
key.hashCode() & Integer.MAX_VALUE
我们可以看到这里有这样的与运算,为什么要这样做呢?因为hashcode太长了,
所以在做整数运算的时候就有可能会产生负数。所以就要提前进行截断运算,与32个1进行与运算
如果出现负数的分区号,是没人领的。reduce的分区号都是正数。
所以我们就要想办法控制分区规则。
任务:统计url的访问次数信息,并且按照其域名,将统计结果输出在不同的文件中。
思路:想办法让map端worker在将数据分区时,按照我们需要的分归属地划分。
实现方式:自定义一个Partitioner.
这里加载数据列表编号的时候,我们使用了静态代码块,这样做的原因是,这个自定义的分区类,调用的频率很高,所以我们每个worker只让他加载一次列表就可以了。
这里的域名有三种,所以reduce就要分三块。如果少了会报错。但是如果为一不会报错,因为如果只有一个的话就不需要分区了。所以根本就不会调用我们自定义的分区方法。
以下为源代码:
package com.test.partitioncount;
import java.util.HashMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text,IntWritable>{
static HashMap<String,Integer> map = new HashMap<>();
static{
map.put("163.com", 0);
map.put("qq.com", 1);
map.put("sina.com", 2);
}
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String field = key.toString().split("/")[0];
Integer code = map.get(field);
return code;
}
}
package com.test.partitioncount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class ReMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(" ");
k.set(split[1]);
context.write(k, v);
}
}
public static class ReReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count++;
}
v.set(count);
context.write(key, v);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setMapperClass(ReMapper.class);
job.setReducerClass(ReReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setPartitionerClass(MyPartitioner.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileSystem fs = FileSystem.get(conf);
Path p = new Path(args[1]);
if(fs.exists(p)){
fs.delete(p, true);
}
FileOutputFormat.setOutputPath(job, p);
job.setNumReduceTasks(3);
boolean res = job.waitForCompletion(true);
System.out.println(res?"mr程序成功执行":"mr程序好像被外星人抓走了");
}
}
源数据截图:
运行成功截图: