一、通过 java.net.URL
1.在ubuntu下打开eclipse
2.创建项目
3.导入hadoop所有jar包
Build Path --->Configure Build Path ---> Add External JARs --->FileSystem --->mnt ---> hgfs --->share for linux --->hadoop2.9.0--->-lib
4.编写代码
package hadoopDemo;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
public class TestFileSystem {
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception {
String urlString = "hdfs://ubuntucp:8020/test/a.txt";
URL url = new URL(urlString) ;
InputStream is = url.openStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
int len = 0 ;
while((len = is.read(buf)) != -1){
baos.write(buf, 0, len);
}
byte[] data = baos.toByteArray();
System.out.println(new String(data));
}
}
其中
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
是为了让Java程序能识别Hadoop的 hdfs URL 方案所做的额外工作。
还可以调用Hadoop中IOUtils 类
package hadoopDemo;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
public class TestFileSystem {
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception {
String urlString = "hdfs://ubuntucp:8020/test/a.txt";
URL url = new URL(urlString) ;
InputStream is = url.openStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// byte[] buf = new byte[1024];
// int len = 0 ;
// while((len = is.read(buf)) != -1){
// baos.write(buf, 0, len);
// }
// byte[] data = baos.toByteArray();
// System.out.println(new String(data));
// is.close();
IOUtils.copyBytes(is, baos, 1024);
IOUtils.closeStream(is) ;
System.out.println(new String(baos.toByteArray()));
}
}
5.添加log4j的属性文件
将其粘到项目的src下
二、通过FileSystem API 读取数据
1.首先构建单元测试
在已有工程下新建名字为test的 Source Folder 用于存放测试类源代码(和src并列),并要求包名也相同,然后新建测试类
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
}
}
对测试类调试,发现变量 fs 的值为 LocalFileSystem ,这是因为用的是 /mnt/hgfs/share for linux/hadoop-2.9.0/_lib/hadoop-common-2.9.0.jar 中 core-default.xml 的默认配置。
2.新建名称为 core-site.xml 的 File
为了便于管理可以在工程下新建 Source Folder ,把 core-site.xml 放入,如下:
core-site.xml 内容如下
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://ubuntucp:8020/</value>
</property>
</configuration>
再次调试测试类,发现变量 fs 的值为 DistributedFileSystem ,这样就可以访问了
但这样必须要求配置文件名称为 core-site.xml ,如果用其他名称,需要在程序中添加指定配置文件:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration对象封装了客户端或服务器的配置
//添加指定配置文件
conf.addResource("my-core-site.xml") ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;
Path path = new Path(file) ;
FSDataInputStream in = fs.open(path) ;
IOUtils.copyBytes(in, System.out, 1024, true) ;//1024是缓冲区的长度而不是流的长度,true为关闭流
}
}
直接使用 FileSystem 以标准输出格式显示 Hadoop 文件系统中的文件,如下:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration对象封装了客户端或服务器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;
Path path = new Path(file) ;
FSDataInputStream in = fs.open(path) ;
IOUtils.copyBytes(in, System.out, 1024, true) ;//1024是缓冲区的长度而不是流的长度,true为关闭流
}
}
FSDataInputStream 继承了 DataInputStream 类和实现了 Seekbale 接口,Seekable 接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos()) 的查询方法。与 java.io.InputStream 的 skip() 不同,seek() 可以移到文件中任意一个绝对位置,skip() 则只能相对于当前位置定位到另一个新位置。
//Seekable接口
public interface Seekable {
void seek(long pos) throws IOException ;
long getPos() throws IOException ;
}
通过API实现seek操作:
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 通过API实现seek操作
*/
@Test
public void seek() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/how.txt" ;
Path path = new Path(file) ;
FSDataInputStream in = fs.open(path) ;
IOUtils.copyBytes(in, new FileOutputStream("/home/ubuntu/Downloads/how1.jpg") , 1024, false) ;
in.seek(0) ;//重新定位到起始位置
IOUtils.copyBytes(in, new FileOutputStream("/home/ubuntu/Downloads/how2.jpg") , 1024, true) ;
}
}
FSDataInputStream 类也实现了 PositionedReadable 接口,从一个指定偏移量出读取文件的一部分:
//PositionedReadable接口
public interface PositionedReadable {
public int read(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}
由上,read() 方法从文件的指定 position 处读取至多为 length 字节的数据并存入缓冲区 buffer 的指定偏移量 offset 处。返回值是实际读到的字节数。readFully() 方法将指定 length 长度的字节数数据读到 buffer 中。
3.获取文件状态
将单元测试改为如下:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration对象封装了客户端或服务器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;
Path path = new Path(file) ;
FileStatus ft = fs.getFileStatus(path) ;//获取文件状态
System.out.println("块大小 " + ft.getBlockSize()); //得到块大小
System.out.println("访问时间 " + ft.getAccessTime()); //得到访问时间
System.out.println("组 " + ft.getGroup()); //得到组
System.out.println("文件长度bytes " + ft.getLen()); //得到长度
System.out.println("修改时间 " + ft.getModificationTime()); //得到修改时间
System.out.println("文件拥有者 " + ft.getOwner()); //得到文件拥有者
System.out.println("文件复制因子 " + ft.getReplication()); //得到文件复制
}
}
列出文件目录:
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 列出文件目录
*/
@Test
public void listFile() throws Exception{
Configuration conf = new Configuration() ;//Configuration对象封装了客户端或服务器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/" ;
Path path = new Path(file) ;
FileStatus[] fst = fs.listStatus(path) ;
System.out.println("一种方法");
for(FileStatus ft : fst){
System.out.println(ft.getPath() + ": isFile = " + ft.isFile());
}
//工具类,直接将FileStatus[]数组提取数据形成Path[]数组,可以替代ft.getPath()的for循环
System.out.println("另一种方法");
Path[] listesPaths = FileUtil.stat2Paths(fst) ;
for(Path p : listesPaths){
System.out.println(p);
}
}
}
结果如下
4.获取块信息
将单元测试改为如下:
package hadoopDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
@Test
public void read() throws Exception{
Configuration conf = new Configuration() ;//Configuration对象封装了客户端或服务器的配置
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/hadoop-2.9.0.tar.gz" ;
Path path = new Path(file) ;
//得到指定路径下的文件的状态
FileStatus ft = fs.getFileStatus(path) ;//获取文件状态,FileStatus相当于文件或者目录
//得到指定文件状态的块位置信息集合
BlockLocation[] location = fs.getFileBlockLocations(ft, 0, ft.getLen()) ;//一个文件被切割成两块,则BlockLocation[]就有两个BlockLocation元素
for(BlockLocation block : location){
System.out.println(block.getHosts()) ;//之所以getHosts()是String[],是因为每一个块的副本在不同的主机上
}
}
}
调试location的值如下:
可以看到location包含了3块,每块中含有hosts(主机名称),length,offset(偏移量),names(datanode的远程通信rpc地址) 等信息
在storageids中有
和 datanode 中 ~/hadoop/dfs/data/current 的VERSION 中的storageID 相同:
5.通过 API 实现文件上传(写入数据)
package hadoopDemo;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 通过API实现文件上传
*/
@Test
public void putFile() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/how.txt" ;
Path path = new Path(file) ;
FSDataOutputStream out = fs.create(path) ;//创建文件系统数据输出流用来写入文件
IOUtils.copyBytes(new FileInputStream("/home/ubuntu/Downloads/bizhi.jpg"), out, 1024) ;
}
}
使用 append() 方法在一个现有文件末尾追加数据:
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 使用append()方法在一个现有文件末尾追加数据
*/
@Test
public void append() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/a.txt" ;//文件已存在
Path path = new Path(file) ;
FSDataOutputStream out = fs.append(path) ;
out.writeChars("I miss you !") ;
out.close() ;
}
}
6.文件副本数和块大小修改
首先要在集群中修改最小块限制:
1).进入 /soft/hadoop/etc/hadoop_cluster ,修改 hdfs-site.xml ,添加如下内容(把块最小限制改为10K):
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>10240</value>
</property>
2).将修改后的 hdfs-site.xml 分发给各主机
3).停掉集群重新开启:
start-dfs.sh
然后可以通过API实现副本数以及块大小的修改:
默认配置请参考 F:\share for linux\hadoop-2.9.0\_conf 中的 hdfs-default.xml
package hadoopDemo;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 修改副本书和块大小
*/
@Test
public void customReplication() throws Exception{
Configuration conf = new Configuration() ;
//set(String,String),修改修改当前会话副本数为4
conf.set("dfs.replication", "" + 4) ;
//修改当前文件块大小为50K,但hdfs有最小块限制,所以要先修改最小块限制(需在集群中修改)
conf.set("dfs.blocksize", "" + (1024*50)) ;
FileSystem fs = FileSystem.get(conf) ;
String file = "hdfs://ubuntucp:8020/test/modify.txt" ;
Path path = new Path(file) ;
FSDataOutputStream out = fs.create(path) ;//创建文件系统数据输出流用来写入文件
IOUtils.copyBytes(new FileInputStream("/home/ubuntu/Downloads/bizhi.jpg"), out, 1024) ;
}
}
进入 http://ubuntucp:50070/ 发现 modify.txt 文件的块大小以及副本数改变了:
7.文件通配及过滤
Hadoop为执行通配提供了两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
globStatus() 方法返回路径格式与指定模式匹配的所有 FileStatus 对象组成的数组。PathFilter 命令作为可选项可以进一步匹配结果进行限制。
通配符及其含义:
通配符 | 匹配 |
* | 代表0到多个字符 |
? | 代表单一字符 |
[ ab ] | 代表字符类型,匹配{a,b}中的一个字符 |
[ ^ab ] | 代表不是{a,b}中的一个字符 |
[ a-b ] | 代表匹配一个a到b之间的字符包括ab,ASCII代码在a-b之间的 |
[ ^a-b] | 代表不在a到b之间的字符包括ab |
{a,b} | 代表匹配a或b的一个语句 |
\c | 代表转义字符匹配原字符c |
实例:
/* | /2007 /2008 |
/*/* | /2007/12 /2008/01 |
/200? | /2007 /2008 |
/200[78] | /2007 /2008 |
/200[7-8] | /2007 /2008 |
通配符模式并不能总能够精确地描述想要访问的文件集,比如使用通配格式排除一个特定的文件就不太可能。FileSystem 中的 listStatus() 和 globStatus() 方法提供了可选的 PathFilter 对象,从而控制通配符:
package org.apache.hadoop.fs;
public interface PathFilter {
boolean accept(Path path);
}
范例 PathFilter 用于排除匹配正则表达式的路径:
首先在HDFS中创建相关目录:
hdfs dfs -mkdir -p /test/2007/12/30 /test/2007/12/31 /test/2008/01/01 /test/2008/01/02
接着定义排他性路径过滤的类 RegexExcludePathFilter :
package hadoopDemo.pathFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/*
* 排他性路径过滤
*/
public class RegexExcludePathFilter implements PathFilter {//实现了PathFilter接口
private String regexp ;
public RegexExcludePathFilter(String regexp) {
this.regexp = regexp;
}
public boolean accept(Path path) {
return !path.toString().matches(regexp);
}
}
然后在单元测试中调用该类:
package hadoopDemo;
import hadoopDemo.pathFilter.RegexExcludePathFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestFileSystemAPI {
/*
* 通过PathFilter应用正则表达式对路径过滤
*/
@Test
public void pathFilter() throws Exception{
Configuration conf = new Configuration() ;
FileSystem fs = FileSystem.get(conf) ;
FileStatus[] ft = fs.globStatus(new Path("/test/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$")) ;
//直接将FileStatus[]数组提取数据形成Path[]数组,可以替代ft.getPath()的for循环
Path[] path = FileUtil.stat2Paths(ft) ;
for(Path p : path){
System.out.println(p);
}
}
}
结果为:
8.删除数据
使用 FileSystem 的 delete() 方法可以永久性删除文件或目录:
public boolean delete(Path f , boolean recursive) throws IOException
如果 f 是一个文件或空目录,那么 recursive 的值就会被忽略,直接删除。只有在 recursive 值为 true 时,非空目录及其内容才会被删除。