本片文章主要介绍利用FileSystem API对HDFS进行相关操作,如增删查等——HDFS不支持对文件在任意位置修改。
1 从HDFS中读取数据
从HDFS中读取数据,主要是从存放在HDFS中的文件中读取数据,可以利用FileSystem中的open()方法得到一个文件的输入流,从文件输入流中获取文件的数据。
具体步骤1)获取FileSystem的实例 2)利用open()方法获取文件输入流 3)从输入流中读取书
代码如下:
//获取FileSystem的实例
FileSystem fs = FileSystem.get(URI.create(path),configuration);
//得到文件输入流
InputStream in = null;
in = fs.open(new Path(path));
//从输入流中读取数据
IOUtils.copyBytes(in,System.out,4096,false);
其中copyBytes(InputStream in, OutputStream out, int buffSize, boolean close),将in中的输入流copy到out输入流中,close表示是否要关闭输入输出流。在该方法内部会调用copyBytes(InputStream in, OutputStream out, int buffSize)方法,方法内部有一行语句:
out.write(buf, 0, bytesRead);
用于从输出流中输出数据,我们这里将其设置成System.out标准输出,因此输出到控制台。
我准备读取存放在HDFS中的/input/f1文件,因此,可以将path路径设置成:hdfs://localhost:9000/input/f1,从而读取到HDFS中的f1中的数据。其中hdfs://localhost:9000对应的是HDFS文件系统。
ps:如果将path设置成hdfs://localhost:9000/input/f1,会报如下错误:
/127.0.1.1 to localhost:8020 failed on connection exception: java.net.ConnectException: 拒绝连接;
原因在于没有设置HDFS的端口号,会根据默认端口号8082访问文件系统,而在core-site.xml中配置的HDFS的端口号为9000,当然访问不到对应的HDFS了。
2 向HDFS中写数据
同理,向HDFS中写数据,实际上是向HDFS中的文件中写入数据,有两种方式:1)新建文件,向文件中写入数据 2)HDFS同时支持向已有文件追加数据
新建文件
可以使用如下步骤进行:
1. 获取FileSystem实例
2. 创建输入流in,和输出流out
3. 将in中的数据传到out中,最后写入到目标文件中
代码如下:
//创建输入流,从本地文件中读取数据
InputStream in = new BufferedInputStream(new FileInputStream(loclfile));
//获取FileSystem实例
FileSystem fs = FileSystem.get(URI.create(path),configuration);
//创建文件(指定文件在HDFS中的路径,如果路径不存在,会自动创建文件所在目录) 返回输出流
OutputStream out =fs.create(new Path(path), new Progressable() {
public void progress() {
System.out.println("*");
}
});
//将输入流中的数据复制到输出流中,写入HDFS文件中
IOUtils.copyBytes(in,out,4096,true);
向文件中追加数据
使用FileSystem的append()方法向已有文件的尾部追加数据,具体代码如下:
//随便创建一个输入流
InputStream in = new ByteArrayInputStream("sajkdj sjak asjk sdsd".getBytes());
FileSystem fs = FileSystem.get(URI.create(path),configuration);
//向文件尾部追加数据
OutputStream out = fs.append(new Path(path));
IOUtils.copyBytes(in,out,4096,false);
ps追加数据时,必须添加两行代码:
configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
configuration.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
否则,出错,具体原因,还没有完全搞清楚,有知道的可以告知下。
3 删除文件/目录
步骤:
- 获取FileSystem实例
- 调用delete(Path path ,boolean b)方法删除文件或目录(注意,删除目录时,会连同目录下的文件一起删除)
代码如下:
FileSystem fs = FileSystem.get(URI.create(path),configuration);
//删除文件/目录(第二个参数为true时,只删除非空文件/目录,否则抛出异常,为false时,可以删除空文件/目录)
fs.delete(new Path(path),true);