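1. Why does the Memstore exist?
In HBase, each HRegionServer hosts multiple HRegions, and each HRegion contains multiple HStores. The Memstore is one component of an HStore: under heavy write load, once the Memstore exceeds its configured threshold, its contents are flushed to an HFile. By default HBase uses HDFS as its underlying file system, but HDFS simply stores the raw data it is given and applies no optimizations such as rowkey sorting or version filtering. We use HBase because we want fast lookups, and that requires the data to be ordered by rowkey. The Memstore layer was added to the design for two reasons: first, to speed up responses; second, to sort the data and filter out garbage (for example, a column family that only needs the latest version does not have to store multiple versions) before it is flushed to disk. Because the data has already been optimized by the time it is flushed to an HFile, lookups are much faster.
HBase provides several configuration options related to the Memstore. Before introducing them, note that the smallest unit of a Memstore flush is one HRegion. Let's look at each of these options in turn: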
hbase.hregion.memstore.flush.size
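This value is the Memstore size threshold for a single HRegion on an HRegionServer, that is, the combined size of the Memstores of all HStores in that region; the default is 128M. When the threshold is exceeded, a flush of that HRegion's Memstores is triggered (the HRegion is the smallest flush unit, so the flush is requested for the whole HRegion rather than for a single HStore; this flush does not block updates). Note that as the data volume grows, a single HRegionServer hosts more and more HRegions, so the total Memstore size on that HRegionServer grows as well.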
hbase.regionserver.global.memstore.size
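When the sum of all Memstores of all HRegions on a single HRegionServer exceeds this setting, flushes are also forced, and in addition updates are blocked. This is the situation we least want to see, because blocking updates on the HRegionServer affects every HRegion it hosts. By default hbase.regionserver.global.memstore.size is 40% of the heap. Once flushing has been triggered, it is forced only until the total Memstore size on this HRegionServer drops to
hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size * hbase_heapsize
(older releases named these keys hbase.regionserver.global.memstore.lowerLimit and hbase.regionserver.global.memstore.upperLimit). This is a neat design: because the blocking affects updates on the whole HRegionServer, it does not flush everything. Blocked writers are released as soon as the total is back under the global limit, and forced flushing stops once the total reaches this allowed lower mark.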
hbase.hregion.memstore.block.multiplier
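As we know, an HRegion has N HStores, one for each column family of the table. This parameter means that if the total size of all Memstores in an HRegion exceeds
hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size
then updates to that HRegion are blocked and a flush of the HRegion is triggered.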
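An example:
heap: 1G
hbase.regionserver.global.memstore.size = 1 * 1024M * 40% ≈ 410M
hbase.regionserver.global.memstore.size.lower.limit = 1 * 1024M * 40% * 0.95 ≈ 389M
hbase.hregion.memstore.flush.size = 128M
hbase.hregion.memstore.block.multiplier = 4
Now suppose a single HRegionServer hosts 4 HRegions, and each HRegion has only one HStore:
HStore1: 100M of Memstore in use
HStore2: 110M of Memstore in use
HStore3: 110M of Memstore in use
HStore4: 100M of Memstore in use
Although none of them has exceeded the default 128M setting, the total (420M) already exceeds the value of hbase.regionserver.global.memstore.size (about 410M), so a flush is triggered anyway, and updates on this HRegionServer are blocked as well.
So we need to weigh the total number of HRegions on a single HRegionServer, as well as the number of HStores in each HRegion, and set the values above sensibly. That covers the Memstore-related settings, which in effect determine when a Memstore flush is triggered.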
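To make the arithmetic in the example easier to check, here is a minimal Java sketch that recomputes the thresholds. It is illustration only, not HBase code: the class MemstoreThresholdSketch and its method names are hypothetical, and the fractions 0.4 and 0.95 are the defaults quoted above.

```java
/** Illustrative arithmetic only; none of these names exist in HBase. */
class MemstoreThresholdSketch {
    static final long MB = 1024L * 1024L;

    /** heap * hbase.regionserver.global.memstore.size */
    static long globalLimit(long heapBytes, double globalFraction) {
        return (long) (heapBytes * globalFraction);
    }

    /** global limit * hbase.regionserver.global.memstore.size.lower.limit */
    static long lowMark(long globalLimitBytes, double lowerLimitFraction) {
        return (long) (globalLimitBytes * lowerLimitFraction);
    }

    /** hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size */
    static long regionBlockingSize(long flushSizeBytes, int multiplier) {
        return flushSizeBytes * multiplier;
    }

    /** Total Memstore usage across all regions of one server. */
    static long sum(long[] sizes) {
        long total = 0;
        for (long s : sizes) {
            total += s;
        }
        return total;
    }

    public static void main(String[] args) {
        long heap = 1024 * MB;
        long flushSize = 128 * MB;
        long global = globalLimit(heap, 0.4);
        long low = lowMark(global, 0.95);
        long[] memstores = {100 * MB, 110 * MB, 110 * MB, 100 * MB};
        long total = sum(memstores);

        System.out.println("global limit (bytes): " + global);
        System.out.println("low mark (bytes): " + low);
        System.out.println("per-region blocking size (bytes): " + regionBlockingSize(flushSize, 4));
        for (long size : memstores) {
            System.out.println("region above flush.size? " + (size > flushSize));
        }
        System.out.println("server total above global limit? " + (total > global));
    }
}
```

No single region crosses flush.size in this scenario, yet the server-wide total crosses the global limit, which is exactly the case the example describes.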
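2. Source code analysis of the HRegionServer-level flush
When an HRegionServer-level flush is triggered, updates are blocked. After the flush is triggered on an HRegionServer, it is still carried out as HRegion-level flushes, because each HRegion on the HRegionServer has to be flushed individually.
The HRegionServer class has a member variable dedicated to handling flushes: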
protected MemStoreFlusher cacheFlusher;
Following it into the implementation, the MemStoreFlusher class, these are some of the important field definitions:
  /** A delay queue (an unbounded BlockingQueue) holding the HRegions waiting to be flushed */
  private final BlockingQueue<FlushQueueEntry> flushQueue =
    new DelayQueue<FlushQueueEntry>();
  /** Map whose key is an HRegion waiting to be flushed and whose value is the entry object wrapping that HRegion */
  private final Map<Region, FlushRegionEntry> regionsInQueue =
    new HashMap<Region, FlushRegionEntry>();
  /** Flag set while a wakeup of the flush thread is pending */
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  /** How often the flush threads wake up */
  private final long threadWakeFrequency;
  // Reference to the owning HRegionServer
  private final HRegionServer server;
  /** Read-write lock */
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  /** Monitor object used when threads wait and notify */
  private final Object blockSignal = new Object();
  /** Global Memstore size limit */
  protected long globalMemStoreLimit;
  /** Low-water mark expressed as a fraction of the global limit */
  protected float globalMemStoreLimitLowMarkPercent;
  /** Low-water mark in bytes */
  protected long globalMemStoreLimitLowMark;
  /** Blocking wait time */
  private long blockingWaitTime;
  private final Counter updatesBlockedMsHighWater = new Counter();
  /** The threads that handle flush operations */
  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners =
    new ArrayList<FlushRequestListener>(1);
These fields are initialized in the constructor:
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    this.conf = conf;
    this.server = server;
    /** Wake frequency, 10 s by default: the longest a flush handler waits on the queue before waking up on its own */
    this.threadWakeFrequency =
      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    /** Maximum heap size */
    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    /** Fraction of the heap the global Memstore may use (globalMemStorePercent), 0.4f by default */
    float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
    /** Global Memstore size limit, 40% of the heap by default */
    this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
    /** Low-water mark as a fraction of the global limit, 0.95f by default */
    this.globalMemStoreLimitLowMarkPercent =
      HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
    /** Low-water mark in bytes, heap size * 0.4 * 0.95 by default */
    this.globalMemStoreLimitLowMark =
      (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
    /** Blocking wait time, 90000 ms by default */
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    /** Number of threads that process the HRegions queued for flushing, 2 by default */
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    this.flushHandlers = new FlushHandler[handlerCount];
    LOG.info("globalMemStoreLimit="
      + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
      + ", globalMemStoreLimitLowMark="
      + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
      + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
  }
Adding an HRegion that needs flushing to the queue:
  @Override
  public void requestFlush(Region r, boolean forceFlushAllStores) {
    synchronized (regionsInQueue) {
      /** Only if this Region is not already queued */
      if (!regionsInQueue.containsKey(r)) {
        /** Wrap the Region in a FlushRegionEntry. No delay is set on it, so once enqueued it can be dequeued and flushed right away */
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
        /** Put it in the map */
        this.regionsInQueue.put(r, fqe);
        /** Add it to the queue of pending flushes */
        this.flushQueue.add(fqe);
      }
    }
  }
Adding an entry to the queue with a delay:
  @Override
  public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has some delay
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
        /** Set the time at which the entry becomes due */
        fqe.requeue(delay);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }
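The difference between the two enqueue methods comes down to how java.util.concurrent.DelayQueue treats an element's remaining delay. The following is a minimal, self-contained sketch of that behavior. The Entry class is a hypothetical stand-in for FlushRegionEntry and is not HBase code; only DelayQueue and Delayed are real JDK types.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedFlushQueueSketch {
    /** Hypothetical stand-in for FlushRegionEntry: a region name plus the time it becomes due. */
    static final class Entry implements Delayed {
        final String regionName;
        private final long dueAtNanos;

        Entry(String regionName, long delayMillis) {
            this.regionName = regionName;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // A value <= 0 means the entry is due and may leave the queue.
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Non-blocking poll: returns the name of a due entry, or null if nothing is due yet. */
    static String pollDue(DelayQueue<Entry> queue) {
        Entry head = queue.poll();
        return head == null ? null : head.regionName;
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Entry> queue = new DelayQueue<Entry>();
        queue.add(new Entry("delayed-region", 300));   // like requestDelayedFlush
        queue.add(new Entry("immediate-region", 0));   // like requestFlush

        System.out.println("first poll:  " + pollDue(queue));
        System.out.println("second poll: " + pollDue(queue));

        // A timed poll waits until the delayed entry becomes due or the timeout passes.
        Entry later = queue.poll(2, TimeUnit.SECONDS);
        System.out.println("timed poll:  " + (later == null ? null : later.regionName));
    }
}
```

An entry added with no delay can be taken immediately, while a delayed entry stays invisible to poll until its delay has elapsed, even though it is already in the queue.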
The flush that runs once a handler thread has taken an entry off the queue:
  private boolean flushRegion(final FlushRegionEntry fqe) {
    Region region = fqe.region;
    /** A region that is not a meta region and has too many HFiles has its flush delayed instead of run right away */
    if (!region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {
      /** If the flush has already been held back for the full blocking wait time, stop waiting and flush anyway */
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionInfo().getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        /** First time this flush is put off: the region probably has too many files. Flushing an HRegion with too many HFiles is slow, and an HRegionServer-level flush blocks updates, so this check avoids blocking for a long time */
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
          /** Ask for a split of this HRegion; if no split is requested, request a compaction of its HFiles instead */
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
            } catch (IOException e) {
              LOG.error(
                "Cache flush failed for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()),
                RemoteExceptionHandler.checkIOException(e));
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        /** Put the entry back on the queue with a delay */
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    /** In every other case, perform the real flush */
    return flushRegion(region, false, fqe.isForceFlushAllStores());
  }
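The branching in this method is easier to follow once it is reduced to a pure decision function. The sketch below is a simplification under stated assumptions, not HBase code: FlushDeferralSketch, Decision, and decide are hypothetical names, and it assumes that isMaximumWait returns true once the entry has been waiting longer than blockingWaitTime.

```java
/** Hypothetical reduction of the "too many store files" check to a pure function. */
class FlushDeferralSketch {
    enum Decision { FLUSH_NOW, REQUEUE }

    /**
     * Decide whether a queued flush runs now or is put back on the queue.
     * waitedMs is the time since the entry was created (assumed semantics of isMaximumWait).
     */
    static Decision decide(boolean metaRegion, boolean tooManyStoreFiles,
                           long waitedMs, long blockingWaitTimeMs) {
        boolean waitedLongEnough = waitedMs > blockingWaitTimeMs;
        if (!metaRegion && tooManyStoreFiles && !waitedLongEnough) {
            return Decision.REQUEUE;
        }
        return Decision.FLUSH_NOW;
    }

    /** Delay used when the entry is put back on the queue: blockingWaitTime / 100. */
    static long requeueDelayMs(long blockingWaitTimeMs) {
        return blockingWaitTimeMs / 100;
    }

    public static void main(String[] args) {
        long blockingWaitTime = 90000; // default of hbase.hstore.blockingWaitTime
        System.out.println(decide(false, true, 1000, blockingWaitTime));
        System.out.println(decide(false, true, 90001, blockingWaitTime));
        System.out.println(decide(true, true, 1000, blockingWaitTime));
        System.out.println("requeue delay ms: " + requeueDelayMs(blockingWaitTime));
    }
}
```

With the default blocking wait time, a deferred entry comes back out of the queue every 900 ms until either the store files have been compacted or the 90 s budget is used up.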
The method that actually performs the flush:
  private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {
      /** First remove the region from regionsInQueue */
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        /** Use the entry's creation time as the flush start time */
        startTime = fqe.createTime;
      }
      /** For an emergency flush (the one requested by flushOneForGlobalPressure), the entry was not taken off the queue by flushQueue.poll, so remove it from flushQueue directly */
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }
    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }
    /** Take the read lock, which blocks threads waiting for the write lock */
    lock.readLock().lock();
    try {
      /** Tell the flush request listeners what type of flush this is: NORMAL, ABOVE_LOWER_MARK or ABOVE_HIGHER_MARK */
      notifyFlushRequest(region, emergencyFlush);
      /** Perform the flush */
      FlushResult flushResult = region.flush(forceFlushAllStores);
      /** Check whether the HFiles need compacting after the flush */
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      /** Check whether the HRegion needs to be split */
      boolean shouldSplit = ((HRegion)region).checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" + (region != null ? (" for region " +
          Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      /** After the flush, release the read lock and wake up any blocked threads */
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }
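The methods above cover enqueuing and performing the flush. Next, let's look at when a flush is triggered. There are many triggers (whenever an operation pushes usage past one of the thresholds described above, or a flush is issued manually from the hbase shell); here we focus on the FlushHandler threads built into HBase, which poll the queue on a timer.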
  private class FlushHandler extends HasThread {
    private FlushHandler(String name) {
      super(name);
    }

    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          /** Take one region waiting to be flushed from the queue, waiting at most threadWakeFrequency */
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          /** The entry is null or a WakeupFlushThread. WakeupFlushThread is a token placed on the queue: each time one is taken, check whether the Memstore limit has been exceeded and, if so, pick one HRegion to flush to bring the total down. Its second purpose is to wake the flush thread so a FlushHandler does not stay idle */
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            /** The total Memstore size on this RS is above the low-water mark */
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
              /** Flush the Memstore of one HRegion to reduce the total Memstore size */
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          /** A regular HRegion waiting to be flushed: run flushRegion */
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      /** The loop has ended (the server is stopping or a flush failed), so clear whatever is left in the queue */
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      /** Wake up the waiting threads */
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
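Stripped of logging and error handling, each pass of the handler loop makes a three-way choice based on what poll returned. The sketch below captures only that choice. All names in it (FlushHandlerStepSketch, WakeToken, RegionEntry, Action, step) are hypothetical simplifications and not HBase classes.

```java
/** Hypothetical reduction of one FlushHandler loop iteration to a pure function. */
class FlushHandlerStepSketch {
    interface QueueEntry {}

    /** Stands in for WakeupFlushThread: carries no region, only wakes the handler. */
    static final class WakeToken implements QueueEntry {}

    /** Stands in for FlushRegionEntry. */
    static final class RegionEntry implements QueueEntry {
        final String regionName;

        RegionEntry(String regionName) {
            this.regionName = regionName;
        }
    }

    enum Action { IDLE, FLUSH_FOR_GLOBAL_PRESSURE, FLUSH_REGION }

    /**
     * polled is what the timed poll returned (null on timeout).
     * aboveLowWaterMark is the result of the global memory check.
     */
    static Action step(QueueEntry polled, boolean aboveLowWaterMark) {
        if (polled == null || polled instanceof WakeToken) {
            // Timeout or wake token: only act if the server is under memory pressure.
            return aboveLowWaterMark ? Action.FLUSH_FOR_GLOBAL_PRESSURE : Action.IDLE;
        }
        // A real entry: flush that specific region.
        return Action.FLUSH_REGION;
    }

    public static void main(String[] args) {
        System.out.println(step(null, false));
        System.out.println(step(new WakeToken(), true));
        System.out.println(step(new RegionEntry("region-a"), true));
    }
}
```

The point of the wake token is that a handler waiting on an empty queue can still be prompted to check global memory pressure without waiting for the poll timeout.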
Let's take a closer look at flushOneForGlobalPressure:
  private boolean flushOneForGlobalPressure() {
    /** Get the HRegions currently on this RS, sorted by Memstore size from largest to smallest, as a size-to-region map */
    SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
    /** Regions that have already been tried and must be skipped */
    Set<Region> excludedRegions = new HashSet<Region>();

    double secondaryMultiplier
      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      /** The best flush candidate: the biggest region that does not have too many HFiles */
      Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many flushes.
      /** The HRegion with the biggest Memstore, no matter how many HFiles it has */
      Region bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);
      // Find the biggest region that is a secondary region
      /** The biggest region that is a secondary region replica */
      Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
        excludedRegions);

      if (bestAnyRegion == null && bestRegionReplica == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      Region regionToFlush;
      /** The Memstore of the biggest HRegion is more than twice that of the best flushable HRegion (sizable Memstore, not too many HFiles) */
      if (bestFlushableRegion != null &&
          bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " + "Region "
              + bestAnyRegion.getRegionInfo().getRegionNameAsString()
              + " has too many " + "store files, but is "
              + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
              + " vs best flushable region's "
              + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
              + ". Choosing the bigger.");
        }
        /** Choose the biggest HRegion even though it has too many HFiles */
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      Preconditions.checkState(
        (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
        (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));

      /** If no region was chosen, or store file refresh is enabled and the biggest secondary replica's Memstore is more than secondaryMultiplier (4 by default) times that of the chosen region, refresh the replica's store files instead of flushing */
      if (regionToFlush == null ||
          (bestRegionReplica != null &&
           ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
           (bestRegionReplica.getMemstoreSize()
               > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
          " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
            server.getRegionServerAccounting().getGlobalMemstoreSize()));
        flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
        if (!flushedOne) {
          LOG.info("Excluding secondary region " + bestRegionReplica +
              " - trying to find a different region to refresh files.");
          excludedRegions.add(bestRegionReplica);
        }
      } else {
        LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
            + "Total Memstore size="
            + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
            + ", Region memstore size="
            + humanReadableInt(regionToFlush.getMemstoreSize()));
        /** Emergency flush of the Memstores of all HStores in this region */
        flushedOne = flushRegion(regionToFlush, true, true);

        if (!flushedOne) {
          LOG.info("Excluding unflushable region " + regionToFlush +
            " - trying to find a different region to flush.");
          excludedRegions.add(regionToFlush);
        }
      }
    }
    return true;
  }
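The selection rule at the heart of this method (prefer the biggest region without too many store files, unless the overall biggest region is more than twice its size) can be sketched in isolation. This is a simplified illustration under stated assumptions: FlushCandidateSketch, Candidate, and choose are hypothetical names, and the sketch ignores region replicas, excluded regions, and the other checks that getBiggestMemstoreRegion may perform.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified version of the region choice under global memory pressure. */
class FlushCandidateSketch {
    static final class Candidate {
        final String name;
        final long memstoreSize;
        final boolean tooManyStoreFiles;

        Candidate(String name, long memstoreSize, boolean tooManyStoreFiles) {
            this.name = name;
            this.memstoreSize = memstoreSize;
            this.tooManyStoreFiles = tooManyStoreFiles;
        }
    }

    /** Returns the region to flush, or null when no region holds any Memstore data. */
    static Candidate choose(List<Candidate> regions) {
        Candidate bestAny = null;        // biggest Memstore, regardless of store file count
        Candidate bestFlushable = null;  // biggest Memstore among regions without too many files
        for (Candidate c : regions) {
            if (c.memstoreSize <= 0) {
                continue;
            }
            if (bestAny == null || c.memstoreSize > bestAny.memstoreSize) {
                bestAny = c;
            }
            if (!c.tooManyStoreFiles
                    && (bestFlushable == null || c.memstoreSize > bestFlushable.memstoreSize)) {
                bestFlushable = c;
            }
        }
        if (bestFlushable == null) {
            return bestAny;
        }
        // Avoid many small flushes: take the biggest region if it is more than twice as large.
        if (bestAny.memstoreSize > 2 * bestFlushable.memstoreSize) {
            return bestAny;
        }
        return bestFlushable;
    }

    public static void main(String[] args) {
        List<Candidate> regions = Arrays.asList(
            new Candidate("A", 300, true),
            new Candidate("B", 100, false));
        Candidate chosen = choose(regions);
        System.out.println("chosen region: " + (chosen == null ? null : chosen.name));
    }
}
```

The factor of two reflects the trade-off named in the source comment: flushing a small region frees little memory and produces yet another small file to compact later.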
That completes the walkthrough of how an HRegionServer-level Memstore flush is triggered. If anything here is wrong, corrections are welcome.