实验:对职位和薪水序列化排序
规则:先按职位的 ASCII 码排序,假如职位一样,则按薪水升序排序
原始数据: 第二列为职位,倒数第三列为薪水
思路:主要是将职位和薪水封装成 java bean 对象,接着重写 compareTo 方法进行比较排序。
Map阶段: 编写封装数据的类(继承WritableComparable,重写对应方法),将读到的职业和薪水数据封装为 k2,此时 v2 为 null;
Shuffle阶段: 默认操作;
Reduce阶段: 拉取经 shuffle 默认操作后的新<k2,v2>,此时<k3,v3>与新的<k2,v2>相同。
结果:
其他 MapReduce 实验:
本次实验的代码
sort_bean代码:
package lhr.word_count.homework;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite MapReduce key holding a job title and a salary.
 *
 * Implementing WritableComparable makes the shuffle phase sort records
 * by this class's compareTo: job title first (natural String order,
 * i.e. ASCII order for ASCII text), then salary ascending on ties.
 */
public class sort_bean implements WritableComparable<sort_bean> {
    String job;
    int salary;

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    /** Rendered as "job<TAB>salary" in the job's text output. */
    @Override
    public String toString() {
        return job + "\t" + salary;
    }

    /**
     * Compares by job title; if the titles are equal, compares by
     * salary, ascending.
     *
     * Uses Integer.compare rather than `this.salary - o.salary`: the
     * subtraction can overflow int and return a wrong sign for large
     * or negative values, breaking the sort order.
     */
    @Override
    public int compareTo(sort_bean o) {
        int i = this.job.compareTo(o.job);
        if (i == 0) {
            return Integer.compare(this.salary, o.salary);
        }
        return i;
    }

    // Serialization: field order must match readFields exactly.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.job);
        dataOutput.writeInt(this.salary);
    }

    // Deserialization: reads fields in the same order write emits them.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.job = dataInput.readUTF();
        this.salary = dataInput.readInt();
    }
}
Map代码:
package lhr.word_count.homework;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map phase: splits each tab-separated input line, wraps the job title
 * and salary into a sort_bean used as k2 (v2 is NullWritable), so the
 * shuffle sorts records by job, then salary.
 */
public class sort_serialize_Mapper extends Mapper<LongWritable, Text, sort_bean, NullWritable> {
    // Column indices in the tab-separated input.
    // NOTE(review): the write-up says the job is the 2nd column and the
    // salary the 3rd-from-last; confirm indices 2 and 5 match the
    // actual data layout.
    private static final int JOB_INDEX = 2;
    private static final int SALARY_INDEX = 5;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        // Skip blank or malformed lines instead of failing the task
        // with an ArrayIndexOutOfBoundsException.
        if (split.length <= SALARY_INDEX) {
            return;
        }
        sort_bean sort = new sort_bean();
        sort.setJob(split[JOB_INDEX]);
        try {
            sort.setSalary(Integer.parseInt(split[SALARY_INDEX].trim()));
        } catch (NumberFormatException e) {
            // Non-numeric salary (e.g. a header row) -- skip the record.
            return;
        }
        context.write(sort, NullWritable.get());
    }
}
Reduce代码:
package lhr.word_count.homework;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduce phase: identity pass-through. The keys arrive already sorted
 * by the shuffle (via sort_bean.compareTo), so every distinct key is
 * simply emitted unchanged as <k3, v3>.
 */
public class sort_serialize_Reducer extends Reducer<sort_bean, NullWritable, sort_bean, NullWritable> {
    @Override
    protected void reduce(sort_bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Nothing to aggregate -- forward the sorted key with an empty value.
        NullWritable nothing = NullWritable.get();
        context.write(key, nothing);
    }
}
Main代码:
package lhr.word_count.homework;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
/**
 * Driver for the sort_serialize job: wires up the Mapper, Reducer and
 * sort_bean key class, then submits the job.
 */
public class sort_serialize_Main extends Configured implements Tool {
    // Default HDFS locations, used when no paths are given on the
    // command line (preserves the original hard-coded behavior).
    private static final String DEFAULT_INPUT = "hdfs://hadoop11:8020/sort_serialize";
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop11:8020/sort_serialize_result";

    /**
     * Configures and runs the job.
     *
     * @param strings optional [inputPath, outputPath]; defaults are
     *                used for any argument not supplied
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] strings) throws Exception {
        String input = strings.length > 0 ? strings[0] : DEFAULT_INPUT;
        String output = strings.length > 1 ? strings[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(super.getConf(), "sort_serialize");
        job.setJarByClass(sort_serialize_Main.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(input));

        job.setMapperClass(sort_serialize_Mapper.class);
        job.setMapOutputKeyClass(sort_bean.class);
        // BUG FIX: the original called setOutputValueClass here; the
        // map output value class is what must be declared at this point.
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(sort_serialize_Reducer.class);
        job.setOutputKeyClass(sort_bean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        Path path = new Path(output);
        TextOutputFormat.setOutputPath(job, path);

        // Remove a stale output directory so the job does not fail
        // with FileAlreadyExistsException on reruns.
        FileSystem fileSystem = FileSystem.get(new URI(output), super.getConf(), "root");
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new sort_serialize_Main(), args);
        System.exit(run);
    }
}