Requirement: given traffic log records like the following (the phone number is the second field; the upstream and downstream byte counts are the third- and second-to-last fields):
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
Task 1: for every user (phone number), compute the total upstream traffic, total downstream traffic, and overall total traffic.
Approach: define a custom POJO that implements the WritableComparable interface and override the compareTo method, which lets us sort in descending order.
First: aggregate the raw data into <phone number, flow bean> pairs.
Second: in a follow-up job, re-emit the aggregated results as <flow bean, phone number> pairs so the framework sorts them by flow.
(0) The custom entity class
package com.mapreduce.flowSum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class FlowBean implements WritableComparable<FlowBean>{
private long upFlow;
private long downFlow;
private long sumFlow;
//the serialization framework invokes the no-arg constructor to create the instance during deserialization
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow, long sumFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
//serialization method
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//deserialization method
//note: fields must be deserialized in exactly the same order in which they were serialized
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow =in.readLong();
this.sumFlow =in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" +downFlow + "\t" +sumFlow;
}
/**
* Our custom comparison rule: sort in descending order of total flow.
* Note: subtracting two longs and casting to int can overflow, so use Long.compare instead.
*/
@Override
public int compareTo(FlowBean o) {
return Long.compare(o.getSumFlow(), this.getSumFlow());
}
}
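A quick way to sanity-check the Writable contract (and the field-order rule noted above) is to serialize a bean and read it back the same way the framework does. A minimal sketch; this test class is not part of the original code and assumes only the FlowBean defined above:
package com.mapreduce.flowSum;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(100, 200);
        //serialize: writes upFlow, downFlow, sumFlow in that order
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);
        //deserialize the framework's way: no-arg constructor, then readFields
        FlowBean copy = new FlowBean();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);
        System.out.println(copy); //expected: 100	200	300
    }
}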
(1) Step one: aggregate the upstream and downstream traffic per phone number
package com.mapreduce.flowSum;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class FlowSumFirst {
//passing our custom object as a kv type is allowed, but it must implement Hadoop's serialization mechanism, i.e. implement Writable
public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k= new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//split each line that is read into its fields
String line = value.toString();
String[] fields = StringUtils.split(line,"\t");
//extract the fields our business logic needs
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length -3]);
long downFlow = Long.parseLong(fields[fields.length -2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
context.write(k,v);
}
}
public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
FlowBean v = new FlowBean();
//the key this reduce method receives is the phone number shared by one group, i.e. the first one in <phoneA, bean><phoneA, bean> <phoneB, bean><phoneB, bean>
//the values parameter is an iterator over all the beans in that kv group
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for(FlowBean bean : values){
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
v.set(upFlowCount, downFlowCount);
context.write(key, v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumFirst.class);
//tell the framework which mapper and reducer classes this program uses
job.setMapperClass(FlowSumMapper.class);
job.setReducerClass(FlowSumReducer.class);
//tell the framework the program's output data types
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//tell the framework which component reads the input data and which component writes the results
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//tell the framework which path the input data files live under
FileInputFormat.setInputPaths(job, new Path("d://bigDataJob/flowbean/input"));
//tell the framework where the results should be written
FileOutputFormat.setOutputPath(job, new Path("d://bigDataJob/flowbean/output"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
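For reference, running step one on the nine sample records above produces an intermediate file along these lines (TextOutputFormat writes the key, a tab, then FlowBean.toString(); rows come out sorted by phone number because the shuffle sorts the Text keys):
13480253104	180	180	360
13560436666	1116	954	2070
13560439658	2034	5892	7926
13719199419	240	0	240
13760778710	120	120	240
13826544101	264	0	264
13926251106	240	0	240
13926435656	132	1512	1644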
(2) Sorting in descending order
package com.mapreduce.flowSum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Summarizes the traffic and sorts it in descending order of total flow.
 * Precondition: the input is the already-aggregated result file produced by step one.
 * @author AllenWoon
 */
public class FlowSumSort {
public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phoNum = fields[0];
long upFlowSum = Long.parseLong(fields[1]);
long downFlowSum = Long.parseLong(fields[2]);
k.set(upFlowSum, downFlowSum);
v.set(phoNum);
context.write(k, v);
}
}
public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
@Override
protected void reduce(FlowBean bean, Iterable<Text> phoneNums, Context context)
throws IOException, InterruptedException {
//beans with equal sumFlow compare as equal and are grouped into one call,
//so emit every phone number in the group rather than only the first one
//(the framework refreshes the key object as the value iteration advances)
for (Text phoneNum : phoneNums) {
context.write(phoneNum, bean);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumSort.class);
//tell the framework which mapper and reducer classes this program uses
job.setMapperClass(FlowSumSortMapper.class);
job.setReducerClass(FlowSumSortReducer.class);
//tell the framework the program's output data types
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//tell the framework which component reads the input data and which component writes the results
//TextInputFormat is a built-in MapReduce input component; strictly speaking, the one for reading plain text files
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//tell the framework which path the input data files live under
FileInputFormat.setInputPaths(job, new Path("d://bigDataJob/flowbean/output"));
//tell the framework where the results should be written
FileOutputFormat.setOutputPath(job, new Path("d://bigDataJob/flowbean/outputsort"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
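A note on why this yields a globally sorted file: during the shuffle, MapReduce sorts the map output keys with FlowBean.compareTo, and the job runs with a single reducer by default, so all records reach that one reducer already in descending order of sumFlow. To make the single-reducer assumption explicit in main (an optional, minimal addition):
//global ordering relies on a single reducer; 1 is already the default
job.setNumReduceTasks(1);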
The Combiner in MapReduce
1: A combiner is a component of an MR program besides the Mapper and the Reducer
2: The combiner's parent class is Reducer
3: A combiner and a reducer differ in where they run:
the Combiner runs on the node of each individual map task,
while the Reducer receives the output of all Mappers globally;
4: The point of a combiner is to pre-aggregate each map task's output locally, so as to reduce the volume of data shuffled over the network
Concrete implementation steps:
- Define a combiner class that extends Reducer and override its reduce method
- Register it on the job: job.setCombinerClass(CustomCombiner.class)
//in the WordCount demo the reducer class can also serve as the combiner
//(keep the reducer registered; the combiner runs in addition to it)
job.setReducerClass(WordCountReducer.class);
job.setCombinerClass(WordCountReducer.class);
5: A combiner may only be used when it does not affect the final business result, and its output kv types must line up with the reducer's input kv types (see the sketch below)
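As a concrete illustration of point 5: in the FlowSumFirst job above, FlowSumReducer consumes and emits the same <Text, FlowBean> types, and summing flows is associative, so the reducer class itself can safely double as the combiner. A minimal sketch, one extra line in FlowSumFirst's main:
//safe as a combiner: input/output kv types match and summation is associative
job.setCombinerClass(FlowSumReducer.class);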