MapReduce(分布式计算模型)---序列化和分区
一、序列化
- 在MapReduce中,要求数据能够被序列化
- MapReduce的序列化机制默认采用的AVRO
- MapReduce对AVRO的序列化机制进行了封装,提供了更简便的序列化形式 - 实现接口Writable
案例一、创建一个flow类并对其序列化
package cn.zyj.flow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Flow implements Writable{
private String phone;
private String name;
private String addr;
private int flow;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public int getFlow() {
return flow;
}
public void setFlow(int flow) {
this.flow = flow;
}
//反序列化
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
//按照什么顺序写的就按照什么顺序读
this.phone = in.readUTF();
this.addr = in.readUTF();
this.name = in.readUTF();
this.flow = in.readInt();
}
//序列化
//只需要将有必要的属性来依次写出即可序列化
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(phone);
out.writeUTF(addr);
out.writeUTF(name);
out.writeInt(flow);
}
}
二、分区
- 分区操作是shuffle操作中的一个重要过程,作用就是将map的结果按照规则分发到不同reduce中进行处理,从而按照分区得到多个输出结果。
- Partitioner是分区的基类,如果需要定制partitioner也需要继承该类
- HashPartitioner是MapReduce的默认partitioner。计算方法是:which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
- 默认情况下,reduceTask数量为1
- 很多时候MapReduce自带的分区规则并不能满足业务需求,为了实现特定的效果,可以需要自己来定义分区规则
- 如果定义了几个分区,则需要定义对应数量的ReduceTask
案例一、求每个城市中每个人使用的流量
Mapper:
package cn.tedu.partflow;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PartFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
Flow f = new Flow();
f.setPhone(arr[0]);
f.setAddr(arr[1]);
f.setName(arr[2]);
f.setFlow(Integer.parseInt(arr[3]));
context.write(new Text(f.getName()), f);
}
}
Partitioner:
package cn.tedu.partflow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class AddrPartitioner extends Partitioner<Text, Flow> {
// 指定分类规则
@Override
public int getPartition(Text key, Flow value, int numReduceTasks) {
// 按照地区分类
// 先拿到地区
String addr = value.getAddr();
if (addr.equals("bj"))
return 0;
else if (addr.equals("sh"))
return 1;
else
return 2;
}
}
Reducer:
package cn.tedu.partflow;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
public void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Flow val : values) {
sum += val.getFlow();
}
context.write(key, new IntWritable(sum));
}
}
Driver:
package cn.tedu.partflow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartFlowDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.tedu.serialflow.SerialFlowDriver.class);
job.setMapperClass(PartFlowMapper.class);
job.setReducerClass(PartFlowReducer.class);
// 设置分区类
job.setPartitionerClass(AddrPartitioner.class);
// 设置ReduceTask的数量
job.setNumReduceTasks(3);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://10.42.3.8:9000/txt/flow.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://10.42.3.8:9000/result/partflow"));
if (!job.waitForCompletion(true))
return;
}
}