mapreduce在编程的时候,都有一个固化的模式。真正有意思的是,怎么考虑对问题用MapReduce的方式进行建模。通过以下的案例的分析对mapreduce做一个总结。(陆续更)
文章目录
案例一:找出每个月中气温最高的2天
问题描述
1949-10-01 14:21:02 34c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 11:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 11:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 11:21:02 46c
1951-07-03 12:21:03 47c
问题思路
怎么通过MR来实现?
首先要考虑的问题是,如果通过MR框架来计算,那么就需要对数据进行分组。M端要输出中间级映射。读的时候一行行读,那么kv映射怎么来做?显然可以把年月作为一个key,温度作为value。那么不需要的数据有哪些?时间以及最后温度的符号"c"就可以去掉了。
紧接着考虑的第二个问题是,如果排序的话是什么一个排序?对于年月和温度来说,都是数值,因为要找出每个月的气温最高,因此可以按照年月进行排序,相同年月的分成一组;
接着第三个问题,怎么来比较年月?如果按照年月来排序,首先想到的方法是对字符串进行切割,或者转化成时间戳进行比较,但这样会显得不够简洁。这里不妨通过自定义对象的方法,把类中的年月日当作属性,通过实现比较器方法,再通过序列化反序列化来传输。
接下来考虑第四个问题,目的求最高的两天温度,如果map只是按年月排好,交给reduce,那么reduce中要对年月相同的组的温度进行挨个比较,这样工作量会比较大,可以怎样简化reduce端的操作?考虑的一个方法是,因为温度也是数值,且要求最高温度,因此可以在Map端对温度做个倒序,这样传给reduce后,最高的温度就在最前面。
但是一天中可能存在很多个最高温度,因此在Reduce端要想办法进行判断,只在同一天的温度中取最高。
到此,思路已经基本理清,开始逐步进行实现。
逐步实现
-
先定义一个主类MyTQ,做好job的基本的配置:
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyTQ.class); job.setJobName("tq");
-
设置好输入输出配置:
Path inPath = new Path("/tq/input"); FileInputFormat.addInputPath(job,inPath); Path outPath = new Path("/tq/output"); if(outPath.getFileSystem(conf).exists(outPath)) outPath.getFileSystem(conf).delete(outPath, true); FileOutputFormat.setOutputPath(job,outPath);
-
自定义一个天气类,以实现key-value映射。实现天气类时,要继承WritableComparable接口,因为其可以支持实现比较器和序列化反序列化操作:
public class Tq implements WritableComparable<Tq>{ private int year; private int month; private int day; private int wd; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public int getDay() { return day; } public void setDay(int day) { this.day = day; } public int getWd() { return wd; } public void setWd(int wd) { this.wd = wd; } @Override public void write(DataOutput out) throws IOException{ out.writeInt(this,getYear()); out.writeInt(this,getMonth()); out.writeInt(this,getDay()); out.writeInt(this,getWd()); } @Override public void readFields(DataInput in) throws IOException{ // 完成的反序列化,还原的是写东西的属性,因此需要给写回去 // 注意顺序:写的时候--年月日温度,读时也年月日温度; this.setYear(in.readInt()); this.setMonth(in.readInt()); this.setDay(in.readInt()); this.setWd(in.readInt()); } @Override public int compareTo(Tq o){ // 比较器:如果不涉及原文的业务逻辑,作为一个单纯的自定义类,应该有一个标准的比较方式。 // 于是先考虑设置一个标准的排序规则:数值序或字典序; // 毕竟外部传进来的对象,现在开始两个天气进行比较,分别用年月日进行比较,要先取得对象的年月日。 // 应该用数值比较器来进行比较 int c1 = Integer.compare(this.getYear(), o.getYear()); if(c1==0){ int c2 = Integer.compare(this.getMonth(),o.getMonth()); if(c2 == 0){ return Integer.compare(this.getDay(),o.getDay()); } return c2; } return c1; } }
-
设置Mapper:
job.setMapperClass(Tmapper.class);
Tmapper的输入是文本,输出是天气类,值是温度,因此可以将类定义如下:
public class Tmapper extends Mapper<LongWritable,Text,Tq,IntWritable>
在这个方法里要把不需要的给过滤掉,同时把数据进行封装。这里可以先把过来的数据进行切割,然后分别序列化到自定义天气类中:
String[] words = StringUtils.split(value.toString(),'\t');
对切割后的日期用日期类SimpleDateFormat进行处理,再用Calendar传到自定义类中;温度用最后截取的方式进行处理:
String pattern = "yyyy-MM-dd"; SimpleDateFormat sdf = new SimpleDateFormat(pattern); Date date = sdf.parse(words[0]); Calendar cal = Calender.getInstance(); cal.setTime(date); // 分别获取年月日存入tq中 tkey.setYear(cal.get(Calendar.YEAR)); // Calender的Month要+1 tkey.setMonth(cal.get(Calendar.MONTH)+1); tkey.setDay(cal.get(Calendar.DAY_OF_MONTH)); int wd = Integer.parseInt(words[1].substring(0,words[1].lastIndexOf("c"))); tkey.setWd(wd); tval.set(wd);
最后将key,value传入:
context.write(tkey,tval);
在setmapper过程再传入数据类型:
job.setMapOutputKeyClass(Tq.class); job.setMapOutputValueClass(IntWritable.class);
-
定义排序比较器TSortComparator,将年月正序,温度倒叙输出:
在TSortComparator中重写compare方法,对年月进行排序,并对温度进行倒排:@Override public int compare(WritableComparable a, WritableComparable b){ // 要提供两个天气,这两个天气是接口的实现类。 t1 = (Tq)a; t2 = (Tq)b; int c1 = Integer.compare(t1.getYear(),t2.getYear()); if (c1==0){ int c2 = Integer.compare(t1.getMonth(),t2.getMonth()); if(c2==0){ return -Integer.compare(t1.getWd(),t2.getWd()); } return c2; } return c1; }
-
定义分区器:通过hash取模的方式,将相同的值分到同一个区域中。分区的数量取决于reduce的数量,因此先对reduceTask的数量进行设定:
job.setNumReduceTasks(2);
public class Tpartioner extends Partitioner<Tq, IntWritable>{ @Override public int getPartition(Tq key, IntWritable value, int numPartitions){ // getPartition获取key所属分区的编号 // numParttition是根据reduce定的,分区数量 // 因为是int,无需取哈希直接取模 return key.getYear() % numPartitions; } }
-
因为对温度也做了排序,所以会对相同的年月和相同的温度作为一组来对待,这和把相同的年月来作为一组对待的思路不同。因此可以通过自定义组排序的方式来进行二次排序;(比sort方法少了个步骤,不用对温度排序);
-
reduce端处理,输入为Tq类的数据,输出为Text:
<Tq,IntWritable,Text,IntWritable>
。reduce端要先遍历传来的key,key这里指的是年月,第一条数据一定是这个月内的最高温,因此可以直接对第一条数据输出。通过flag和data的设置来避免 同一天中出现最高温度的情况:
public class Treducer extends Reducer<Tq,IntWritable,Text,IntWritable>{
// 到了reducer端,数据是什么样子?
// 按照年月排,可能一个月中多个高温排在前面
Text tkey = new text();
// 需要单独拎出来设置
intWritable tval = new intWritable();
@Override
protected void reduce(Tq key, Iterable<IntWritable> vals, Context context)
throws IOException,InterruptedException{
// 先遍历,key是天气,范围是年月,values是这个年月下的所有温度
// 既然是带过来的key就是上面的温度;
// 第一条肯定是这个月的最高温
int flag =0;
int day = 0;
for(IntWritable val:vals){
if(flag == 0){
// t.set(key.getYear()"-"+key.....);
// 换成在Tq加一个tostring方法
tkey.set(key.toString());
tval.set(val.get());
context.write(tkey, tval);
flag++;
day = key.getDay();
}
if(flag!=0 && day!=key.getDay()){
// 什么时候存并往外输出
tkey.set(key.toString());
tval.set(val.get());
context.write(tkey, tval);
// 结束当前方法回到调用处
return;
}
}
}
}
最终代码
MyTQ.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.job;
public class MyTQ{
public static void main(String[] args) throws IOException{
// 1. 配置
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MyTQ.class);
job.setJobName("tq");
// 2. 设置输入输出路径
Path inPath = new Path("/tq/input");
FileInputFormat.addInputPath(job,inPath);
Path outPath = new Path("/tq/output");
if(outPath.getFileSystem(conf).exists(outPath))
outPath.getFileSystem(conf).delete(outPath, true);
FileOutputFormat.setOutputPath(job,outPath);
// 3. 定义map和reduce
// Mapper做的事情是生成中间级映射,向外输出kv
// 在setMapper之前先要设置下自定义传输的key
job.setMapperClass(Tmapper.class);
job.setMapOutputKeyClass(Tq.class);
job.setMapOutputValueClass(IntWritable.class);
// 4. 需要提供一个排序比较器:年月正序,温度倒叙输出
job.setSortComparatorClass(TSortComparator.class);
// 5. 自定义分区器
// 分区器定义数据的分发策略,对哈希进行取模,把数据分发到一个中
job.setPartitionerClass(TPartioner.class);
// 6. 自定义组排序器
job.setGroupingComparator(TGroupComparator.class);
// 设置reduce task的数量
job.setNumReduceTasks(2);
// 7. 设置reducer;
job.setReducerClass(Treducer.class);
Tq.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.writableComparable;
public class Tq implements WritableComparable<Tq>{
private int year;
private int month;
private int day;
private int wd;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public int getDay() {
return day;
}
public void setDay(int day) {
this.day = day;
}
public int getWd() {
return wd;
}
public void setWd(int wd) {
this.wd = wd;
}
@Override
public void write(DataOutput out) throws IOException{
out.writeInt(this,getYear());
out.writeInt(this,getMonth());
out.writeInt(this,getDay());
out.writeInt(this,getWd());
}
@Override
public void readFields(DataInput in) throws IOException{
this.setYear(in.readInt());
this.setMonth(in.readInt());
this.setDay(in.readInt());
this.setWd(in.readInt());
}
@Override
public int compareTo(Tq o){
int c1 = Integer.compare(this.getYear(), o.getYear());
if(c1==0){
int c2 = Integer.compare(this.getMonth(),o.getMonth());
if(c2 == 0){
return Integer.compare(this.getDay(),o.getDay());
}
return c2;
}
return c1;
}
@Override
public String toString(){
return year+"-"+month+"-"+day;
}
}
Tmapper.java
import org.apache.hadoop.mapreduce.Mapper;
// 进来的就类似这一行数据1949-10-01 14:21:02 34c
public class Tmapper extends Mapper<LongWritable,Text,Tq,IntWritable>{
Tq tkey = new Tq();
IntWritable tval = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Tq, IntWritable>.Context context) throws IOException, InterruptedException{
String[] words = StringUtils.split(value.toString(),'\t');
String pattern = "yyyy-MM-dd";
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
try{
Date date = sdf.parse(words[0]);
Calendar cal = Calender.getInstance();
cal.setTime(date);
tkey.setYear(cal.get(Calendar.YEAR));
tkey.setMonth(cal.get(Calendar.MONTH)+1);
tkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
int wd = Integer.parseInt(words[1].substring(0,words[1].lastIndexOf("c")));
tkey.setWd(wd);
tval.set(wd);
context.write(tkey,tval);
} catch(ParseException e){
e.printStackTrace();
}
}
}
TSortComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// 实现天气年月正序,温度倒叙的排序规则
class TSortComparator extends WritableComparator{
Tq t1 = null;
Tq t2 = null;
public TSortComparator(){
super(Tq.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b){
t1 = (Tq)a;
t2 = (Tq)b;
int c1 = Integer.compare(t1.getYear(),t2.getYear());
if (c1==0){
int c2 = Integer.compare(t1.getMonth(),t2.getMonth());
if(c2==0){
return -Integer.compare(t1.getWd(),t2.getWd());
}
return c2;
}
return c1;
}
}
TPartioner.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class Tpartioner extends Partitioner<Tq, IntWritable>{
@Override
public int getPartition(Tq key, IntWritable value, int numPartitions){
return key.getYear() % numPartitions;
}
}
TGroupComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
class TGroupComparatorextends WritableComparator{
Tq t1 = null;
Tq t2 = null;
public TGroupComparator(){
super(Tq.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b){
t1 = (Tq)a;
t2 = (Tq)b;
int c1 = Integer.compare(t1.getYear(),t2.getYear());
if (c1==0){
return Integer.compare(t1.getMonth(),t2.getMonth());
}
return c1;
}
}
Treducer.java
import org.apache.hadoop.mapreduce.Reducer;
// 1949-10-01 34
public class Treducer extends Reducer<Tq,IntWritable,Text,IntWritable>{
Text tkey = new text();
intWritable tval = new intWritable();
@Override
protected void reduce(Tq key, Iterable<IntWritable> vals, Context context)
throws IOException,InterruptedException{
int flag =0;
int day = 0;
for(IntWritable val:vals){
if(flag == 0){
tkey.set(key.toString());
tval.set(val.get());
context.write(tkey, tval);
flag++;
day = key.getDay();
}
if(flag!=0 && day!=key.getDay()){
tkey.set(key.toString());
tval.set(val.get());
context.write(tkey, tval);
return;
}
}
}
}