题目:
求每个运营商的上行流量、下行流量、总流量 (输出:运营商 上行 下行 总流量)
思路:
通过手机号的前三位区分通信运营商
按照运营商分区后,在每个Reduce里计算各运营商总和即可
代码:
//自定义分区器(Partitioner):按运营商划分分区
package Partitioner02;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text, RecordWritable> {
public int getPartition(Text text, RecordWritable recordWritable, int numPartitions) {
String st = recordWritable.getProduct();
if(st.equals("移动")){
return 0;
}
else if(st.equals("联通")){
return 1;
}
else if(st.equals("电信")){
return 2;
}
else {
return 3;
}
}
}
//实体类
package Partitioner02;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class RecordWritable implements Writable {
private String product;//运营商
private String phone;//手机号
private long upFlow;//上行流量
private long downFlow;//下行流量
private long sumFlow;//总流量
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.product);
dataOutput.writeUTF(this.phone);
dataOutput.writeLong(this.upFlow);
dataOutput.writeLong(this.downFlow);
dataOutput.writeLong(this.sumFlow);
}
public void readFields(DataInput dataInput) throws IOException {
this.product = dataInput.readUTF();
this.phone = dataInput.readUTF();
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
@Override
public String toString() {
return "RecordWritable{" +
"product=" + product +
", phone='" + phone + '\'' +
", upFlow=" + upFlow +
", downFlow=" + downFlow +
", sumFlow=" + sumFlow +
'}';
}
public String getProduct() {
return product;
}
public void setProduct(String product) {
this.product = product;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
//MapReduce
package Partitioner02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.regex.Pattern;
/**
 * MapReduce driver computing, per carrier (运营商), the total upstream,
 * downstream, and combined traffic. The Mapper tags each record with its
 * carrier (derived from the phone-number prefix); MyPartitioner routes each
 * carrier to its own reducer; each reducer sums and emits one line:
 * "carrier upFlow downFlow sumFlow".
 */
public class MainDemo {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Partitioner02-MainDemo");
            job.setJarByClass(MainDemo.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(RecordWritable.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // Custom partitioning: one reduce task per carrier bucket
            // (移动/联通/电信/其它), matching MyPartitioner's 4 partitions.
            job.setPartitionerClass(MyPartitioner.class);
            job.setNumReduceTasks(4);
            // Paths may be supplied on the command line; the original
            // hard-coded Windows paths remain as defaults for compatibility.
            String input = args.length >= 1 ? args[0]
                    : "C:/Users/Administrator/Desktop/Data/input/flow.log";
            String output = args.length >= 2 ? args[1]
                    : "C:/Users/Administrator/Desktop/Data/output/01/00";
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Original fell through and exited 0 on exception, hiding failures
            // from callers/schedulers; report a non-zero status instead.
            System.exit(1);
        }
    }

    /**
     * Parses one tab-separated flow-log line, classifies the phone number's
     * carrier by its 3-digit prefix, and emits the record under a constant
     * key (per-carrier grouping is done by the partitioner, not the key).
     */
    public static class MyMapper extends Mapper<Object, Text, Text, RecordWritable> {
        // Compile the prefix patterns once per JVM instead of once per record
        // (String.matches recompiles the regex on every call).
        private static final Pattern MOBILE = Pattern.compile(
                "139|138|137|136|135|134|159|158|157|150|151|152|147|188|187|182|183|184|178");
        private static final Pattern UNICOM = Pattern.compile(
                "130|131|132|156|155|186|185|145|176");
        private static final Pattern TELECOM = Pattern.compile(
                "133|153|189|180|181|177|173");

        Text k = new Text("1");
        RecordWritable v = new RecordWritable();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String row = value.toString();
            String[] words = row.split("\t");
            int len = words.length;
            // Skip malformed lines instead of crashing the whole task:
            // we need at least words[1] (phone field) and the last 3 columns.
            if (len < 3) {
                return;
            }
            long up;
            long down;
            try {
                // Flow fields are long in RecordWritable; the original used
                // Integer.parseInt, which overflows on large traffic values.
                up = Long.parseLong(words[len - 3]);
                down = Long.parseLong(words[len - 2]);
            } catch (NumberFormatException ignored) {
                return; // non-numeric flow columns -> skip this line
            }
            v.setUpFlow(up);
            v.setDownFlow(down);
            v.setSumFlow(up + down);
            String phone = words[1].split(" ")[0];
            v.setPhone(phone);
            // Classify by the first three digits; numbers too short to have
            // a prefix fall into the "other" bucket rather than throwing.
            String prefix = phone.length() >= 3 ? phone.substring(0, 3) : "";
            if (MOBILE.matcher(prefix).matches()) {
                v.setProduct("移动");
            } else if (UNICOM.matcher(prefix).matches()) {
                v.setProduct("联通");
            } else if (TELECOM.matcher(prefix).matches()) {
                v.setProduct("电信");
            } else {
                v.setProduct("其它");
            }
            context.write(k, v);
        }
    }

    /**
     * Sums the per-carrier traffic. Because MyPartitioner sends each carrier
     * to its own reduce task, all records seen here belong to one carrier.
     * Emits "carrier upFlow downFlow sumFlow" as required by the problem
     * statement (the original only emitted the carrier and the grand total).
     */
    public static class MyReducer extends Reducer<Text, RecordWritable, Text, NullWritable> {
        private final Text k = new Text();

        @Override
        protected void reduce(Text key, Iterable<RecordWritable> values, Context context) throws IOException, InterruptedException {
            String carrier = "";
            // Local primitives instead of a boxed Long instance field:
            // avoids per-iteration autoboxing and accidental state sharing.
            long upSum = 0L;
            long downSum = 0L;
            long totalSum = 0L;
            for (RecordWritable rw : values) {
                carrier = rw.getProduct();
                upSum += rw.getUpFlow();
                downSum += rw.getDownFlow();
                totalSum += rw.getSumFlow();
            }
            k.set(carrier + " " + upSum + " " + downSum + " " + totalSum);
            context.write(k, NullWritable.get());
        }
    }
}