大数据之排序、combiner、压缩

1、自定义分区

需求:统计结果进行分区,根据手机号前三位来进行分区
总结:
1)自定义类继承partitioner<key,value>
2)重写方法getPartition()
3)业务逻辑
4)在driver类中加入
setPartitionerClass
5)注意:需要指定setNumReduceTasks(个数=分区数+1)
新增PhonenumPartitioner类


  
  
package com.hsiehchou.logs1;
import org.apache.hadoop.io. Text;
import org.apache.hadoop.mapreduce. Partitioner;
/**
* 自定义分区,根据手机号前三位
* 默认分区方式,hash
*/
public class PhonenumPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition( Text key, FlowBean value, int numPartitions) {
//1.获取手机号的前三位
String phoneNum = key.toString().substring( 0, 3);
//2.分区
int partitioner = 4;
if ( "135".equals(phoneNum)){
return 0;
} else if ( "137".equals(phoneNum)){
return 1;
} else if ( "138".equals(phoneNum)){
return 2;
} else if( "139".equals(phoneNum)){
return 3;
}
return partitioner;
}
}

FlowCountDriver类中增加


  
  
//加入自定义分区
job.setPartitionerClass(PhonenumPartitioner. class) ;
//注意,结果文件几个?
job.setNumReduceTasks( 5) ;
//7.设置数据输入的路径
FileInputFormat.setInputPaths( job, new Path( "E:/test/flow/in")) ;
//8.设置数据输出的路径
FileOutputFormat.setOutputPath( job, new Path( "E:/test/flow/out2")) ;

2、排序

需求:每个分区内进行排序?
总结:
1)实现WritableComparable接口
2)重写compareTo方法

combineTextInputFormat设置切片的大小 maptask

实现
FlowBean类


  
  
package com.hsiehchou.logs2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
//定义属性:上行流量 下行流量 总流量总和
private long upFlow;
private long dfFlow;
private long flowsum;
public FlowBean(){}
public FlowBean(long upFlow,long dfFlow){
this.upFlow = upFlow;
this.dfFlow = dfFlow;
this.flowsum = upFlow + dfFlow;
}
public long getUpFlow(){
return upFlow;
}
public void setUpFlow(long upFlow){
this.upFlow = upFlow;
}
public long getDfFlow(){
return dfFlow;
}
public void setDfFlow(long dfFlow){
this.dfFlow = dfFlow;
}
public long getFlowsum(){
return flowsum;
}
public void setFlowsum(long flowsum){
this.flowsum = flowsum;
}
//序列化
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dfFlow);
out.writeLong(flowsum);
}
//反序列化
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
dfFlow = in.readLong();
flowsum = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + dfFlow + "\t" + flowsum;
}
public int compareTo(FlowBean o) {
//倒序
return this.flowsum > o.getFlowsum() ? - 1: 1;
}
}

FlowSortMapper类


  
  
package com.hsiehchou.logs2;
import org.apache.hadoop.io. LongWritable;
import org.apache.hadoop.io. Text;
import org.apache.hadoop.mapreduce. Mapper;
import java.io. IOException;
public class FlowSortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {
@Override
protected void map( LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.接入数据
String line = value.toString();
//2.切割 \t
String[] fields = line.split( "\t");
//3.拿到关键字段:手机号 上行流量 下行流量
String phoneNr = fields[ 0];
long upFlow = Long.parseLong(fields[ 1]);
long dfFlow = Long.parseLong(fields[ 2]);
//4.写出到reducer
context.write( new FlowBean(upFlow,dfFlow), new Text(phoneNr));
}
}

FlowSortReducer类


  
  
package com.hsiehchou.logs2;
import org.apache.hadoop.io. Text;
import org.apache.hadoop.mapreduce. Reducer;
import java.io. IOException;
public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce( FlowBean key, Iterable< Text> values, Context context) throws IOException, InterruptedException {
//手机号 流量
context.write(values.iterator().next(),key);
}
}

FlowSortPartitioner类


  
  
package com.hsiehchou.logs2;
import org.apache.hadoop.io. Text;
import org.apache.hadoop.mapreduce. Partitioner;
public class FlowSortPartitioner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition( FlowBean key, Text value, int numPartitions) {
//1.获取手机号的前三位
String phoneNum = value.toString().substring( 0, 3);
//2.分区
int partitioner = 4;
if ( "135".equals(phoneNum)){
return 0;
} else if ( "137".equals(phoneNum)){
return 1;
} else if ( "138".equals(phoneNum)){
return 2;
} else if( "139".equals(phoneNum)){
return 3;
}
return partitioner;
}
}

FlowSortDriver类


  
  
package com.hsiehchou.logs2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建job任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2.指定kjar包位置
job.setJarByClass(FlowSortDriver. class);
//3.关联使用的Mapper
job.setMapperClass(FlowSortMapper. class);
//4.关联使用的Reducer类
job.setReducerClass(FlowSortReducer. class);
//5.设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(FlowBean. class);
job.setMapOutputValueClass(Text. class);
//6.设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text. class);
job.setOutputValueClass(FlowBean. class);
//加入自定义分区
job.setPartitionerClass(FlowSortPartitioner. class);
//注意,结果文件几个
job.setNumReduceTasks( 5);
//7.设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path( "E:/test/flow/out"));
//8.设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path( "E:/test/flow/out4"));
//9.提交任务
boolean rs = job.waitForCompletion( true);
System.exit(rs? 0: 1);
}
}

3、combiner 合并

1)combiner是一个组件
注意:是Mapper和Reducer之外的一种组件
但是这个组件的父类是Reduer

2)如果想使用combiner继承Reduer即可

3)通过编写combiner发现与reducer代码相同
只需在driver端指定
setCombinerClass(WordCountReduer.class)
注意:前提是不能影响业务逻辑<a,1><c,1> <a,2><a,1> = <a,3>
数学运算:
(3 + 5 + 7)/3 = 5
(2 + 6)/2 = 4
不进行局部累加:(3 + 5 + 7 + 2 + 6)/5 = 23/5
进行了局部累加:(5+4)/2 = 9/2=4.5 不等于 23/5=4.6

4、数据压缩

为什么对数据进行压缩?
mapreduce操作需要对大量数据进行传输
压缩技术有效的减少底层存储系统读写字节数,hdfs。
压缩提高网络带宽和磁盘空间效率。
数据压缩节省资源,减少网络I/O。
通过压缩可以影响到mapreduce性能。(小文件优化,combiner)代码角度进行优化。
注意:利用好压缩提高性能,运用不好会降低性能。
压缩 -》 解压缩
mapreduce常用的压缩编码

压缩格式 是否需要安装 文件拓展名 是否可以切分
DEFAULT 直接使用 .deflate
bzip2 直接使用 .bz2
Gzip 直接使用 .gz
LZO 需要安装 .lzo
Snappy 需要安装 .snappy

性能测试

压缩格式 原文件大小 压缩后大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 20MB/s 60MB/s
LZO 8.3GB 3GB 50MB/s 70MB/s
bzip2 8.3GB 1.1GB 3MB/s 10MB/s

猜你喜欢

转载自www.cnblogs.com/hsiehchou/p/10403457.html