Reading and Writing ORC Files with MapReduce

ORC stands for Optimized Row Columnar. It is a columnar storage format from the Hadoop ecosystem that originated in Apache Hive; thanks to its very high compression ratio and read efficiency, it has become one of the most commonly used file formats in Hive.
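The examples below rely on the ORC project's MapReduce bindings: with Maven that is the org.apache.orc:orc-mapreduce artifact, which provides both the org.apache.orc.mapred and org.apache.orc.mapreduce packages used here. Pick the version that matches your Hadoop environment.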

1. Reading an ORC file
OrcFileReadMapper.java
---------------------------------------
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileReadMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {

    private Text outputKey = new Text();

    @Override
    protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();

        sb.append(value.getFieldValue(0));          // fields can be read by index...
        sb.append(value.getFieldValue(1));
        sb.append(value.getFieldValue(2));
        sb.append(value.getFieldValue("age"));      // ...or by field name

        // Print the concatenated row for debugging, then emit it as the output key
        System.out.println(sb.toString());
        outputKey.set(sb.toString());

        context.write(outputKey, NullWritable.get());
    }
    }
}
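The getFieldValue calls above assume the input files carry a struct schema whose fourth column is named age. If you are unsure of a file's layout, you can print its schema with the core ORC reader before wiring up index-based access. A minimal sketch, assuming one ORC file exists at input/part-00000 (adjust the path to a real file):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class PrintOrcSchema {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Open a single ORC file and print its schema, e.g.
        // struct<id:string,name:string,sex:string,age:int>
        Reader reader = OrcFile.createReader(new Path("input/part-00000"),
                OrcFile.readerOptions(conf));
        System.out.println(reader.getSchema());
    }
}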

OrcFileReadJob.java
------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapreduce.OrcInputFormat;

import java.io.IOException;

/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileReadJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = getConf();

        String input = "input";
        String output = "output";

        Job job = Job.getInstance(conf);

        job.setJarByClass(OrcFileReadJob.class);
        job.setMapperClass(OrcFileReadMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(OrcInputFormat.class);        // read the input as ORC
        job.setOutputFormatClass(TextOutputFormat.class);     // write the output as plain text

        OrcInputFormat.addInputPath(job, new Path(input));    // input directory of ORC files
        FileOutputFormat.setOutputPath(job, new Path(output));

        boolean rt = job.waitForCompletion(true);
        return rt ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.exit(ToolRunner.run(conf, new OrcFileReadJob(), args));
    }
}
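Since the output format is TextOutputFormat and the map output value is NullWritable, the files under output/ contain one concatenated row per line. As with any MapReduce job, the output directory must not exist before the job runs, or FileOutputFormat will fail the job at submission.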
2. Writing an ORC file
OrcFileWriteMapper.java
----------------------------------------------------------------
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileWriteMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable outputValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line straight through as the key;
        // the IntWritable value is only a placeholder
        context.write(value, outputValue);
    }
}
OrcFileWriteReducer.java
----------------------------------------------------------------------
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;

/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileWriteReducer extends Reducer<Text,IntWritable,NullWritable,OrcStruct> {

    // Schema (field names and types) of the ORC file to be written
    private TypeDescription schema = TypeDescription.fromString(
            "struct<id:string," +
                    "name:string," +
                    "sex:string," +
                    "age:int>"
    );

    private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Input keys look like "<id> <name> <sex> <age>", separated by single spaces
        String line = key.toString();
        String[] lineSplit = line.trim().split(" ");

        pair.setFieldValue(0, new Text(lineSplit[0]));                           // id:string
        pair.setFieldValue(1, new Text(lineSplit[1]));                           // name:string
        pair.setFieldValue(2, new Text(lineSplit[2]));                           // sex:string
        pair.setFieldValue(3, new IntWritable(Integer.parseInt(lineSplit[3])));  // age:int

        context.write(NullWritable.get(),pair);
    }
}
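One thing to watch: the Writable passed to setFieldValue must match the ORC type declared in the schema. In the org.apache.orc.mapred object model, string fields are represented by Text, int by IntWritable, and bigint by LongWritable; handing the writer a mismatched Writable, or setting more fields than the schema declares, fails at write time.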

OrcFileWriteJob.java
------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileWriteJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConf();

        conf.set("orc.mapred.output.schema","struct<id:string,name:string,sex:string,age:int>");
        String input = "input";
        String output = "output";

        Job job = Job.getInstance(conf);

        job.setJarByClass(OrcFileWriteJob.class);
        job.setMapperClass(OrcFileWriteMapper.class);
        job.setReducerClass(OrcFileWriteReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);

        job.setOutputFormatClass(OrcOutputFormat.class);
        FileInputFormat.addInputPath(job,new Path(input));
        FileOutputFormat.setOutputPath(job,new Path(output));

        boolean rt = job.waitForCompletion(true);
        return rt ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.exit(ToolRunner.run(conf, new OrcFileWriteJob(), args));
    }
}
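After the job succeeds, you can sanity-check the result by reading one output file back with the core ORC reader. A minimal sketch: the part-r-00000 name follows the usual MapReduce convention but may differ on your setup, and for brevity the loop ignores nulls and ORC's repeating-value optimization:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class OrcFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(new Path("output/part-r-00000"),
                OrcFile.readerOptions(conf));
        System.out.println("schema: " + reader.getSchema());

        // Iterate over the file batch by batch and print the id and age columns
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        while (rows.nextBatch(batch)) {
            BytesColumnVector id = (BytesColumnVector) batch.cols[0];
            LongColumnVector age = (LongColumnVector) batch.cols[3];
            for (int r = 0; r < batch.size; r++) {
                // int columns are materialized as LongColumnVector
                System.out.println(id.toString(r) + "\t" + age.vector[r]);
            }
        }
        rows.close();
    }
}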






Reposted from blog.csdn.net/YF_Li123/article/details/80714269