HLog的作用:
HBase写入数据时会同时写入到WAL和Memstore中,其中Memstore是位于内存中的store,类似于写缓存,当Memstore的大小超过限定的阈值时会触发flush行为,将内存中的数据刷写到磁盘做持久化。其中的wal也称为hlog,作用类似于mysql中的binlog,记录了客户端的每次update动作,只有当wal写入成功之后,本次写事务才会返回。
我们知道内存中的数据是易失的,当regionserver宕机时,HMaster会切割按region切割宕机regionserver上的hlog,并将它分发到region被迁移到的新regionserver上以恢复该在memstore中还未来得及刷盘的数据。相应地如果hlog里面的记录已经完成了flush,则该hlog会被regionServer移动到.oldlog目录下,由HMaster上的定时线程LogCleaner周期性地扫描该目录,删除掉不再使用的hlog。
此外hlog还有一个意义就是用于hbase的replication,hbase的replication是通过将主集群的hlog推送到备集群,然后在备集群上reply来实现的。这个推送过程是异步完成的,因此会存在.oldlog目录下的hlog还未被replication推送完成的情况,此时HMaster会将这些未推送完成的hlog记录在zk上。方便在清除.oldlog目录时跳过有zk指向的hlog文件。
WALFactory类:
先从WALFactory开始分析,HRegionServer中管理着一个WALFactory变量,定义的格式如下:
protected volatile WALFactory walFactory
下面分析一下walFactory在regionserver中的应用姿势,首先在RegionServer中维护着一个与Master之间的心跳逻辑,这段代码在RegionServer的主循环run()里,如下所示:
while(keepLooping()) {
RegionServerStartupResponse w = repartForDuty();
if (w == null) {
this.sleeper.sleep();
} else {
handleReportForDutyResponse(w);
break;
}
}
reportForDuty是RegionServer向master上报注册信息,master会回应一个key/value格式的信息给regionserver,以标识regionserver本次register成功。
regionserver收到master的回应消息后,开始调用handleReportForDutyResponse,这个函数的主要逻辑列在下面:
try{
根据master的返回值处理hostname逻辑;
ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
this.walFactory = setupWALAndReplication(); //启动walFactory
初始化metricsRegionServer;
startServiceThreads(); //启动各路服务线程;
startHeapMemoryManager(); //启动内存管理了;
synchronized (online) {
online.set(true);
online.notifyAll();
}
}
setupWALAndReplication创建并返回了WALFactory类,setupWALAndRepliaction中的几个主要步骤摘录如下:
private WALFactory setupWALAndReplication() throws IOException {
final Path oldLogDir = new Paht(……..) //获取old WAL日志的路径
final String logName=DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
Path lodger = new Path(rootDir, logName);
//如果设置了replication相关,初始化replication manager
createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
listeners添加,添加了WALActionsListener;
final List<WALActionsListener> listeners = new ArrayList<>();
return new WALFactory(conf, listeners, serverName.toString);
}
WALFactory的构造函数中除了设置超时时间等之外,还初始化了一个DefaultWALProvider类型的变量,几乎所有与wal文件操作相关的方法都定义在这个接口类中。它里面分别定义了Reader&Writer,用于对WAL文件的读写。WALFactory中提供了两个接口createReader&createWriter,实际也是初始化了DefaultWALProvider中的这两个类。Reader&Writer实现了对hlog文件的读写。
DefaultWALProvider中还包括了一个FSHLog类型的成员变量,FSHLog管理了将WAL持久化的线程模型。下面详细分析FSHLog的实现。
FSHLog与HLog写入模型:首先从hbase的写入路径入手分析,前面分析过客户端的put操作在服务端最终调用的是doMiniBatchMutation。数据在被成功写入到memstore之后,会收集此次写入动作的table name、region info等信息并构造成一个HlogKey结构的对象记为walkey,并将当前写入的数据作为walEdit,然后将walkey和walEdit共同组装成一个entry之后将之append到内存中一个ringbuffer类型的缓冲区中。返回值txid用于标识本次写事务在缓冲区的写入序号。
if (walEdit.size() > 0) {
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, getSequenceId(), true, null);
walEdit = new WALEdit(isInReplay);
walKey = null;
}
待所有的锁释放之后,再将buffer中的数据刷写到磁盘,最后将版本号向前推进,提交本次写事务,其中刷写磁盘的代码如下:
if(taxied != 0) {
syncOrDefer(txid, durabiliby);
}
syncOrDefer会根据客户端设置的持久化级别选择是否将日志数据落盘,其中client可以选择的wal持久化等级划分为如下四个等级:
SKIP_WAL:数据只写memstore,不写Hlog,此时写入性能最高,但是数据有丢失风险;
ASYNC_WAL:异步将数据写入HLog文件中;
SYNC_WAL:同步将数据写入日志文件,但是此时数据只是被写入文件系统缓存,并没有真正落盘;
FSYNC_WAL:同步将数据写入日志文件并强制落盘,可以保证数据不丢失,但是性能最差;
客户端可以通过如:put.setDurability(Durability.SYNC_WAL)设置WAL持久化级别,不设置时默认是SYNC_WAL。syncOrDefer通过调用this.wal.sync(txid)将数据落盘,
经过前面的分析,可以看到hlog的写入最终是调用了FSHLog向外暴露的append()&sync()两个方法来实现的,可见FSHLog中实现了hlog的写线程模型。因此想要分析写线程模型的实现,分析的入口就在上面两个方法,在具体分析方法细节前,先看看构造FSHLog时都初始化了哪些变量:
this.appendExecutor = Executors.
newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
final int preallocatedEventCount =
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
this.disruptor =
new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
this.disruptor.getRingBuffer().next();
this.ringBufferEventHandler =
new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
maxHandlersCount);
this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
this.disruptor.start();
其中最关键的就是变量disruptor,它是一个Disruptor<RingBufferTruck>类型的成员变量,Distuptor是LMAX开发的一个高性能无锁队列,本质还是个生产者-消费者模型,它采用一个环形数组结构,称为RingBuffer来复用内存,同时Buffer上的读写序列号经过优化可以避免伪共享,多线程并发访问该序列号时通过CPU级别的CAS自旋来获得,以此实现了lock free。这样只要buffer中有事件就会被递交到消费者线程池去处理。
回头看disruptor构造函数中的各参数的含义,首先第一个参数指定了通过Disruptor交换的事件类型,这里定义为RingBufferTruck类型,参数EVENT_FACTORY指代事件工厂,用于Disruptor通过该工厂在RingBuffer中预创建Event实例。参数preallocatedEventCount指定了ringBuffer的大小。ProducerType是数据生产方式,客户端写入数据时,调用this.wal.append()方法实际上就是以生产者的身份将数据写入到RingBuffer中。append关键代码如下:
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
final List<Cell> memstoreCells) throws IOException {
FSWALEntry entry = null;
long sequence = this.disruptor.getRingBuffer().next();
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); //用key和edits构造一个对象entry
truck.loadPayload(entry, scope.detach()); //将上面构造的对象包装为RingBufferTruck事件并添加到Ring Buffer
} finally {
this.disruptor.getRingBuffer().publish(sequence);
}
return sequence;
}
消费者线程由appendExecutor指定,这里用到的是newSingleThreadExecutor定义的单线程线程。BlockingWaitStrategy()指定了consumer的等待策略。appendExecutor并不处理具体的event,而是从Ringbuffer中接收之后将它转交给后端的ringBufferEventHandler来处理,因为appendExecutor中不包含事件处理逻辑,所以非常轻量,只需一个线程就可以处理生产端高并发的请求。
真正的事件处理在ringBufferEventHandler中完成,如下面定义,hbase中默认是5个handler:
this.ringBufferEventHandler = new
RingBufferEventHandler(conf.getInt(“hbase.regionserver.hlog.syncer.count”,5), maxHandersCount);
从RingBufferEventHandler起分析event的处理逻辑,每个RingBufferEventHandler中定义了两组主要的线程数组,如下所示:
class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
private final SyncRunner [] syncRunners;
private final SyncFuture [] syncFutures;
//其它变量
}
每接收到RingBufferTruck事件,RingBufferEventHandler便会调用onEvent对该事件进行处理,主要的处理逻辑代码列在了下面:
public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
throws Exception {
try {
if (truck.hasSyncFuturePayload()) {
this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
} else if (truck.hasFSWALEntryPayload()) {
TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
try {
append(truck.unloadFSWALEntryPayload());
} catch (Exception e) {
。。。。
}
} else {
return;
}
int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
try {
this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
} catch (Exception e) {
cleanupOutstandingSyncsOnException(sequence, e);
throw e;
}
attainSafePoint(sequence);
this.syncFuturesCount = 0;
} catch (Throwable t) {
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
}
}
RingBufferTruck中封装可能封装了两种不同类型的对象,分别是WALEntry或者SyncFuture,消费线程的执行方法onEvent()中对上述对象的处理也不同,如果是WALEntry,则调用append方法,使用writer将输入的WALEntry经protobuf序列化后写入hadoop文件。如果是SyncFuture,则把该对象放入RingBufferEventHandler自身维护的SyncFutures[]数组中。
然后,onEvent从syncRunners[]中取出一个线程,并调用它的offer方法,offer中将该EventHandler中的所有syncFuture添加到SyncRunner自身维护的阻塞队列中,在syncRunner线程的run方法里等到写满一批syncFuture之后,会调用writer.sync()将数据落盘,待数据成功刷到磁盘后,释放syncFuture,并将其中的scope置位。之所以如此设计,是因为对比客户端的屡次append操作,刷盘是相对比较耗时的,以此采用写文件缓存并结合异步刷盘的方式平衡对client端API的友好和客户端写吞吐。run方法简化后的主干代码如下:
public void run() {
long currentSequence;
while (!isInterrupted()) {
int syncCount = 0;
SyncFuture takeSyncFuture;
try {
while (true) {
takeSyncFuture = this.syncFutures.take();
currentSequence = this.sequence;
long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
long currentHighestSyncedSequence = highestSyncedSequence.get();
if (currentSequence < currentHighestSyncedSequence) {
syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
continue;
}
break;
}
try {
writer.sync();
currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) {
。。。。。
} catch (Exception e) {
。。。。。
} finally {
takeSyncFuture.setSpan(scope.detach());
syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t);
syncCount += releaseSyncFutures(currentSequence, t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable t) {
LOG.warn("UNEXPECTED, continuing", t);
}
}
}
}
需要注意的是writer.sync()的预处理,其取出当前已处理的最大sequence与本次待处理syncFuture中的sequence相对比,sequence是按照事务提交的顺序递增赋值的,事务append到缓存的顺序也是与sequence的赠序一致,如果当前sequence小于最大已提交sequence,则表明hlog中已写入相应记录,因此调用releaseSyncFuture()释放syncFuture。
还有一个问题,syncFuture中scope的置位是什么时候来查的呢,答案就是FSHLog向外暴露的this.wal.sync(txid)方法,客户端写操作调用sync后会阻塞等待数据刷盘成功,sync中调用syncFuture的get后阻塞在文件系统的同步操作上,当文件系统将数据落盘完成之后,get方法返回,并将syncFuture中置位的scope返回给客户端。客户端工作线程被唤醒,返回继续写入memstore,完成一次写操作。private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
try {
syncFuture.get();
return syncFuture.getSpan();
} catch (InterruptedException ie) {
。。。。
} catch (ExecutionException e) {
。。。。
}
}
总而言之,WAL的写入模型是一个多消费者单生产者模型,生产者调用的方法append(),将包装好的WALEdit写入到线程安全的消息队列RingBuffer,同时只有一个消费者从这个队列中拉取数据并调用sync()方法把数据异步刷写到磁盘,单消费者保证了WAL日志并发写入时日志的全局顺序唯一,同时采用无锁队列Disruptor RingBuffer保证了写入端(生产者)的高吞吐低延时。
LogRoller:
LogRoller是个定期执行的线程。每个RegionServer中都有一个LogRoller线程,线程执行的周期由hbase.regionserver.logroll.period给出,默认时间是1hr。也就是说每过一个小时会产生一个新的hlog文件,hlog的文件名由regionserver名称+hlog形成时的时间戳构成。
LogRoller的run方法中的主要流程如下面列出:
rollLock.lock();
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
final WAL wal = entry.getKey();
final byte [][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
walNeedsRoll.put(wal, Boolean.FALSE);
if (regionsToFlush != null) {
for (byte [] r : regionsToFlush) scheduleFlush(r);
}
}
} finally {
try {
rollLog.set(false);
} finally {
rollLock.unlock();
}
}
在第四行中调用了HLog的rollWriter,rollWriter中会打开一个新的hdfs文件供log写入,并将old的hlog文件关闭。