写操作:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("hdfs://127.0.0.1:9000/example.txt");
FSDataOutputStream outStream = fs.create(file);
out.write("java api write data".getBytes("UTF-8"));
outStream.close();
1.首先通过FS的get()方法加载一些配置信息,然后返回FS的实例,这边对应的DFS。
2.通过调用DFS的create()方法创建了一个文件,并且返回文件输出流FSDataOutputStream。
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
//创建一个FSDataOutputStream对象返回,FSDataOutputStream的第一个参数是OutputStream引用
return new FSDataOutputStream
(dfs.create(getPathName(f), permission,
overwrite, true, replication, blockSize, progress, bufferSize),
statistics);
}
参数f表示将要在HDFS中建立的文件路径,permission表示客户端的权限信息,overwrite如果为true,则同名文件将被覆盖,如果为false,则有同名文件会抛出错误,bufferSize表示写文件过程中的缓存大小,replication表示文件写入到HDFS中后拥有的副本数量,blockSize表示数据块大小,progress用于客户端向HDFS报告进度。
3.这个方法首先通过dfs.create()创建一个DFSOutputStream对象,然后使用FSDataOutputStream的构造方法创建目标文件的输出流管道对象返回给客户端,这样客户端就可以通过这个管道向HDFS写入数据了。
4.dfs.create()方法有7个重载方法,最终都会调用到如下方法:
public OutputStream create(String src,FsPermission permission,boolean overwrite,
boolean createParent,short replication,long blockSize,
Progressable progress,int buffersize)throws IOException{
checkOpen();//检查DFSClient是否关闭
if (permission==null){
permission=FsPermission.getDefault();
}
FsPermission masked=permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src+": masked="+masked);
final DFSOutputStream result=new DFSOutputStream(src,masked,
overwrite,createParent,replication,blockSize,progress,buffersize,
conf.getInt("io.bytes.per.checksum",512));
beginFileLease(src,result);
return result;
}
参数src是目标文件的路径的字符串表示,因为HDFS中的文件的路径在NameNode中是使用/分隔的字符串表示的,参数createParent如果为true,则会创建不存在的父目录。在这个方法中首先通过checkOpen()检查DFSClient是否关闭,然后通过DFSOutputStream的构造方法创建了一个DFSOutputStream对象,最后,通过beginFileLease()方法开启客户端向HDFS写文件的租约。
5.DFSOutputStream类:
class DFSOutputStream extends FSOutputSummer implements Syncable {
/**
* 与HDFS中的DataNode节点进行网络通信
**/
private Socket s;
/**
* 标识数据输出流是否关闭
**/
boolean closed = false;
/**
* 文件名
**/
private String src;
/**
* 用于写出数据流
**/
private DataOutputStream blockStream;
/**
* 响应流
**/
private DataInputStream blockReplyStream;
private Block block;
private Token<BlockTokenIdentifier> accessToken;
/**
* 数据块大小
**/
final private long blockSize;
private DataChecksum checksum;
/**
* 保存了输出流中要发送的数据包
**/
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
/**
* 保存已经发送的,但是还没有收到确认的数据包
**/
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
/**
* 当前数据包
**/
private Packet currentPacket = null;
private int maxPackets = 80; // each packet 64K, total 5MB
// private int maxPackets = 1000; // each packet 64K, total 64MB
private DataStreamer streamer = new DataStreamer();
;
private ResponseProcessor response = null;
private long currentSeqno = 0;
private long lastQueuedSeqno = -1;
/**
* 当前收到应答的最后一个数据包
**/
private long lastAckedSeqno = -1;
/**
* 在当前数据块中的位置
**/
private long bytesCurBlock = 0; // bytes writen in current block
/**
* 数据包的大小
**/
private int packetSize = 0; // write packet size, including the header.
private int chunksPerPacket = 0;
/**
* 当前数据块要输出到的数据节点
**/
private DatanodeInfo[] nodes = null; // list of targets for current block
/**
* 存放创建数据流管道时失败的DataNode节点,将这个DataNode排除在外,以免再次访问到故障节点
**/
private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
/**
* 是否出现错误或者异常
**/
private volatile boolean hasError = false;
/**
* 创建数据流管道时标识哪个DataNode出错
**/
private volatile int errorIndex = 0;
/**
* 保存调用过程中最后出现的一个异常
**/
private volatile IOException lastException = null;
private long artificialSlowdown = 0;
private long lastFlushOffset = 0; // offset when flush was invoked
private boolean persistBlocks = false; // persist blocks on namenode
private int recoveryErrorCount = 0; // number of times block recovery failed
private int maxRecoveryErrorCount = 5; // try block recovery 5 times
private volatile boolean appendChunk = false; // appending to existing partial block
private long initialFileSize = 0; // at time of file open
/**
* 用于向HDFS报告进度
**/
private Progressable progress;
/**
* 文件的数据块副本数
**/
private short blockReplication; // replication factor of file
在DFSOutputStream类中定义了3个内部类,分别是Packet, DataStreamer和ResponseProcessor,。
其中Packet是在客户端发往HDFS集群过程中的数据包的抽象,同时也用于构造数据包的特殊形式-心跳包。
DataStreamer用于在数据流管道中向DataNode发送数据包。它从NameNode上检索新的blockid和block的位置,然后开始向DataNode组成的数据流管道中发送数据包。每个数据包都有一个序列号。当一个数据块的所有数据包都发送完毕,且每个数据包的确认消息都被接收,那么DataStreamer就关闭当前的数据块。
ResponseProcessor是用来处理这些确认信息的,DataStreamer和ResponseProcessor分别代表单独的线程。
6.在DFSClient.create()方法中调用的DFSOutputStream构造方法的代码如下:
DFSOutputStream(String src,FsPermission masked,boolean overwrite,
boolean createParent,short replication,long blockSize,Progressable progress,
int buffersize,int bytesPerChecksum)throws IOException{
//初始化成员变量
this(src,blockSize,progress,bytesPerChecksum,replication);
computePacketChunkSize(writePacketSize,bytesPerChecksum);
try{
//旧版本的HDFS集群的create()方法不含createParent参数,使用createParent参数来兼容旧版本
//create()方法在NameNode上创建一个新的处于构建状态的文件
if(createParent){
namenode.create(
src,masked,clientName,overwrite,replication,blockSize);
}else{
namenode.create(
src,masked,clientName,overwrite,false,replication,blockSize);
}
}catch(RemoteException re){
throw re.unwrapRemoteException(AccessControlException.class,FileAlreadyExistsException.class,
FileNotFoundException.class,NSQuotaExceededException.class,DSQuotaExceededException.class);
}
streamer.start();
}
这个构造方法的最后一个参数bytesPerChecksum代表多少字节计算一次校验和,默认是512字节的数据计算一次校验和。在方法中首先调用另一个构造方法初始化成员变量,然后调用方法computePacketChunkSize来计算发往数据节点的数据包能包含多少校验块,以及包的长度,一般来说数据包最大能达到64K字节。HDFS传输数据时使用的校验方式有两种选择,一种是空校验,即不对数据进行校验,一种是CRC32校验,也就是循环冗余校验,正常都是选择使用CRC32校验方式。
7.然后调用NameNode节点上的create()方法来创建文件,即按照给定的src路径,在NameNode的目录树结构创建一个文件,同时传入这个文件的数据块大小,副本数等参数信息,在这儿远程方法执行完之后,在NameNode的目录树创建好一个条目,在NameNode上创建文件成功后,就启动DataStreamer线程,向HDFS集群中写入数据。
8.在DFSOutputStream的构造方法执行完之后,返回到DFSClient.create()方法,执行第4步中的beginFileLease(src, result);这行语句,获取写文件的租约。对于HDFS来说,租约是NameNode节点给予租约持有者在规定时间内一定权限(写文件)合同。NameNode节点通过租约管理器管理租约,客户端在写文件过程中,需要定期更新租约,否则,租约过期后,NameNode节点会通过租约恢复机制关闭文件。到这里就完成了HDFS集群中文件的创建。