使用文件模式,实现多文件上传至HDFS

文件模式:

在某个单一操作中处理一系列文件。例如一个日志处理的MapReduce作业可能要分析一个月的日志量。如果一个文件一个文件或者一个目录一个目录的声明那就太麻烦了,可以使用通配符(wild card)来匹配多个文件(这个操作也叫做globbing)。
Hadoop提供了两种方法来处理文件组:

public FileStatus[] globStatus(Path pathPattern) throws IOException;
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException;

PathFilter

FileSystem的listStatus()和globStatus()方法提供了一个可选参数:PathFilter——它允许一些更细化的控制匹配:例如排除某个特定文件。

package org.apache.hadoop.fs;
public interface PathFilter 
{
    boolean accept(Path path);
}

hadoop的匹配符

这里写图片描述

这里写图片描述

描述:

利用通配符和PathFilter 对象,将本地多种格式的文件上传至 HDFS文件系统,并过滤掉 txt文本格式以外的文件。

数据:

这里写图片描述

分析:

1、首先使用globStatus(Path pathPattern, PathFilter filter),完成文件格式过滤,获取所有 txt 格式的文件。

2、然后使用 Java API 接口 copyFromLocalFile,将所有 txt 格式的文件上传至 HDFS。

代码:

1.首先定义一个类 RegexAcceptPathFilter实现 PathFilter,过滤掉 txt 文本格式以外的文件。

/**
     * @function 只接受 txt 格式的文件aa
     *
     */
    public static class RegexAcceptPathFilter implements PathFilter {
        private final String regex;

        public RegexAcceptPathFilter(String regex) {
            this.regex = regex;
        }

        @Override
        public boolean accept(Path path) {
            // TODO Auto-generated method stub
            boolean flag = path.toString().matches(regex);
            //只接受 regex 格式的文件
            return flag;
        }
    }

如果要接收 regex 格式的文件,则accept()方法就return flag; 如果想要过滤掉regex格式的文件,则accept()方法就return !flag。

2.接下来在 list 方法中,使用 globStatus 方法获取所有 txt 文件,然后通过 copyFromLocalFile 方法将文件上传至 HDFS。

/**
     * function 过滤文件格式   将多个文件上传至 HDFS
     * @param dstPath 目的路径
     * @throws IOException
     * @throws URISyntaxException
     */
    public static void list(Path dstPath) throws IOException, URISyntaxException {
        //读取hadoop文件系统的配置
        Configuration conf = new Configuration();
        //HDFS 接口
        URI uri = new URI("hdfs://pc2:9000");
        //获取文件系统对象
        fs = FileSystem.get(uri, conf);
        // 获得本地文件系统
        local = FileSystem.getLocal(conf);
        //只上传data/testdata 目录下 txt 格式的文件
        FileStatus[] localStatus = local.globStatus(new Path("F:/hdfs/data/*"),new RegexAcceptPathFilter("^.*txt$"));
        // 获得所有文件路径
        Path[] listedPaths = FileUtil.stat2Paths(localStatus);
        for(Path p:listedPaths){
            //将本地文件上传到HDFS
            fs.copyFromLocalFile(p, dstPath);
        }
    }

3.在 main() 方法在调用 list,执行多文件上传至 HDFS。

package com.dajiangtai.hadoop.test2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/**
 * @function 将指定格式的多个文件上传至 HDFS
 *
 */
@SuppressWarnings("unused")
public class CopyManyFilesToHDFS {

    private static FileSystem fs = null;
    private static FileSystem local = null;

    /**
     * @function Main 方法
     * @param args
     * @throws IOException
     * @throws URISyntaxException
     */
    public static void main(String[] args) throws IOException,URISyntaxException {
        //文件上传路径
        Path dstPath = new Path("hdfs://pc
        2:9000/home/hadoop/filter/");
        //调用文件上传 list 方法
        list(dstPath);
    }

运行:

1.hadoop集群正常运行

2.数据目录在确定的路径下

3.将代码拷贝到eclipse工具对应的包中

这里写图片描述

4.修改程序路径代码和自己的一致

这里写图片描述

5.链接myeclipse和hadoop分布式集群

这里写图片描述

6.在hdfs文件系统里创建存储路径filter

这里写图片描述

7.运行程序

这里写图片描述

运行结果
右键filter,点击Refresh即可

可以在filter目录下看到如下结果

这里写图片描述

猜你喜欢

转载自blog.csdn.net/zoeyen_/article/details/78946581