Experiment: Partitioning by Salary
If salary < 1500, it is a low salary;
if 1500 <= salary < 3000, it is a middle salary;
if salary >= 3000, it is a high salary.
Raw data:
The salary is the third-from-last column of each record.
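For concreteness, here are a few hypothetical rows in the layout the code below assumes: eight tab-separated fields in the classic emp-table order (empno, ename, job, mgr, hiredate, sal, comm, deptno), which puts the salary at index 5, third from last. These rows are illustrative only, not the actual dataset:
7369    SMITH   CLERK       7902    1980-12-17   800         20
7499    ALLEN   SALESMAN    7698    1981-02-20   1600   300  30
7839    KING    PRESIDENT           1981-11-17   5000        10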
Approach: extract the salary field from each record and, based on its value, assign the record to one of three partitions (0, 1, 2).
Map phase: emit each whole line as k2, with a null v2.
Shuffle phase: a custom partitioner splits the records into three partitions by salary; everything else is left at the defaults. The resulting new k2/v2 have the same types as the original k2/v2.
Reduce phase: run 3 ReduceTasks to pull the new k2/v2 and emit them as k3/v3, which realizes the custom partitioning.
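For example, a record whose salary field is 1600 passes through the mapper unchanged (as k2 with a null v2), is routed to partition 1 by the custom partitioner, and is written back out verbatim by the second reduce task.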
Results: with three reduce tasks, the job produces three output files, part-r-00000 (low salaries), part-r-00001 (middle), and part-r-00002 (high).
The code for this experiment
Map code:
package lhr.word_count.homework;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class salary_partition_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /*
         * Map phase:
         * emit each whole line as k2; v2 is null.
         */
        context.write(value, NullWritable.get());
    }
}
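Emitting the whole line as the key (with NullWritable as the value) keeps every field available downstream: the partitioner can re-parse the salary from the key, and the reducer can write the line back out unchanged.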
Partitioner code:
package lhr.word_count.homework;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class salary_partition_Partitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
        /*
         * Custom partitioner: route each record to a partition by its salary.
         * The salary is the sixth field (index 5) of the tab-separated line.
         */
        int salary = Integer.parseInt(text.toString().split("\t")[5]);
        if (salary < 1500) {
            return 0;
        } else if (salary < 3000) {
            return 1;
        } else {
            return 2;
        }
    }
}
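As a quick local sanity check, the partitioner can be exercised directly outside of a job, since getPartition simply takes the key, the value, and the number of partitions. This check class is ours, not part of the experiment, and the sample rows are the hypothetical emp-style records shown earlier:
package lhr.word_count.homework;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Hypothetical local check for the partitioner; the sample rows assume the
// eight-field tab-separated layout described above.
public class salary_partition_PartitionerCheck {
    public static void main(String[] args) {
        salary_partition_Partitioner partitioner = new salary_partition_Partitioner();
        Text low = new Text("7369\tSMITH\tCLERK\t7902\t1980-12-17\t800\t\t20");
        Text middle = new Text("7499\tALLEN\tSALESMAN\t7698\t1981-02-20\t1600\t300\t30");
        Text high = new Text("7839\tKING\tPRESIDENT\t\t1981-11-17\t5000\t\t10");
        System.out.println(partitioner.getPartition(low, NullWritable.get(), 3));    // 0
        System.out.println(partitioner.getPartition(middle, NullWritable.get(), 3)); // 1
        System.out.println(partitioner.getPartition(high, NullWritable.get(), 3));   // 2
    }
}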
Reduce code:
package lhr.word_count.homework;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class salary_partition_Reducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        /*
         * Reduce phase:
         * no transformation is needed; write each key straight through.
         */
        context.write(key, NullWritable.get());
    }
}
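One caveat of this design: because the whole line is the key, identical input lines are grouped together in the shuffle, and the identity reducer writes each distinct line only once, so exact duplicate records would be collapsed in the output.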
Main code:
package lhr.word_count.homework;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class salary_partition_Main extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "salary_partition");
        job.setJarByClass(salary_partition_Main.class);

        job.setInputFormatClass(TextInputFormat.class);
        // TextInputFormat.addInputPath(job, new Path("file:///D:\\input1"));
        TextInputFormat.addInputPath(job, new Path("hdfs://hadoop11:8020/salary_partition"));

        job.setMapperClass(salary_partition_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Register the custom partitioner
        job.setPartitionerClass(salary_partition_Partitioner.class);

        job.setReducerClass(salary_partition_Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Three reduce tasks, one per salary band
        job.setNumReduceTasks(3);

        job.setOutputFormatClass(TextOutputFormat.class);
        Path path = new Path("hdfs://hadoop11:8020/salary_partition_result");
        // Path path = new Path("file:///D:\\output1");
        TextOutputFormat.setOutputPath(job, path);

        // Delete the output directory if it already exists, so the job can be re-run
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop11:8020"), super.getConf(), "root");
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int exitCode = ToolRunner.run(configuration, new salary_partition_Main(), args);
        System.exit(exitCode);
    }
}
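To run the experiment, package the classes into a jar and submit it with hadoop jar, naming salary_partition_Main as the driver; since the input and output paths are hard-coded in run(), no command-line arguments are needed. The up-front existence check deletes any previous output directory, which lets the job be re-run without failing on Hadoop's "output directory already exists" error.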