【MapReduce&FastJson】Parsing a Student-Information JSON Lines File and Storing Records by Category




Below is a student-information dataset from a school, stored in JSON Lines format (one JSON object per line); a plausible sample is sketched after the task list. We need to do some simple processing:

  1. Convert it into an ordinary Text file
  2. Classify the students first by sex, then by subject track (science/arts), and finally store the different groups of records in separate files
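
Since the screenshot of the dataset from the original post is not reproduced here, the following is a made-up sample of what the input is assumed to look like. The field names (id, name, age, sex, className) are taken from the Bean class below; the concrete values are invented for illustration:

{"id":"1","name":"张三","age":"18","sex":"男","className":"理科一班"}
{"id":"2","name":"李梅","age":"17","sex":"女","className":"文科二班"}

Each line is one complete JSON object, which is what distinguishes JSON Lines from a file holding a single JSON array.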

♦ Custom Bean class

  • Encapsulate the corresponding fields from the file in a Bean class
  • Implement the WritableComparable interface to handle serialization and deserialization
package 学生数据处理_Jsonline;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class BeanTest implements WritableComparable<BeanTest> {

    // Fields
    private String id;
    private String name;
    private String age;
    private String sex;
    private String className;

    // Sort keys by student id during the shuffle
    @Override
    public int compareTo(BeanTest o) {
        return this.id.compareTo(o.id);
    }

    // toString()
    @Override
    public String toString() {
        return id + "\t" + name + "\t" + age + "\t" + sex + "\t" + className;
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(age);
        dataOutput.writeUTF(sex);
        dataOutput.writeUTF(className);
    }

    // Deserialization (fields must be read in the same order they were written)
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        name = dataInput.readUTF();
        age = dataInput.readUTF();
        sex = dataInput.readUTF();
        className = dataInput.readUTF();
    }

    // Getters and setters
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public void set(String id, String name, String age, String sex, String className) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.className = className;
    }
}
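
BeanTest serves as the map output key, and Hadoop sorts map output keys during the shuffle; that is why the key type must implement WritableComparable rather than plain Writable. Here the records are compared by student id.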



♦ Map phase

  • Read each line of input and parse it as a JSON object
  • Copy the extracted field values into the custom Bean object
  • Emit the record
package 学生数据处理_Jsonline;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MapTest extends Mapper<LongWritable, Text, BeanTest, NullWritable> {

    BeanTest k = new BeanTest();
    BeanTest bean;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Parse each input line into a BeanTest instance
        bean = JSON.parseObject(value.toString(), BeanTest.class);
        String id = bean.getId();
        String name = bean.getName();
        String age = bean.getAge();
        String sex = bean.getSex();
        String className = bean.getClassName();
        // 2. Copy the parsed fields into the reusable output key
        k.set(id, name, age, sex, className);
        // 3. Emit
        context.write(k, NullWritable.get());
    }
}
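
As a quick sanity check outside of MapReduce, the FastJSON call used above can be exercised on a single line. The following is a minimal sketch (ParseDemo is a name invented here, and the JSON values are made up); it relies on BeanTest's implicit no-arg constructor and its setters, which FastJSON uses to populate the object:

package 学生数据处理_Jsonline;

import com.alibaba.fastjson.JSON;

public class ParseDemo {
    public static void main(String[] args) {
        // One made-up JSON line whose keys match BeanTest's property names
        String line = "{\"id\":\"1\",\"name\":\"张三\",\"age\":\"18\",\"sex\":\"男\",\"className\":\"理科一班\"}";
        BeanTest bean = JSON.parseObject(line, BeanTest.class);
        // toString() joins the fields with tabs: 1	张三	18	男	理科一班
        System.out.println(bean);
    }
}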



♦ Partition phase

  • As required, define four custom partitions: male/science, male/arts, female/science, female/arts.
package 学生数据处理_Jsonline;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionTest extends Partitioner<BeanTest, NullWritable> {

    @Override
    public int getPartition(BeanTest beanTest, NullWritable nullWritable, int numPartitions) {
        // 1. Default partition
        int partition = 3;
        // 2. Four partitions: 0 = male/science, 1 = male/arts, 2 = female/science, 3 = female/arts
        if ("男".equals(beanTest.getSex())) {
            if (beanTest.getClassName().startsWith("理")) {
                partition = 0;
            } else {
                partition = 1;
            }
        } else {
            if (beanTest.getClassName().startsWith("理")) {
                partition = 2;
            } else {
                partition = 3;
            }
        }
        return partition;
    }
}
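
One caveat: the four return values only map onto four output files if the driver sets job.setNumReduceTasks(4). With two or three reduce tasks, out-of-range partition numbers make the map tasks fail with an "Illegal partition" error; with exactly one reduce task, Hadoop bypasses the custom partitioner altogether and sends everything to partition 0.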



♦ Reduce phase

package 学生数据处理_Jsonline;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class ReduceTest extends Reducer<BeanTest, NullWritable, BeanTest, NullWritable> {

    @Override
    protected void reduce(BeanTest key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Write every record straight through (one output line per value)
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}



♦ Driver phase

package 学生数据处理_Jsonline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverTest {

    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // 1. Get the job
            job = Job.getInstance(conf);
            // 2. Wire up the classes
            job.setMapperClass(MapTest.class);
            job.setReducerClass(ReduceTest.class);
            job.setJarByClass(DriverTest.class);
            // 3. Configure the MR key/value types
            job.setMapOutputKeyClass(BeanTest.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(BeanTest.class);
            job.setOutputValueClass(NullWritable.class);
            // 4. Configure the partitioning
            job.setPartitionerClass(PartitionTest.class);
            job.setNumReduceTasks(4);
            // 5. Configure the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\data\\students.json"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\result"));
            // 6. Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
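
One practical note: FileOutputFormat refuses to run if the output directory already exists (it throws a FileAlreadyExistsException during job submission), so the result directory has to be deleted between runs.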




Optimization: a custom FileOutputFormat

By default, a MapReduce job names its output files part-r-00000 through part-r-00003. We can instead define our own output stream and choose the output file names ourselves, so that when there are many categories it is immediately clear which file corresponds to which category (e.g. 男_理科.txt).


♦ Custom output class extending FileOutputFormat

package 学生数据处理_Jsonline;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class TargetFile extends FileOutputFormat<BeanTest, NullWritable> {

    // Hand every output record to the custom RecordWriter
    @Override
    public RecordWriter<BeanTest, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new RW(job);
    }
}

♦ Create a new class extending RecordWriter to implement the actual file output



package 学生数据处理_Jsonline;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;

public class RW extends RecordWriter<BeanTest, NullWritable> {

    // Output streams, one per category
    FSDataOutputStream stu1;
    FSDataOutputStream stu2;
    FSDataOutputStream stu3;
    FSDataOutputStream stu4;

    public RW(TaskAttemptContext job) throws IOException {
        // 1. Get the file system
        FileSystem fs = FileSystem.get(job.getConfiguration());
        // 2. Create the custom output streams
        stu1 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\男_理科.txt"));
        stu2 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\男_文科.txt"));
        stu3 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\女_理科.txt"));
        stu4 = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results\\女_文科.txt"));
    }

    /**
     * Route each record to the stream for its sex/subject category.
     */
    @Override
    public void write(BeanTest key, NullWritable value) throws IOException, InterruptedException {
        if ("男".equals(key.getSex())) {
            if (key.getClassName().startsWith("理")) {
                stu1.write(key.toString().getBytes());
                stu1.write("\n".getBytes());
            } else {
                stu2.write(key.toString().getBytes());
                stu2.write("\n".getBytes());
            }
        } else {
            if (key.getClassName().startsWith("理")) {
                stu3.write(key.toString().getBytes());
                stu3.write("\n".getBytes());
            } else {
                stu4.write(key.toString().getBytes());
                stu4.write("\n".getBytes());
            }
        }
    }

    /**
     * Close the stream resources.
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(stu1);
        IOUtils.closeStream(stu2);
        IOUtils.closeStream(stu3);
        IOUtils.closeStream(stu4);
    }
}
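
Two caveats about this design: the four target paths are hard-coded absolute local paths, so the job is tied to this machine's directory layout; and because every reduce task constructs its own RW over the same four files, the design implicitly assumes a single reduce task (the default), since parallel tasks would overwrite one another's output.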



♦ Modify the DriverTest class

package 学生数据处理_Jsonline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverTest {

    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // 1. Get the job
            job = Job.getInstance(conf);
            // 2. Wire up the classes
            job.setMapperClass(MapTest.class);
            job.setReducerClass(ReduceTest.class);
            job.setJarByClass(DriverTest.class);
            // 3. Configure the MR key/value types
            job.setMapOutputKeyClass(BeanTest.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(BeanTest.class);
            job.setOutputValueClass(NullWritable.class);
            // 4. Use the custom output format (the partitioner setup is no longer needed)
            /*job.setPartitionerClass(PartitionTest.class);
            job.setNumReduceTasks(4);*/
            job.setOutputFormatClass(TargetFile.class);
            // 5. Configure the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\data\\students.json"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\学生数据处理_Jsonline\\results"));
            // 6. Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
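
Even though RW writes all records to its own files, FileOutputFormat.setOutputPath is still required: TargetFile inherits FileOutputFormat's output checks and commit logic, so the configured directory receives the job's bookkeeping output (such as the _SUCCESS marker), while the actual records land in the hard-coded .txt files.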

For more details, see my other post: 【MapReduce】---- MR 框架原理 之 OutputFormat 数据输出.




Reposted from blog.csdn.net/qq_45797116/article/details/114754046