// 以下是使用 Hadoop 2.4.1 的 Java API 进行 HDFS 的相关操作
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Formatter;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.Test;
import com.yq.common.HdfsUtils;
import com.yq.common.RegexExcludePathFilter;
public class Dhfs {
@Test
public void Test() throws IllegalArgumentException, IOException{
//创建目录
//mkdir(new Path("/test/test/test"));
//创建文件,不写入内容
//createFile(new Path("/test/test.null"));
//创建文件并写入内容,如果存在此文件,则直接覆盖
//createFile(new Path("/test/test.data"), "/home/young/notes/quicksort.txt");
//获取指定目录下的文件夹和文件信息
//ll(new Path("/test"));
//筛选文件
//fileFilter(new Path("/*/*"));
//排除文件
//String regex = "^*ta*" ;
//fileFilter(new Path("/test/*"), regex);
//读取文件内容
//read(new Path("/test/test.data"));
//上传文件
//upload(new Path("/home/young/notes/quicksort.txt"),new Path("/test/"));
//下载文件
//download(new Path("/test/test.data"), new Path("/home/young/"));
//删除文件,已过时
//delete(new Path("/test/test.null"));
//删除目录
//delete(new Path("/test/test/"),true); //true可以递归删除,false不可以
}
/**
* 创建目录
* @param path : 要创建的目录
*/
public static void mkdir(Path path) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
hdfs.mkdirs(path);
}
/**
* 创建文件并写入数据
* @param path 要创建的文件
* @param srcPath 输入数据的源地址
* @throws IOException
*/
public static void createFile(Path path, String srcPath) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
FSDataOutputStream out = hdfs.create(path,new Progressable(){
public void progress(){
//据说是,每64KB输出一个点
System.out.print(".");
}
});
InputStream in = new BufferedInputStream(new FileInputStream(srcPath));
//将wps文件里的内容保存到/test/test.data中
IOUtils.copyBytes(in, out, 4096,true);
}
/**
* 创建文件
* @param path 要创建的文件
* @throws IOException
*/
public static void createFile(Path path) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
hdfs.create(path);
}
/**
* 获取指定目录下的文件夹和文件信息(类似linux下的ll命令)
* @param path
* @throws FileNotFoundException
* @throws IOException
*/
public static void ll(Path path) throws FileNotFoundException, IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
FileStatus [] fileStatus = hdfs.listStatus(path);
Formatter format = new Formatter(System.out);
format.format("%s", "Found "+fileStatus.length + " items\n");
String type;
String permission;
int maxLenPermission=0;
String owner;
int maxLenOwner=0;
String group;
int maxLenGroup=0;
String fPath;
int maxLenPath=0;
for (FileStatus status : fileStatus){
type=status.isFile() ? "-" : "d" ;
permission = type+status.getPermission().toString();
maxLenPermission=maxLenPermission>permission.length()?maxLenPermission:permission.length();
owner = status.getOwner();
maxLenOwner=maxLenOwner>owner.length()?maxLenOwner:owner.length();
group = status.getGroup();
maxLenGroup=maxLenGroup>group.length()?maxLenGroup:group.length();
fPath = status.getPath().toString();
maxLenPath=maxLenPath>fPath.length()?maxLenPath:fPath.length();
}
for (FileStatus status : fileStatus){
type=status.isFile() ? "-" : "d" ;
permission = type+status.getPermission().toString();
owner = status.getOwner();
group = status.getGroup();
fPath = status.getPath().toString();
format.format("%"+maxLenPermission+"s %"+maxLenOwner+"s\t%s\t%"+maxLenPath+"s\n",permission, owner, group , fPath );
}
format.close();
}
/**
* 读取文件内容
* @param path
* @throws IOException
*/
public static void read(Path path) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
FSDataInputStream fsDataInputStream = hdfs.open(path);
IOUtils.copyBytes(fsDataInputStream, System.out, 4096,false); //注意实现细节
}
/**
* 上传文件
* @param srcPath
* @param dstPath
* @throws IOException
*/
public static void upload(Path srcPath, Path dstPath) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
hdfs.copyFromLocalFile(srcPath, dstPath);
}
/**
* 下载文件
* @param srcPath
* @param dstPath
* @throws IOException
*/
public static void download(Path srcPath, Path dstPath) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
hdfs.copyToLocalFile (srcPath, dstPath);
}
/**
* 删除文件
* @param path
* @throws IOException
*/
public static void delete(Path path) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
hdfs.delete(path);
}
/**
* 删除文件或目录
* @param path
* @param r
* @throws IOException
*/
public static void delete(Path path, boolean r) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
hdfs.delete(path, r);
}
/**
* 获取符合条件的文件或目录
* @param pathPattern
* @throws IOException
*/
public static void fileFilter(Path pathPattern) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
FileStatus[] fileStatus = hdfs.globStatus(pathPattern);//.globStatus(path, new RegexExcludePathFilter(""));
for( FileStatus status : fileStatus){
System.out.println(""+status.getPath());
}
}
/**
* 获取不符合条件的文件或目录
* @param pathPattern
* @param regex
* @throws IOException
*/
public static void fileFilter(Path pathPattern, String regex) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem();
FileStatus[] fileStatus = hdfs.globStatus(pathPattern, new RegexExcludePathFilter(regex));
for( FileStatus status : fileStatus){
System.out.println(""+status.getPath());
}
}
}