版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/fragrant_no1/article/details/85857348
Hadoop 中基本CRUD操作的 javaAPI:
package com.example.demo.hadoop;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class HDFSDemo01 {
private FileSystem hadoopClient = null;
@Before
public void init(){
System.out.println("初始化");
//配置configuration
System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.6.4");
Configuration conf = new Configuration();
// 获取文件系统的客户端操作对象
try {
hadoopClient = FileSystem.get(new URI("hdfs://47.100.110.224:9000"), conf, "hadoop");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
/**
* 文件上传,下载
* @throws Exception
*/
@Test
public void test01() throws Exception{
//系统操作
hadoopClient.copyFromLocalFile(new Path("E:/abc.txt"), new Path("/access.txt"));
//刷新并关闭
hadoopClient.close();
//下载操作,跟上面的不能一起,因为上面已经关闭了
hadoopClient.copyToLocalFile(new Path("/access.log.copy"),new Path("E:/"));
//刷新并关闭
hadoopClient.close();
}
/**
* 文件/目录CRUD操作
*/
@Test
public void test04() throws Exception {
//可以多级目录进行创建
boolean mkdirs = hadoopClient.mkdirs(new Path("/data/outpt6"));
//是否成功标识,下同,不再写出
System.out.print(mkdirs);
//第二个参数 是否递归删除非空目录
boolean delete = hadoopClient.delete(new Path("/data/outpt6"), true);
//存在即删除
boolean b = hadoopClient.deleteOnExit(new Path("/data/outpt6"));
// 重命名文件或文件夹 旧名-》新名
boolean rename = hadoopClient.rename(new Path("/data/outpt6"), new Path("/data/outpt7"));
}
/**
* 查看指定目录信息,只显示文件
* 为什么用迭代器而不用数组?
* 因为数组/集合 都是已经填充完毕的对象,并且加载到内存中去使用,hadoop的数据量比较大,如果全加载在内存中,会导致内存不足
* 而迭代器不是已经填充完毕的对象,开始只是获取迭代器对象,然后通过hasNext去集群中查询是否呦下一个数据,如果呦,就通过
* next方法去集群找下一个数据,所以每次next只会有一个数据存在内存中。
*/
@Test
public void test05() throws Exception {
//获取指定目录的迭代器信息,第二个参数代表递归查询(包括子目录下的文件一并查出)
RemoteIterator<LocatedFileStatus> iterator = hadoopClient.listFiles(new Path("/data/"), true);
while(iterator.hasNext()){
//FileStatus对象封装了文件的和目录的元数据,包括文件长度、块大小、权限等信息
LocatedFileStatus next = iterator.next();
System.out.println("文件路径:"+next.getPath());
System.out.println("块的大小:"+next.getBlockSize());
System.out.println("文件所有者:"+next.getOwner()+":"+next.getGroup());
System.out.println("文件权限:"+next.getPermission());
System.out.println("文件长度:"+next.getLen());
System.out.println("备份数:"+next.getReplication());
System.out.println("修改时间:"+next.getModificationTime());
//文件切割块的信息
BlockLocation[] blockLocations = next.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------为angelababy打印的分割线--------------");
}
}
/**
* 查看文件及文件夹信息,不会进行递归查看子目录
*/
@Test
public void test06() throws Exception{
//通过fs的listStatus方法获取一个指定path的所有文件信息(status)
FileStatus[] listStatus = hadoopClient.listStatus(new Path("/data/"));
for (FileStatus fstatus : listStatus) {
//可以判断是文件还是目录
if(fstatus.isFile()){
//文件
}
if(fstatus.isDirectory()){
//目录
}
}
}
//通过流的方式访问hdfs这样可以控制获取的内容大小,比如有3台机器分别部署了mapreduce.jar 那么每一台处理不同的数据,分别为:0-60M,60-120M,120-180M 并发进行,那么上面那种操作方式时不可以的。
/**
* 通过流的方式上传文件到hdfs
*/
@Test
public void test07() throws Exception{
//获取输出流,第二个参数 如果文件存在就覆盖
FSDataOutputStream out = hadoopClient.create(new Path("/data/outpt7"),true);
FileInputStream in = new FileInputStream("D:/abc.txt");
//不需要我们手动关闭流拉
IOUtils.copy(in, out);
}
/**
* 通过流的方式下载
*/
@Test
public void test08() throws Exception{
//先获取一个文件的输入流----针对hdfs上的
FSDataInputStream in = hadoopClient.open(new Path("/data/outpt7"));
//指定读取的起始位置,单位字节byte
in.seek(12);
//再构造一个文件的输出流----针对本地的
FileOutputStream out = new FileOutputStream(new File("D:/outpt7"));
//再将输入流中数据传输到输出流
IOUtils.copy(in,out);
}
/**
* 显示hdfs上文件的内容到控制台
*/
@Test
public void test09() throws Exception{
// FSDataInputStream in = hadoopClient.open(new Path("/iloveyou.txt"));
FileInputStream fileInputStream = new FileInputStream(new File("E:/abc.txt"));
//第三个参数代表缓冲区大小,也就是每次读取数据量的最大值
org.apache.hadoop.io.IOUtils.copyBytes(fileInputStream,System.out,1024);
//第三个参数代表 读取多少个字节内容,第四个参数代表关不关流
org.apache.hadoop.io.IOUtils.copyBytes(fileInputStream,System.out,8L,true);
}
}