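In HA mode, every operation that modifies the namespace is first recorded by the NameNode (nn) to the JournalNode (jnode) cluster [HDFS-2874] so that it can be shared with the standby NameNode (snn), and only then recorded to a local file on the nn.
JournalSet typically maintains two ways of recording edits:
- Through QuorumOutputStream, to all jnodes (4-->5 in the figure)
- Through EditLogFileOutputStream, to a local file on the nn (6-->7 in the figure)
In both cases the operation record is first written into the double buffer EditsDoubleBuffer, and is sent (written) out in a batch once enough has accumulated.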
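The ordered fan-out described above can be sketched roughly as follows. This is a minimal illustration, not the Hadoop JournalSet: `EditSink`, `FanOutJournal` and `FanOutDemo` are hypothetical names, and the assumption that sinks are visited in registration order (shared journal first, local file second) simply follows the description in this text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one edit-log destination (not a Hadoop interface).
interface EditSink {
    void write(String op);
}

// Records every operation to all registered sinks, in registration order.
class FanOutJournal {
    private final List<EditSink> sinks = new ArrayList<>();

    void addSink(EditSink sink) {
        sinks.add(sink);
    }

    void logEdit(String op) {
        for (EditSink sink : sinks) {
            sink.write(op);
        }
    }
}

class FanOutDemo {
    // Returns the order in which the two sinks saw the operation.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        FanOutJournal journal = new FanOutJournal();
        journal.addSink(op -> trace.add("jnode:" + op)); // shared journal first
        journal.addSink(op -> trace.add("local:" + op)); // then the local file
        journal.logEdit("mkdir /a");
        return trace;
    }
}
```

In this sketch each sink writes immediately; in the flow described here each stream would instead append to its own double buffer and flush later.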
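EditsDoubleBuffer is implemented with two TxnBuffer instances, bufCurrent and bufReady, each with a default initial capacity (initBufferSize) of 512 KB. Operation records are written directly into bufCurrent; once it fills up, FSEditLog calls logSync( ) to send the buffered records out.
Step 1: QuorumOutputStream and EditLogFileOutputStream each call setReadyToFlush( ) to swap the two buffers of their own EditsDoubleBuffer. bufCurrent, which has been receiving writes, becomes the ready buffer that will be sent out, while bufReady, whose contents have already been sent, takes over buffering new operation records.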
public void setReadyToFlush() {
  assert isFlushed() : "previous data not flushed yet";
  TxnBuffer tmp = bufReady;
  bufReady = bufCurrent;
  bufCurrent = tmp;
}
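To see the swap in context, here is a simplified, self-contained double buffer. It is only a sketch under assumptions: `SimpleDoubleBuffer` is a hypothetical class backed by `ByteArrayOutputStream` rather than Hadoop's TxnBuffer, `shouldForceSync()` is assumed to compare the current buffer size against `initBufferSize`, and I/O errors are wrapped in `UncheckedIOException` to keep the example short.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Simplified double buffer; field names mirror the excerpt, the class itself is illustrative.
class SimpleDoubleBuffer {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();
    private final int initBufferSize;

    SimpleDoubleBuffer(int initBufferSize) {
        this.initBufferSize = initBufferSize;
    }

    // New records always go into bufCurrent.
    void write(byte[] record) {
        bufCurrent.write(record, 0, record.length);
    }

    // True once bufCurrent has reached the configured size (assumed trigger for a sync).
    boolean shouldForceSync() {
        return bufCurrent.size() >= initBufferSize;
    }

    boolean isFlushed() {
        return bufReady.size() == 0;
    }

    // Swap the buffers: accumulated records become ready, the empty buffer takes new writes.
    void setReadyToFlush() {
        if (!isFlushed()) {
            throw new IllegalStateException("previous data not flushed yet");
        }
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    // Write the ready records to the destination, then reset the ready buffer to empty.
    void flushTo(OutputStream out) {
        try {
            bufReady.writeTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        bufReady.reset();
    }

    int currentSize() {
        return bufCurrent.size();
    }
}
```

The point of the two buffers is that `write` can keep filling bufCurrent while bufReady is being flushed, so writers are not blocked for the duration of the I/O.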
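Step 2: QuorumOutputStream and EditLogFileOutputStream each call flush( ) (QuorumOutputStream before EditLogFileOutputStream) to send out the data staged in their bufReady. QuorumOutputStream writes the operation records to the jnodes, and EditLogFileOutputStream writes them to the local editlog file. Once this completes, the flushed buffer in EditsDoubleBuffer is reset to empty.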
QuorumOutputStream:
protected void flushAndSync(boolean durable) throws IOException {
  int numReadyBytes = buf.countReadyBytes();
  if (numReadyBytes > 0) {
    int numReadyTxns = buf.countReadyTxns();
    long firstTxToFlush = buf.getFirstReadyTxId();
    assert numReadyTxns > 0;
    // Copy from our double-buffer into a new byte array. This is for
    // two reasons:
    // 1) The IPC code has no way of specifying to send only a slice of
    //    a larger array.
    // 2) because the calls to the underlying nodes are asynchronous, we
    //    need a defensive copy to avoid accidentally mutating the buffer
    //    before it is sent.
    DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
    buf.flushTo(bufToSend);
    assert bufToSend.getLength() == numReadyBytes;
    byte[] data = bufToSend.getData();
    assert data.length == bufToSend.getLength();
    // Send the data to every jnode
    QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
        segmentTxId, firstTxToFlush,
        numReadyTxns, data);
    loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
    // Since we successfully wrote this batch, let the loggers know. Any future
    // RPCs will thus let the loggers know of the most recent transaction, even
    // if a logger has fallen behind.
    loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
  }
}
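The "send to all, wait for a write quorum" pattern used above can be illustrated with plain JDK concurrency utilities. This is a sketch under assumptions, not the QJM client: `QuorumSketch`, `writeToQuorum` and `demo` are hypothetical names, each logger is modelled as a function returning whether it acknowledged the write, and a quorum is assumed to be a simple majority.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class QuorumSketch {
    // Sends data to every logger asynchronously; returns true if a majority
    // acknowledges within timeoutMs, false otherwise.
    static boolean writeToQuorum(List<Function<byte[], Boolean>> loggers,
                                 byte[] data, long timeoutMs) {
        int majority = loggers.size() / 2 + 1;
        CountDownLatch acks = new CountDownLatch(majority);
        // Defensive copy, so later changes to the caller's buffer cannot affect in-flight sends.
        byte[] copy = data.clone();
        for (Function<byte[], Boolean> logger : loggers) {
            CompletableFuture.supplyAsync(() -> logger.apply(copy))
                .thenAccept(ok -> { if (ok) acks.countDown(); });
        }
        try {
            return acks.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Last transaction id covered by a batch, as computed in the excerpt.
    static long lastCommittedTxId(long firstTxId, int numTxns) {
        return firstTxId + numTxns - 1;
    }

    // Builds one fake logger per flag: true acknowledges, false rejects.
    static boolean demo(long timeoutMs, boolean... acks) {
        List<Function<byte[], Boolean>> loggers = new ArrayList<>();
        for (boolean ack : acks) {
            loggers.add(d -> ack);
        }
        return writeToQuorum(loggers, new byte[] {1, 2, 3}, timeoutMs);
    }
}
```

With three loggers, `demo(1000, true, true, false)` reaches a quorum, while `demo(200, true, false, false)` times out and reports failure. The caller never waits for the slowest logger once a majority has answered.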
EditLogFileOutputStream:
public EditLogFileOutputStream(Configuration conf, File name, int size)
    throws IOException {
  super();
  shouldSyncWritesAndSkipFsync = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH,
      DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT);
  file = name;
  doubleBuf = new EditsDoubleBuffer(size);
  RandomAccessFile rp;
  if (shouldSyncWritesAndSkipFsync) {
    rp = new RandomAccessFile(name, "rws");
  } else {
    rp = new RandomAccessFile(name, "rw");
  }
  fp = new FileOutputStream(rp.getFD()); // open for append
  fc = rp.getChannel();
  fc.position(fc.size());
}

public void flushAndSync(boolean durable) throws IOException {
  if (fp == null) {
    throw new IOException("Trying to use aborted output stream");
  }
  if (doubleBuf.isFlushed()) {
    LOG.info("Nothing to flush");
    return;
  }
  preallocate(); // preallocate file if necessary
  doubleBuf.flushTo(fp);
  if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
    fc.force(false); // metadata updates not needed
  }
}
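The local-file path boils down to "position at the end of the file, write, then optionally force to disk". The following standalone sketch shows that sequence with the same JDK calls (`RandomAccessFile`, `FileChannel.position`, `FileChannel.force(false)`). `DurableAppend` and its methods are hypothetical helpers, and preallocation as well as the "rws" mode are deliberately left out.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

class DurableAppend {
    // Appends data to the end of the file; if durable, forces the file content
    // (but not necessarily its metadata) to the storage device. Returns the new file size.
    static long append(File file, byte[] data, boolean durable) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel fc = raf.getChannel()) {
            fc.position(fc.size()); // continue after the existing content
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                fc.write(buf);
            }
            if (durable) {
                fc.force(false); // metadata updates not needed
            }
            return fc.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Appends two batches to a temporary file and returns the final size.
    static long demo() {
        try {
            File f = File.createTempFile("edits", ".log");
            f.deleteOnExit();
            append(f, new byte[] {1, 2, 3}, true);
            return append(f, new byte[] {4, 5}, false);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Opening the file in "rws" mode, as the constructor above does when shouldSyncWritesAndSkipFsync is set, makes every write synchronous, which is why the excerpt skips the explicit `force` call in that case.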