版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/a909301740/article/details/84454450
通过 HDFS 提供的 Java API,我们可以完成以下功能:
-
在 HDFS 上创建目录
-
通过 FileSystem API 读取数据(下载文件)
-
写入数据(上传文件)
-
查看目录及文件的信息
-
查看某个文件在 HDFS 集群中的位置
-
删除数据
-
获取 HDFS 集群上所有数据节点的信息
一、在 HDFS 上创建目录
@Test
public void testMkDir() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
// 获取一个HDFS 客户端
FileSystem client = FileSystem.get(conf);
// 创建目录
boolean flag = client.mkdirs(new Path("/f1"));
// 关闭
client.close();
System.out.println(flag);
}
二、通过 FileSystem API 读取数据(下载文件)
@Test
public void testDownLoad() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
FileSystem client = FileSystem.get(conf);
// 打开一个输入流 <------HDFS
InputStream in = client.open(new Path("/tools/stt.txt"));
// 构造一个输出流 ----> D:\\temp\\down.txt
OutputStream out = new FileOutputStream("D:\\down.txt"); // 目录必须事先存在
// 将输入流中的数据写到输出流
IOUtils.copyBytes(in, out, 1024);
/*byte[] buffer = new byte[1024];
int len = 0;
while ((len = in.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
out.flush();
out.close();
in.close();*/
}
三、写入数据(上传文件)
@Test
public void testUpLoad() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
FileSystem client = FileSystem.get(conf);
// 构造一个输入流
InputStream in = new FileInputStream("D:\\TestUpload.java");
// 构造一个输出流,不存在目录则会创建
OutputStream out = client.create(new Path("/tool/upload.java"));
IOUtils.copyBytes(in, out, 1024);
}
四、查看目录及文件的信息
@Test
public void testCheckFileInformation() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
FileSystem client = FileSystem.get(conf);
// 列出根目录下所有文件及目录的信息
FileStatus[] fileStatus = client.listStatus(new Path("/tool"));
for (FileStatus status : fileStatus) {
String dir = status.isDirectory() ? "目录" : "文件";
// 全路径名
String path = status.getPath().toString();
// 文件名
String name = status.getPath().getName();
System.out.println(dir + "-----" + name + ",path:" + path);
// 访问时间
System.out.println("访问时间:" + status.getAccessTime());
// 数据块的大小
System.out.println("数据块的大小:" + status.getBlockSize());
// 所属组
System.out.println("所属组:" + status.getGroup());
// 长度
System.out.println("长度:" + status.getLen());
// 修改时间
System.out.println("修改时间:" + status.getModificationTime());
// 拥有者
System.out.println("拥有者:" + status.getOwner());
// 权限
System.out.println("权限:" + status.getPermission());
// 冗余度
System.out.println("冗余度:" + status.getReplication());
}
}
五、查看某个文件对应的数据块在 HDFS 集群中的位置
@Test
public void testFindFileBlockLocation() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
FileSystem client = FileSystem.get(conf);
// Path必须是一个文件,不能是目录
FileStatus fStatus = client.getFileStatus(new Path("/tool/upload.java"));
BlockLocation[] blocks = client.getFileBlockLocations(fStatus, 0, fStatus.getLen());
for (BlockLocation block : blocks) {
// 文件所对应的数据块可能分布在多台服务器上,所以返回的是数组
System.out.println(Arrays.toString(block.getHosts()) + "\t" + Arrays.toString(block.getNames()));
}
}
六、删除数据
@Test
public void testDeleteFile() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
FileSystem fs = FileSystem.get(conf);
// 第二个参数表示是否递归
boolean flag = fs.delete(new Path("/tool"), true);
System.out.println(flag ? "删除成功" : "删除失败");
}
七、获取 HDFS 集群上所有数据节点的信息
@Test
public void testDataNode() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.220.111:9000");
DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(conf);
DatanodeInfo[] dataNodeStats = fs.getDataNodeStats();
for (DatanodeInfo dataNode : dataNodeStats) {
System.out.println(dataNode.getHostName() + "\t" + dataNode.getName());
}
}