在Hadoop或者Mapreduce 使用压缩或者解压文件的技术时,都基本会用到CompressionCodecFactory,下面就来浅讨如何使用它。
- CompressionCodecFactory简介
当在读取一个压缩文件的时候,可能并不知道压缩文件用的是哪种压缩算法,那么无法完成解压任务。在Hadoop中,CompressionCodecFactory通过使用其getCodec()方法,可以通过文件扩展名映射到一个与其对应的CompressionCodec类。
常见的对照表如下:
用CompressionCodecFactory方法来推断CompressionCodecs
在阅读一个压缩文件时,我们通常可以从其扩展名来推断出它的编码/解码器。以.gz结尾的文件可以用GzipCodec来阅读,如此类推。每个压缩格式的扩展名均以下表所示:
CompressionCodecFactory提供了getCodec()方法,从而将文件扩展名映射到相应的CompressionCodec。此方法接受一个Path对象。
package cn.roboson.codec;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
/*
* 通过CompressionCodeFactory推断CompressionCodec
* 1.先从本地上传一个.gz后缀的文件到Hadoop
* 2.通过文件后缀推断出所用的压缩算法
* 3.解压上传的压缩文件到统一个目录下
*/
public class StreamCompressor02 {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.addResource("core-site.xml");
try {
FileSystem fs = FileSystem.get(conf);
//本地文件
String localsrc="/home/roboson/桌面/README.txt.gz";
Path localPath= new Path(localsrc);
//目的处路径
String hadoopdsc="/roboson/README.txt.gz";
Path hadoopPath = new Path(hadoopdsc);
//复制前/roboson目录下的文件列表
FileStatus[] files = fs.listStatus(new Path("/roboson/"));
System.out.println("复制前:");
for (FileStatus fileStatus : files) {
System.out.println(fileStatus.getPath());
}
//复制本地文件到Hadoop文件系统中
fs.copyFromLocalFile(localPath,hadoopPath);
//复制后/roboson目录下的文件列表
files = fs.listStatus(new Path("/roboson/"));
System.out.println("复制后:");
for (FileStatus fileStatus : files) {
System.out.println(fileStatus.getPath());
}
//获得一个CompressionCodecFactory实例来推断哪种压缩算法
CompressionCodecFactory facotry = new CompressionCodecFactory(conf);
//通过CompressionCodecFactory推断出一个压缩类,用于解压
CompressionCodec codec =facotry.getCodec(hadoopPath);
if(codec==null){
System.out.println("没有找到该类压缩");
System.exit(1);
}
/*
* 1.CompressionCodecFactory的removeSuffix()用来返回一个文件名,这个文件名==压缩文件的后缀名去掉
* 如README.txt.gz调用removeSuffix()方法后,返回的是README.txt
*
* 2.CompressionCodec的getDefaultExtension()方法返回的是一个压缩算法的压缩扩展名,如gzip的是.gz
*/
String uncodecUrl=facotry.removeSuffix(hadoopdsc, codec.getDefaultExtension());
System.out.println("压缩算法的生成文件的扩展名:"+codec.getDefaultExtension());
System.out.println("解压后生成的文件名:"+uncodecUrl);
//在Hadoop中创建解压后的文件
FSDataOutputStream out = fs.create(new Path(uncodecUrl));
//创建输入数据流,并用CompressionCodec的createInputStream()方法,将输入数据流中读取的数据解压
FSDataInputStream in = fs.open(new Path(hadoopdsc));
CompressionInputStream codecIn = codec.createInputStream(in);
//将输入数据流写入到 输出数据流
IOUtils.copyBytes(codecIn, out, conf,true);
//解压后/roboson目录下的文件列表
files = fs.listStatus(new Path("/roboson/"));
System.out.println("解压后");
for (FileStatus fileStatus : files) {
System.out.println(fileStatus.getPath());
}
//查看解压后的内容
System.out.println("解压后的内容:");
in=fs.open(new Path(uncodecUrl));
IOUtils.copyBytes(in,System.out, conf,true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}