Parsing a Student-Information JSON Lines File and Storing Records by Category
Below is a student-information dataset from a school, stored in JSON Lines format (a sample record is sketched after this list). We need to do some simple processing on it:
- Convert it to plain text files
- Classify the students first by gender, then by academic track (arts vs. science), and finally store each group of students in its own file
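The original dataset is not reproduced here; what is assumed is a JSON Lines file with one JSON object per line, whose field names match the Bean class defined below. Two illustrative records (the values are made up):

{"id":"1001","name":"张三","age":"18","sex":"男","className":"理科一班"}
{"id":"1002","name":"李四","age":"17","sex":"女","className":"文科二班"}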
♦ Custom Bean class
- Encapsulate the corresponding attributes from the file in a Bean class
- Implement the WritableComparable interface to provide serialization and deserialization
package 学生数据处理_Jsonline;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class BeanTest implements WritableComparable<BeanTest> {
    // Fields matching the attributes in each JSON record
    private String id;
    private String name;
    private String age;
    private String sex;
    private String className;

    @Override
    public int compareTo(BeanTest o) {
        // Order keys by student id so the shuffle has a well-defined sort
        return this.id.compareTo(o.id);
    }

    @Override
    public String toString() {
        return id + "\t" + name + "\t" + age + "\t" + sex + "\t" + className;
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(age);
        dataOutput.writeUTF(sex);
        dataOutput.writeUTF(className);
    }

    // Deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        name = dataInput.readUTF();
        age = dataInput.readUTF();
        sex = dataInput.readUTF();
        className = dataInput.readUTF();
    }

    // Getters and setters
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    // Convenience setter used by the mapper to populate all fields at once
    public void set(String id, String name, String age, String sex, String className) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.className = className;
    }
}
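As a quick sanity check, the Writable contract can be exercised outside of Hadoop by serializing a bean and reading it back. The class below is purely illustrative (it is not part of the original project, and the sample values are made up):

package 学生数据处理_Jsonline;

import java.io.*;

public class BeanRoundTripCheck {
    public static void main(String[] args) throws IOException {
        // Populate a bean with illustrative values
        BeanTest in = new BeanTest();
        in.set("1001", "张三", "18", "男", "理科一班");
        // Serialize with write(), then deserialize with readFields()
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));
        BeanTest out = new BeanTest();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        // Should print the tab-separated record produced by toString()
        System.out.println(out);
    }
}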
♦ Map phase
- Parse each line of input into a JSON object
- Copy the relevant attribute values into the custom Bean object
- Emit the result
package 学生数据处理_Jsonline;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MapTest extends Mapper<LongWritable, Text, BeanTest, NullWritable> {
    BeanTest k = new BeanTest();
    BeanTest bean;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Parse each input line as JSON directly into a BeanTest object
        bean = JSON.parseObject(value.toString(), BeanTest.class);
        String id = bean.getId();
        String name = bean.getName();
        String age = bean.getAge();
        String sex = bean.getSex();
        String className = bean.getClassName();
        // 2. Copy the parsed values into the reusable output key
        k.set(id, name, age, sex, className);
        // 3. Emit
        context.write(k, NullWritable.get());
    }
}
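Note that JSON.parseObject comes from Alibaba's fastjson (the com.alibaba.fastjson package imported above), so the library must be on the classpath; fastjson binds JSON fields to the bean through its setters. A minimal standalone check, with a hypothetical class name and an illustrative record:

package 学生数据处理_Jsonline;

import com.alibaba.fastjson.JSON;

public class ParseCheck {
    public static void main(String[] args) {
        // Field names must match the BeanTest setters; the values are made up
        String line = "{\"id\":\"1001\",\"name\":\"张三\",\"age\":\"18\",\"sex\":\"男\",\"className\":\"理科一班\"}";
        BeanTest bean = JSON.parseObject(line, BeanTest.class);
        System.out.println(bean);
    }
}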
♦ Partition phase
- As required, define four custom partitions: male science (男理), male arts (男文), female science (女理), female arts (女文).
package 学生数据处理_Jsonline;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionTest extends Partitioner<BeanTest, NullWritable> {
    @Override
    public int getPartition(BeanTest beanTest, NullWritable nullWritable, int numPartitions) {
        // 1. Default partition
        int partition = 3;
        // 2. Assign partitions: male science (0), male arts (1), female science (2), female arts (3)
        if ("男".equals(beanTest.getSex())) {
            if (beanTest.getClassName().startsWith("理")) {
                partition = 0;
            } else {
                partition = 1;
            }
        } else {
            if (beanTest.getClassName().startsWith("理")) {
                partition = 2;
            } else {
                partition = 3;
            }
        }
        return partition;
    }
}
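With the four reduce tasks configured in the driver below, partitions 0 through 3 are written to part-r-00000 through part-r-00003 respectively, so each part file holds exactly one of the four categories.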
♦ Reduce phase
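- Write each record straight through to the output; looping over the values (rather than writing the key once) ensures every record in a group is emitted, since Hadoop refreshes the key object as the value iterator advances.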
package 学生数据处理_Jsonline;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceTest extends Reducer<BeanTest, NullWritable, BeanTest, NullWritable> {
    @Override
    protected void reduce(BeanTest key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Output directly, once per value
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
♦ Driver phase
package 学生数据处理_Jsonline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverTest {
    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // 1. Get the job
            job = Job.getInstance(conf);
            // 2. Wire up the classes
            job.setMapperClass(MapTest.class);
            job.setReducerClass(ReduceTest.class);
            job.setJarByClass(DriverTest.class);
            // 3. Declare the map and reduce output types
            job.setMapOutputKeyClass(BeanTest.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(BeanTest.class);
            job.setOutputValueClass(NullWritable.class);
            // 4. Configure the partitioning
            job.setPartitionerClass(PartitionTest.class);
            job.setNumReduceTasks(4);
            // 5. Configure the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\data\\students.json"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\result"));
            // 6. Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
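One practical caveat: waitForCompletion fails if the output directory already exists. If you want reruns to work without manual cleanup, a small optional addition before step 5 (a sketch; it needs an extra import of org.apache.hadoop.fs.FileSystem and reuses the same path as above) deletes it first:

// Remove a pre-existing output directory so reruns don't fail
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\result");
if (fs.exists(outPath)) {
    fs.delete(outPath, true);
}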
Optimization: a custom FileOutputFormat
By default, a MapReduce job names its output files part-r-00000, part-r-00001, and so on. We can also supply our own output streams and choose the output file names ourselves, so that when there are many categories it is immediately clear from the name which file holds which group.
♦ Custom output class extending FileOutputFormat
package 学生数据处理_Jsonline;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TargetFile extends FileOutputFormat<BeanTest, NullWritable> {
    @Override
    public RecordWriter<BeanTest, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // Hand each task an instance of our custom RecordWriter
        return new RW(job);
    }
}
♦ Create a new class extending RecordWriter to perform the actual file output
package 学生数据处理_Jsonline;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class RW extends RecordWriter<BeanTest, NullWritable> {
    // One output stream per category
    FSDataOutputStream stu1;
    FSDataOutputStream stu2;
    FSDataOutputStream stu3;
    FSDataOutputStream stu4;

    public RW(TaskAttemptContext job) throws IOException {
        // 1. Get the file system
        FileSystem fs = FileSystem.get(job.getConfiguration());
        // 2. Create one custom output stream per category
        stu1 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\男_理科.txt"));
        stu2 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\男_文科.txt"));
        stu3 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\女_理科.txt"));
        stu4 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\女_文科.txt"));
    }

    /**
     * Route each record to the output stream for its gender/track category.
     *
     * @param key   the student record
     * @param value unused NullWritable placeholder
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void write(BeanTest key, NullWritable value) throws IOException, InterruptedException {
        if ("男".equals(key.getSex())) {
            if (key.getClassName().startsWith("理")) {
                stu1.write(key.toString().getBytes());
                stu1.write("\n".getBytes());
            } else {
                stu2.write(key.toString().getBytes());
                stu2.write("\n".getBytes());
            }
        } else {
            if (key.getClassName().startsWith("理")) {
                stu3.write(key.toString().getBytes());
                stu3.write("\n".getBytes());
            } else {
                stu4.write(key.toString().getBytes());
                stu4.write("\n".getBytes());
            }
        }
    }

    /**
     * Close the stream resources.
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(stu1);
        IOUtils.closeStream(stu2);
        IOUtils.closeStream(stu3);
        IOUtils.closeStream(stu4);
    }
}
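The absolute paths in the constructor are hardcoded. An alternative sketch (not in the original code) derives them from the job's configured output directory instead, assuming an extra import of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat:

public RW(TaskAttemptContext job) throws IOException {
    // 1. Get the file system
    FileSystem fs = FileSystem.get(job.getConfiguration());
    // 2. Resolve each category file against the job's output directory
    Path outDir = FileOutputFormat.getOutputPath(job);
    stu1 = fs.create(new Path(outDir, "男_理科.txt"));
    stu2 = fs.create(new Path(outDir, "男_文科.txt"));
    stu3 = fs.create(new Path(outDir, "女_理科.txt"));
    stu4 = fs.create(new Path(outDir, "女_文科.txt"));
}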
♦ Modified DriverTest class
package 学生数据处理_Jsonline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverTest {
    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // 1. Get the job
            job = Job.getInstance(conf);
            // 2. Wire up the classes
            job.setMapperClass(MapTest.class);
            job.setReducerClass(ReduceTest.class);
            job.setJarByClass(DriverTest.class);
            // 3. Declare the map and reduce output types
            job.setMapOutputKeyClass(BeanTest.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(BeanTest.class);
            job.setOutputValueClass(NullWritable.class);
            // 4. Use the custom output format; the partitioner and the four
            //    reduce tasks are no longer needed
            /*job.setPartitionerClass(PartitionTest.class);
            job.setNumReduceTasks(4);*/
            job.setOutputFormatClass(TargetFile.class);
            // 5. Configure the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\data\\students.json"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results"));
            // 6. Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
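If the job succeeds, the results directory should end up containing the four custom-named files (男_理科.txt, 男_文科.txt, 女_理科.txt, 女_文科.txt) together with the usual _SUCCESS marker; as before, the output directory itself must not exist before the run.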