MapReduce读写ORC文件
ORC的全称是Optimized Row Columnar。ORC文件格式是hadoop生态圈中的一种列式存储格式,最早来自于Apache Hive, 有着非常高的压缩比和读取效率,成为Hive中常用的一种文件格式。
1、读ORC文件
OrcFileReadMapper.java
---------------------------------------
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileReadMapper extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {
private Text outputKey = new Text();
@Override
protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
StringBuffer sb= new StringBuffer();
sb.append(value.getFieldValue(0)); //通过下标索引获取数据
sb.append(value.getFieldValue(1));
sb.append(value.getFieldValue(2));
sb.append(value.getFieldValue("age")); //也可以通过字段名获取数据
// 打印输出获取到的数据
System.out.println(sb.toString());
outputKey = new Text(sb.toString());
context.write(outputKey, NullWritable.get());
}
}
OrcFileReadJob.java
------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapreduce.OrcInputFormat;
import java.io.IOException;
/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileReadJob extends Configured implements Tool {
@Override
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConf();
String input = "input";
String output = "output";
Job job = Job.getInstance(conf);
job.setJarByClass(OrcFileReadJob.class);
job.setMapperClass(OrcFileReadMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setInputFormatClass(OrcInputFormat.class); //设置输入格式为Orc格式
job.setOutputFormatClass(TextOutputFormat.class); //输出格式为文本格式
OrcInputFormat.addInputPath(job, new Path(input)); //以Orc的方式加载输入路径
FileOutputFormat.setOutputPath(job, new Path(output));
boolean rt = job.waitForCompletion(true);
return rt?0:1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int retnum = ToolRunner.run(conf,new OrcFileReadJob(),args);
}
}
2、写ORC文件
OrcFileWriteMapper.java
----------------------------------------------------------------
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileWriteMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
Text outputKey = new Text();
IntWritable outputValue = new IntWritable(1);
/* @Override
protected void setup(Context context) {
String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
System.out.println("filePath:" + filePath);
}*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
outputKey = value;
context.write(outputKey,outputValue);
}
}
OrcFileWriteReducer.java
----------------------------------------------------------------------
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileWriteReducer extends Reducer<Text,IntWritable,NullWritable,OrcStruct> {
//要创建的ORC文件中的字段类型
private TypeDescription schema = TypeDescription.fromString(
"struct<id:string," +
"name:string," +
"sex:string," +
"age:int>"
);
private OrcStruct pair = (OrcStruct)OrcStruct.createValue(schema);
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
String line = key.toString();
String[] lineSplit = line.trim().split(" ");
pair.setFieldValue(0,new Text(lineSplit[0]));
pair.setFieldValue(1,new LongWritable(Long.parseLong(lineSplit[1])));
pair.setFieldValue(2,new Text(lineSplit[2]));
pair.setFieldValue(3,new Text(lineSplit[3]));
pair.setFieldValue(4,new Text(lineSplit[4]));
context.write(NullWritable.get(),pair);
}
}
OrcFileWriteJob.java
------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;
import java.io.IOException;
/**
* @author lyf
* @since 2018/06/16
*/
public class OrcFileWriteJob extends Configured implements Tool {
@Override
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConf();
conf.set("orc.mapred.output.schema","struct<id:string,name:string,sex:string,age:int>");
String input = "input";
String output = "output";
Job job = Job.getInstance(conf);
job.setJarByClass(OrcFileWriteJob.class);
job.setMapperClass(OrcFileWriteMapper.class);
job.setReducerClass(OrcFileWriteReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcStruct.class);
job.setOutputFormatClass(OrcOutputFormat.class);
FileInputFormat.addInputPath(job,new Path(input));
FileOutputFormat.setOutputPath(job,new Path(output));
boolean rt = job.waitForCompletion(true);
return rt?0:1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int retnum = ToolRunner.run(conf,new OrcFileWriteJob(),args);
}
}