MapReduce: custom data type classes
Key points:
- Provide a no-argument constructor
- Implement the Writable interface
- Override the write() and readFields() methods
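Related errors:
- java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodExcep (cause: the class has no no-argument constructor)
- java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null (cause: a wrong import)
- NullPointerException (cause: the class does not implement Writable)
- Fields come out empty (cause: write() and readFields() are not overridden; the output is all zeros)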
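The first error in the list can be reproduced without a cluster. Hadoop creates key and value objects reflectively before it calls readFields() on them, and that lookup needs a no-argument constructor. The following is a minimal sketch using only plain Java reflection; `WithNoArg`, `WithoutNoArg`, and `instantiate` are hypothetical names for illustration, not Hadoop classes, and the wrapping in RuntimeException only imitates the nesting seen in the stack trace.

```java
import java.lang.reflect.Constructor;

class NoArgConstructorDemo {
    // Hypothetical value class that keeps a no-arg constructor.
    static class WithNoArg {
        long upFlow;
        WithNoArg() {}
        WithNoArg(long upFlow) { this.upFlow = upFlow; }
    }

    // Hypothetical value class whose only constructor takes an argument.
    static class WithoutNoArg {
        long upFlow;
        WithoutNoArg(long upFlow) { this.upFlow = upFlow; }
    }

    // Roughly what a framework does: look up the no-arg constructor and call it.
    static <T> T instantiate(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException ends up here when the constructor is missing
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Works: the object exists, its fields still hold default values
        System.out.println(instantiate(WithNoArg.class).upFlow);
        try {
            instantiate(WithoutNoArg.class);
        } catch (RuntimeException e) {
            // Prints the class name of the wrapped cause
            System.out.println(e.getCause().getClass().getName());
        }
    }
}
```

Adding any constructor with parameters removes Java's implicit default constructor, which is why the no-argument one has to be written out explicitly.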
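The code is as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class Flow implements WritableComparable<Flow> {
    private long upFlow, downFlow, sumFlow;
    // Required: Hadoop creates instances reflectively before calling readFields()
    public Flow() {}
    // Assumption: the constructor body is not shown in the original; the total is taken as upFlow + downFlow
    public Flow(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }
    public long getSumFlow() { return sumFlow; }
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeLong(upFlow);
        output.writeLong(downFlow);
        output.writeLong(sumFlow);
    }
    @Override
    public void readFields(DataInput input) throws IOException {
        upFlow = input.readLong();
        downFlow = input.readLong();
        sumFlow = input.readLong();
    }
    // Sorting requires WritableComparable<Flow>, not just Writable. Descending by sumFlow; never returns 0.
    @Override
    public int compareTo(Flow o) {
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }
}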
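To check that write() and readFields() agree with each other, the pair can be exercised locally with the standard java.io streams. A minimal sketch, assuming a stand-in class `FlowRecord` (hypothetical) that mirrors the three serialized fields of Flow but does not implement Hadoop's Writable, so that it runs without Hadoop on the classpath; the rule that the total equals upFlow + downFlow is also an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class FlowRoundTripDemo {
    // Hypothetical stand-in for Flow: same field layout, no Hadoop dependency.
    static class FlowRecord {
        long upFlow, downFlow, sumFlow;

        FlowRecord() {}

        FlowRecord(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow; // assumed definition of the total
        }

        void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // Fields must be read in exactly the order they were written.
        void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    }

    // Serializes a record to bytes and reads it back into a fresh instance.
    static FlowRecord roundTrip(FlowRecord original) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            original.write(out);
        }
        FlowRecord copy = new FlowRecord();
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        FlowRecord copy = roundTrip(new FlowRecord(100, 250));
        System.out.println(copy.upFlow + " " + copy.downFlow + " " + copy.sumFlow);
    }
}
```

If a field is left out of write() or read in a different order, the copy no longer matches the original, which is the local equivalent of the "fields come out empty" symptom.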
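import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneFlowMaster {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "C:\\Program Files\\hadoop-2.7.1");
        // Input file
        Path inputPath = new Path("/input/flow.txt");
        // Directory where the results are stored
        Path outputPath = new Path("/output/181020");
        // Job configuration and file system handle
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        /*
         * Initialize the job parameters:
         * 1. set the job name
         * 2. set the classes (3 calls)
         * 3. set the data types (2 pairs)
         * 4. set the paths (2 calls)
         * 5. wait for the result
         */
        Job job = Job.getInstance(conf, "PhoneFlow");
        // Class used to locate the job jar
        job.setJarByClass(PhoneFlowMaster.class);
        // Mapper class
        job.setMapperClass(PhoneFlowMapper.class);
        // Reducer class
        job.setReducerClass(PhoneFlowReduce.class);
        // Map output types: key and value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // With 0 reduce tasks the job is map-only: the map results are written directly as
        // part-m files, no reducer runs, and the output types above can be commented out
        //job.setNumReduceTasks(0);
        // Input path (FileInputFormat from the mapreduce.lib.input package)
        FileInputFormat.setInputPaths(job, inputPath);
        // Output path (FileOutputFormat from the mapreduce.lib.output package)
        FileOutputFormat.setOutputPath(job, outputPath);
        // The output path must not exist when the job starts, so delete it first
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        boolean result = job.waitForCompletion(true);
        if (result) {
            System.out.println("Congratulations! success");
        }
    }
}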
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PhoneFlowReduce extends Reducer<Text, Flow, Text, Text> {
    // Called once per distinct key, with all values that share that key
    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
        long upflow = 0;
        long downflow = 0;
        long sumflow = 0;
        for (Flow value : values) {
            upflow += value.getUpFlow();
            downflow += value.getDownFlow();
            sumflow += value.getSumFlow();
        }
        context.write(key, new Text(upflow + " " + downflow + " " + sumflow));
    }
}
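import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split on a single space: phone, upstream flow, downstream flow
        String[] temp = line.split(" ");
        String phone = temp[0];
        context.write(new Text(phone), new Flow(Long.parseLong(temp[1]), Long.parseLong(temp[2])));
    }
}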
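What the job computes can be checked on a few lines without a cluster. The following is a rough local sketch, assuming input lines of the form `phone upFlow downFlow` separated by single spaces (as the mapper's split implies) and a total defined as upFlow + downFlow. `LocalFlowSum` and `sumByPhone` are hypothetical names, the sample lines are made up, and a TreeMap stands in for the shuffle's grouping and sorting by key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LocalFlowSum {
    // Returns phone -> {upFlow, downFlow, sumFlow}, with phones in sorted order.
    static Map<String, long[]> sumByPhone(List<String> lines) {
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : lines) {
            // "Map" step: parse one record
            String[] fields = line.split(" ");
            long up = Long.parseLong(fields[1]);
            long down = Long.parseLong(fields[2]);
            // "Reduce" step: accumulate per phone number
            long[] t = totals.computeIfAbsent(fields[0], k -> new long[3]);
            t[0] += up;
            t[1] += down;
            t[2] += up + down;
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up sample records for illustration only
        List<String> lines = Arrays.asList(
                "13800000001 100 200",
                "13800000002 50 60",
                "13800000001 10 20");
        // One output line per phone: key, tab, then the three totals
        sumByPhone(lines).forEach((phone, t) ->
                System.out.println(phone + "\t" + t[0] + " " + t[1] + " " + t[2]));
    }
}
```

Comparing this output with the job's part-r file on the same small input is a quick way to spot a serialization mistake in Flow.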
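Custom sorting
MapReduce sorts by key automatically, so use the Flow class as the key and override its comparison method.
1. Have the Flow class implement the WritableComparable interface
2. Override the compareTo method
    @Override
    public int compareTo(Flow o) {
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }
3. Mapper: emit Flow as the key (it now also carries the phone number); the value can be anything
    context.write(new Flow(Long.parseLong(temp[0]), Long.parseLong(temp[1]), Long.parseLong(temp[2])),
            new Flow());
4. Reducer: the records are already sorted at this point; read the phoneNo property of the key emitted by the mapper and write it out
    context.write(new Text(String.valueOf(key.getPhoneNo())),
            new Text(upflow + " " + downflow + " " + sumflow));
Result: the output is sorted by the last column in descending order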
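The compareTo shown above never returns 0. One way to keep the descending order while honoring the compareTo contract is to break ties on a second field, so that two keys compare as equal only when they are the same record. A sketch under assumptions: `FlowKey` is a hypothetical plain-Java stand-in for Flow with a phoneNo field, and the sample values are made up. In the real job, phoneNo would also have to be written in write() and read back in readFields(), otherwise it would not survive the shuffle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FlowKeySortDemo {
    // Hypothetical stand-in for Flow used as a key.
    static class FlowKey implements Comparable<FlowKey> {
        final long phoneNo;
        final long sumFlow;

        FlowKey(long phoneNo, long sumFlow) {
            this.phoneNo = phoneNo;
            this.sumFlow = sumFlow;
        }

        @Override
        public int compareTo(FlowKey o) {
            // Descending by total flow
            int bySum = Long.compare(o.sumFlow, this.sumFlow);
            // Tie-break ascending by phone number, so 0 means "same record"
            return bySum != 0 ? bySum : Long.compare(this.phoneNo, o.phoneNo);
        }
    }

    // Returns a sorted copy and leaves the input list untouched.
    static List<FlowKey> sorted(List<FlowKey> keys) {
        List<FlowKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<FlowKey> keys = new ArrayList<>();
        keys.add(new FlowKey(13800000002L, 110));
        keys.add(new FlowKey(13800000001L, 330));
        keys.add(new FlowKey(13800000003L, 110));
        for (FlowKey k : sorted(keys)) {
            System.out.println(k.phoneNo + " " + k.sumFlow);
        }
    }
}
```

The tie-break matters in MapReduce because keys that compare as equal are grouped into a single reduce() call by default; with it, two phones that happen to have the same total still reach the reducer as separate keys.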