一、
根据HDFS提供的API,实现以下功能:
针对文件: 上传、下载、删除、重命名、移动
package HdfsApi;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
 * Static convenience wrappers around the Hadoop {@link FileSystem} API: print a file,
 * upload, download, rename/move, delete, list a directory, and create/delete a directory.
 *
 * <p>Every method obtains its file system through {@link #getFs()}. With Hadoop's default
 * settings {@code FileSystem.get(conf)} returns a cached instance that is shared by all
 * callers in the JVM, so these helpers intentionally do NOT close it: closing it here
 * would invalidate the instance for any other code using it at the same time. Hadoop
 * closes cached instances itself when the JVM shuts down.
 *
 * <p>I/O failures are propagated to the caller as {@link IOException} instead of being
 * printed or swallowed, so a caller can tell whether an operation actually succeeded.
 */
public class HdfsApi {

    /** Buffer size, in bytes, used for all stream copies. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Returns the file system selected by the default Hadoop configuration found on the
     * classpath.
     *
     * @return the configured file system
     * @throws IOException if the file system cannot be obtained
     */
    public static FileSystem getFs() throws IOException {
        Configuration conf = new Configuration();
        return FileSystem.get(conf);
    }

    /**
     * Copies the content of a file to standard output.
     *
     * @param src path of the file to read
     * @throws IOException if the file cannot be opened or read
     */
    public static void readFile(String src) throws IOException {
        FileSystem fs = getFs();
        FSDataInputStream inStream = null;
        try {
            inStream = fs.open(new Path(src));
            // close=false: System.out must stay usable after the copy.
            IOUtils.copyBytes(inStream, System.out, BUFFER_SIZE, false);
        } finally {
            IOUtils.closeStream(inStream);
        }
    }

    /**
     * Uploads the fixed local file {@code /opt/app/hadoop/wc.input} to
     * {@code /data/put-wc.input} on the configured file system.
     *
     * @throws IOException if the local file cannot be read or the target cannot be written
     */
    public static void putFlie() throws IOException {
        copyLocalFile(new File("/opt/app/hadoop/wc.input"), new Path("/data/put-wc.input"));
    }

    /**
     * Uploads a local file to the configured file system.
     *
     * @param src path of the local file to upload
     * @param dst destination path on the configured file system
     * @throws IOException if the local file cannot be read or the target cannot be written
     */
    public static void writeFile(String src, String dst) throws IOException {
        copyLocalFile(new File(src), new Path(dst));
    }

    /** Streams one local file into a newly created file at {@code target}. */
    private static void copyLocalFile(File localFile, Path target) throws IOException {
        FileSystem fs = getFs();
        FileInputStream inStream = null;
        FSDataOutputStream outStream = null;
        try {
            // Open the local file first so a missing source fails before anything is
            // created on the target file system.
            inStream = new FileInputStream(localFile);
            outStream = fs.create(target);
            IOUtils.copyBytes(inStream, outStream, BUFFER_SIZE, false);
            // Close explicitly on the success path: closeStream() below swallows close
            // errors, and a failed close of the output means the upload did not complete.
            outStream.close();
            outStream = null;
        } finally {
            IOUtils.closeStream(inStream);
            IOUtils.closeStream(outStream);
        }
    }

    /**
     * Downloads a file from the configured file system to the local file system.
     *
     * @param src source path on the configured file system
     * @param dst destination path on the local file system
     * @throws IOException if the copy fails
     */
    public static void downLoad(String src, String dst) throws IOException {
        FileSystem fs = getFs();
        fs.copyToLocalFile(new Path(src), new Path(dst));
    }

    /**
     * Renames or moves a path.
     *
     * @param src current path
     * @param dst new path
     * @throws IOException if the rename fails, including when the file system reports
     *     failure by returning {@code false}
     */
    public static void renameMV(String src, String dst) throws IOException {
        FileSystem fs = getFs();
        Path from = new Path(src);
        Path to = new Path(dst);
        // rename() signals many failures only through its return value; surface them.
        if (!fs.rename(from, to)) {
            throw new IOException("Failed to rename " + from + " to " + to);
        }
    }

    /**
     * Deletes a path immediately. A path that does not exist is not treated as an error.
     *
     * <p>The delete is recursive, matching what the previously used
     * {@code deleteOnExit} did once it finally ran.
     *
     * @param fileName path to delete
     * @throws IOException if the delete fails
     */
    public static void delete(String fileName) throws IOException {
        FileSystem fs = getFs();
        fs.delete(new Path(fileName), true);
    }

    /**
     * Prints the name of every entry directly under a directory, one per line.
     *
     * @param dirName directory to list
     * @throws IOException if the directory cannot be listed
     */
    public static void listFile(String dirName) throws IOException {
        FileSystem fs = getFs();
        FileStatus[] entries = fs.listStatus(new Path(dirName));
        for (FileStatus entry : entries) {
            System.out.println(entry.getPath().getName());
        }
    }

    /**
     * Creates the fixed directory {@code /data/bb}, including missing parents.
     *
     * @throws IOException if the directory cannot be created
     */
    public static void mkdir() throws IOException {
        FileSystem fs = getFs();
        fs.mkdirs(new Path("/data/bb"));
    }

    /**
     * Recursively deletes the fixed directory {@code /data/bb}.
     *
     * @throws IOException if the delete fails
     */
    public static void deletedir() throws IOException {
        FileSystem fs = getFs();
        fs.delete(new Path("/data/bb"), true);
    }

    /** Entry point placeholder; intentionally performs no work. */
    public static void main(String[] args) throws IOException {
    }
}
二、实现把本地某个目录下面所有小文件合并上传到HDFS文件系统
package HdfsApi;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
 * Concatenates every regular file directly inside a local directory into a single file on
 * the configured (default) Hadoop file system.
 *
 * <p>Usage: {@code PutMerge [localDir [targetFile]]}. When an argument is omitted the
 * built-in default path is used.
 */
public class PutMerge {

    /** Local directory merged when no first argument is given. */
    private static final String DEFAULT_LOCAL_DIR = "/opt/app/hadoop-2.5.0/etc/hadoop";

    /** Target file written when no second argument is given. */
    private static final String DEFAULT_TARGET_FILE = "/data2/putmerge.xml";

    /** Buffer size, in bytes, used for the stream copies. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Merges the files and prints each merged path, followed by a success line.
     *
     * @param args optional: {@code args[0]} local source directory, {@code args[1]} target
     *     file on the default file system; may be {@code null} or empty
     * @throws IOException if listing, reading or writing fails; in that case the success
     *     line is not printed
     */
    public static void main(String[] args) throws IOException {
        // Step 1: obtain the local and the default (target) file systems.
        Configuration conf = new Configuration();
        LocalFileSystem localFs = FileSystem.getLocal(conf);
        FileSystem dfs = FileSystem.get(conf);

        // Step 2: resolve the input directory and the output file.
        String localDir = (args != null && args.length > 0) ? args[0] : DEFAULT_LOCAL_DIR;
        String targetFile = (args != null && args.length > 1) ? args[1] : DEFAULT_TARGET_FILE;
        Path inPath = new Path(localDir);
        Path outPath = new Path(targetFile);

        FileStatus[] entries = localFs.listStatus(inPath);
        OutputStream outStream = null;
        try {
            // Step 3: open the single output stream all inputs are appended to.
            outStream = dfs.create(outPath);
            for (FileStatus entry : entries) {
                // Sub-directories cannot be opened as streams; merge regular entries only.
                if (entry.isDirectory()) {
                    continue;
                }
                // Step 4: append this file, closing its stream even if the copy fails.
                InputStream inStream = null;
                try {
                    inStream = localFs.open(entry.getPath());
                    IOUtils.copyBytes(inStream, outStream, BUFFER_SIZE, false);
                } finally {
                    IOUtils.closeStream(inStream);
                }
                System.out.println(entry.getPath());
            }
            // Close explicitly on the success path: closeStream() below swallows close
            // errors, and a failed close of the output means the merge did not complete.
            outStream.close();
            outStream = null;
        } finally {
            IOUtils.closeStream(outStream);
        }
        // Reached only when every file was copied and the output was closed cleanly.
        System.out.println("PutMerge Success");
    }
}