题目:
求每个运营商的上行流量、下行流量和总流量（输出格式：运营商 上行 下行 总流量）。注意：下面的代码目前只输出每个运营商的总流量（上行 + 下行），并未单独输出上行和下行。
思路:
通过手机号的前三位区分通信运营商
按运营商分组后，在 Reduce 中对每个运营商的流量求和即可。
代码:
//分组比较器
package Comparator02;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyComparator extends WritableComparator {
//将要重新定义的分组的key的类型传进来
public MyComparator(){
super(RecordWritable.class, true);
}
public int compare(WritableComparable a, WritableComparable b){
RecordWritable a1 = (RecordWritable)a;
RecordWritable b1 = (RecordWritable)b;
return a1.getProduct().compareTo(b1.getProduct());
}
}
//实体类
package Comparator02;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class RecordWritable implements WritableComparable {
private String product;//运营商
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.product);
}
public void readFields(DataInput dataInput) throws IOException {
this.product = dataInput.readUTF();
}
public int compareTo(Object o) {
RecordWritable b = (RecordWritable) o;
return this.product.compareTo(b.getProduct());
}
@Override
public String toString() {
return "RecordWritable{" +
"product=" + product +
'}';
}
public String getProduct() {
return product;
}
public void setProduct(String product) {
this.product = product;
}
}
//MapReduce
package Comparator02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the per-carrier traffic job.
 *
 * <p>The mapper classifies each record's phone number into a carrier by its first three
 * digits and emits {@code (carrier, up + down)}. The reducer sums those values per carrier
 * and writes {@code carrier \t total}.
 *
 * <p>NOTE(review): the task statement asks for upstream, downstream and total traffic per
 * carrier, but this job emits only the total. Emitting all three needs a map-output value
 * that carries both numbers (e.g. a custom Writable), which would change the mapper/reducer
 * type parameters.
 */
public class MainDemo {

    /** Local input path used when no command-line paths are given. */
    private static final String DEFAULT_INPUT = "C:/Users/Administrator/Desktop/Data/input/flow.log";
    /** Local output path used when no command-line paths are given. */
    private static final String DEFAULT_OUTPUT = "C:/Users/Administrator/Desktop/Data/output/02/00";

    /**
     * Configures and runs the job, then exits the JVM.
     *
     * <p>The exit status is 0 on success and 1 if the job fails or setup throws.
     *
     * @param args optional {@code <input> <output>} paths; when fewer than two are supplied the
     *     hard-coded local defaults are used
     */
    public static void main(String[] args){
        boolean hasPaths = args != null && args.length >= 2;
        String input = hasPaths ? args[0] : DEFAULT_INPUT;
        String output = hasPaths ? args[1] : DEFAULT_OUTPUT;
        try{
            Configuration conf = new Configuration();
            // NOTE(review): job name says "Partitioner02" although this is the Comparator02 package.
            Job job = Job.getInstance(conf, "Partitioner02-MainDemo");
            job.setJarByClass(MainDemo.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(RecordWritable.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // Group reduce input by carrier name only.
            job.setGroupingComparatorClass(MyComparator.class);
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            int success = job.waitForCompletion(true) ? 0 : 1;
            System.exit(success);
        }
        catch (Exception e){
            e.printStackTrace();
            // Exit non-zero so callers and schedulers see the failure instead of a clean exit.
            System.exit(1);
        }
    }

    /**
     * Emits {@code (carrier, up + down)} for each tab-separated input line.
     *
     * <p>Column layout, as read by this code: the phone number is in column 1 (its first
     * space-separated token is used). Upstream and downstream byte counts are in the third- and
     * second-to-last columns. Lines with fewer than three columns are skipped and counted under
     * the {@code MainDemo/MALFORMED_RECORDS} counter.
     */
    public static class MyMapper extends Mapper<Object, Text, RecordWritable, LongWritable>{
        RecordWritable k = new RecordWritable();
        LongWritable v = new LongWritable();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split("\t");
            int len = words.length;
            if (len < 3) {
                // Blank or truncated line: indexing words[len - 3] would throw, so skip it.
                context.getCounter("MainDemo", "MALFORMED_RECORDS").increment(1);
                return;
            }
            // long arithmetic: byte counts (and their sum) can exceed the int range.
            long up = Long.parseLong(words[len - 3].trim());
            long down = Long.parseLong(words[len - 2].trim());
            v.set(up + down);

            // split(" ") can return an empty array (e.g. for a field made only of spaces).
            String[] phoneParts = words[1].split(" ");
            String phone = phoneParts.length > 0 ? phoneParts[0] : "";
            k.setProduct(carrierOf(phone));
            context.write(k, v);
        }

        /**
         * Maps a phone number to its carrier name by its first three digits.
         *
         * @param phone phone number (possibly shorter than three characters)
         * @return "移动", "联通", "电信", or "其它" for unknown or too-short prefixes
         */
        private static String carrierOf(String phone) {
            String prefix = phone.length() >= 3 ? phone.substring(0, 3) : "";
            switch (prefix) {
                case "134": case "135": case "136": case "137": case "138": case "139":
                case "147":
                case "150": case "151": case "152": case "157": case "158": case "159":
                case "178":
                case "182": case "183": case "184": case "187": case "188":
                    return "移动";
                case "130": case "131": case "132":
                case "145":
                case "155": case "156":
                case "176":
                case "185": case "186":
                    return "联通";
                case "133":
                case "153":
                case "173": case "177":
                case "180": case "181": case "189":
                    return "电信";
                default:
                    return "其它";
            }
        }
    }

    /** Sums the traffic values of each carrier group and writes {@code carrier \t total}. */
    public static class MyReducer extends Reducer<RecordWritable, LongWritable, Text, LongWritable>{
        public Text k = new Text();
        public LongWritable v = new LongWritable();

        @Override
        protected void reduce(RecordWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for(LongWritable lw: values){
                sum += lw.get();
            }
            k.set(key.getProduct());
            v.set(sum);
            context.write(k, v);
        }
    }
}