当处理海量小文件时,先将小文件进行sequenceFile操作或者类似操作处理,然后再上传到HDFS系统进行下一步的处理。(如有其它建议,欢迎留言)
一、直接上传本地栅格数据将导致的问题
HDFS在存储文件时,会将文件break them into chunks,默认inputSplit的大小与block块的大小一致,为128M,如果单个文件的大小小于block块的大小则不会切分,直接将改小文件存储到一个block块中。因此如果不对栅格数据文件做处理,将导致占据大量的block块;由于namenode中会存储为元数据信息,因此也将导致namenode节点记录大量小文件的位置等元数据信息而产生压力过大,甚至namenode节点的内存可能会被占满。
二、在本地将栅格数据处理成sequenceFile文件
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件。通常对小文件的处理是使用sequenceFile文件或MapFile文件。此次选用的处理方式是使用SequenceFile文件。(MapFile文件由两部分组成,data和index,index作为文件的索引,存储每个Record的key值以及它的偏移量。mapFile文件的检索效率较sequenceFile文件高,但是访问mapFile文件时需要先将索引文件加载到内存)
由于sequenceFile文件由Key和value组成,此处的key值存放的为文件的路径,例如:file:/home/greatmap/World/6/109/48.jpeg;value的值存放的为图片的字节数组文件。栅格数据存放在的文件夹下,通过遍历文件夹,将所有的图片都写成一个sequenceFile文件。
下面是具体实现过程:
public class SequenceFileTest {
//本地linux磁盘输出路径
static String PATH = "/home/greatmap/out";
static SequenceFile.Writer writer = null;
public static void main(String[] args) throws Exception{
//设置读取本地磁盘文件
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");
conf.set("mapred.job.tracker", "local");
//linux磁盘下路径
String path = "/home/greatmap/World/";
URI uri = new URI(path);
FileSystem fileSystem = FileSystem.get(uri, conf);
//实例化writer对象
writer = SequenceFile.createWriter(fileSystem, conf, new Path(PATH), Text.class, BytesWritable.class);
//递归遍历文件夹,并将文件下的文件写入sequenceFile文件
listFileAndWriteToSequenceFile(fileSystem,path);
//关闭流
org.apache.hadoop.io.IOUtils.closeStream(writer);
}
/****
* 递归文件;并将文件写成SequenceFile文件
* @param fileSystem
* @param path
* @throws Exception
*/
public static void listFileAndWriteToSequenceFile(FileSystem fileSystem,String path) throws Exception{
final FileStatus[] listStatuses = fileSystem.listStatus(new Path(path));
for (FileStatus fileStatus : listStatuses) {
if(fileStatus.isFile()){
Text fileText = new Text(fileStatus.getPath().toString());
System.out.println(fileText.toString());
//返回一个SequenceFile.Writer实例 需要数据流和path对象 将数据写入了path对象
FSDataInputStream in = fileSystem.open(new Path(fileText.toString()));
byte[] buffer = IOUtils.toByteArray(in);
in.read(buffer);
BytesWritable value = new BytesWritable(buffer);
//写成SequenceFile文件
writer.append(fileText, value);
}
if(fileStatus.isDirectory()){
listFileAndWriteToSequenceFile(fileSystem,fileStatus.getPath().toString());
}
// org.apache.hadoop.io.IOUtils.closeStream(writer);
}
}}