HBase memstore flush trigger mechanisms and source-code analysis of the HRegionServer-level trigger

I. Why does the Memstore exist?


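        In HBase, each HRegionServer hosts multiple HRegions, and each HRegion contains multiple HStores. The Memstore is one component of an HStore: under heavy write load, once the Memstore grows past its configured threshold, its contents are flushed to an HFile. By default HBase stores its files on HDFS, but HDFS simply stores the bytes it is given and performs no optimization of its own, such as sorting by rowkey or filtering versions. Since the point of using HBase is fast lookup, rowkey order has to be guaranteed. The Memstore layer was added for two reasons: first, to speed up responses; second, to sort the data and discard garbage before it is flushed to disk (for example, a column family that only needs the latest version does not need to keep older ones). Because the data has already been optimized by the time the HFile is written, later lookups are much faster.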
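The idea of sorting and de-duplicating in memory before writing can be illustrated with a toy buffer. This is only a sketch under simplifying assumptions: `ToyMemStore`, its size accounting, and the `rowKey=value` output format are hypothetical and are not how HBase implements its Memstore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical toy model of a memstore; not HBase's implementation.
class ToyMemStore {
    // A skip-list map keeps entries sorted by row key at all times.
    private final ConcurrentSkipListMap<String, String> cells = new ConcurrentSkipListMap<>();
    private final long flushSizeBytes;
    private long sizeBytes = 0;

    ToyMemStore(long flushSizeBytes) {
        this.flushSizeBytes = flushSizeBytes;
    }

    // Stores only the latest value per row key (a single-version column family).
    // Returns true once the buffer has grown past the flush threshold.
    synchronized boolean put(String rowKey, String value) {
        String old = cells.put(rowKey, value);
        if (old == null) {
            sizeBytes += rowKey.length() + value.length();
        } else {
            sizeBytes += value.length() - old.length();
        }
        return sizeBytes > flushSizeBytes;
    }

    // Emits the buffered rows in row-key order and empties the buffer.
    synchronized List<String> flush() {
        List<String> sortedLines = new ArrayList<>();
        for (Map.Entry<String, String> e : cells.entrySet()) {
            sortedLines.add(e.getKey() + "=" + e.getValue());
        }
        cells.clear();
        sizeBytes = 0;
        return sortedLines;
    }

    public static void main(String[] args) {
        ToyMemStore m = new ToyMemStore(1024);
        m.put("row3", "c");
        m.put("row1", "a");
        m.put("row1", "a2"); // overwrites the older version of row1
        m.put("row2", "b");
        System.out.println(m.flush()); // [row1=a2, row2=b, row3=c]
    }
}
```

Writes arrive in arbitrary order, but the flushed output is already sorted and holds one version per row, which is the property the paragraph above attributes to the Memstore layer.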

       HBase offers several memstore-related configuration options. Before going through them, note that the smallest unit of a memstore flush is an HRegion. The options are:

       hbase.hregion.memstore.flush.size

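      This value is the Memstore size threshold for an HRegion on an HRegionServer, 128M by default. The check is made per HRegion: once the memstore data held by the HRegion's HStores exceeds this size, a flush is triggered for the Memstores of the whole HRegion, since the HRegion is the smallest flush unit. This flush does not block updates. Note that as the data volume grows, a single HRegionServer hosts more and more HRegions, and the total Memstore size on that HRegionServer grows with them.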

       hbase.regionserver.global.memstore.size

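       When the sum of all Memstores across all HRegions on a single HRegionServer exceeds this setting, a flush is also forced, and updates are blocked as well. This is the case you least want to see, because blocking updates on the HRegionServer affects reads and writes for every HRegion it hosts. By default hbase.regionserver.global.memstore.size is 40% of the heap. Once the flush has been triggered, the block is released when the HRegionServer's total Memstore size drops to

hbase.regionserver.global.memstore.size.lower.limit * hbase.regionserver.global.memstore.size * hbase_heapsize

This is a neat detail: because this flush blocks updates to the HRegionServer, it does not keep flushing indefinitely. It flushes only down to an acceptable minimum and then stops blocking.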

       hbase.hregion.memstore.block.multiplier

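       As we know, an HRegion has N HStores, one for each column family of the table. This parameter means that if the total size of all Memstores in an HRegion exceeds

hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size

      a flush of that HRegion is triggered as well, and updates to that HRegion are blocked until its memstore has shrunk.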
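The two per-HRegion thresholds can be written down as plain arithmetic. The sketch below is illustrative only: `RegionThresholds` and its method names are hypothetical helpers, not HBase classes, and the values are the ones used in this article.

```java
// Hypothetical helper mirroring the two per-region thresholds described above.
class RegionThresholds {
    static final long MB = 1024L * 1024L;

    // A flush is requested once the memstore size passes flush.size.
    static boolean shouldFlush(long memstoreBytes, long flushSizeBytes) {
        return memstoreBytes > flushSizeBytes;
    }

    // The second threshold is block.multiplier * flush.size.
    static long blockingSize(long flushSizeBytes, int blockMultiplier) {
        return flushSizeBytes * blockMultiplier;
    }

    static boolean aboveBlockingSize(long memstoreBytes, long flushSizeBytes, int blockMultiplier) {
        return memstoreBytes > blockingSize(flushSizeBytes, blockMultiplier);
    }

    public static void main(String[] args) {
        long flushSize = 128 * MB;
        int multiplier = 4;
        System.out.println(blockingSize(flushSize, multiplier) / MB + "M"); // 512M
        System.out.println(shouldFlush(130 * MB, flushSize));               // true
        System.out.println(aboveBlockingSize(130 * MB, flushSize, multiplier)); // false
    }
}
```

With flush.size at 128M and a multiplier of 4, the second threshold sits at 512M, so it is only reached when writes outpace flushing by a wide margin.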

  An example:

  heap: 1G

     hbase.regionserver.global.memstore.size = 1 * 1024M * 40% ≈ 410M

     hbase.regionserver.global.memstore.size.lower.limit = 1 * 1024M * 40% * 0.95 ≈ 389M

     hbase.hregion.memstore.flush.size = 128M

     hbase.hregion.memstore.block.multiplier = 4

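     Now suppose a single HRegionServer hosts 4 HRegions, each with only one HStore:

     HStore1: 100M of memstore in use

     HStore2: 110M of memstore in use

     HStore3: 110M of memstore in use

     HStore4: 100M of memstore in use

      Although no single HStore has exceeded the default 128M, the total (420M) is above hbase.regionserver.global.memstore.size (about 410M), so a flush is triggered anyway, and updates to this HRegionServer are blocked.

       So we need to weigh the total number of HRegions on an HRegionServer against the number of HStores per HRegion, and set the values above accordingly. The configuration options covered so far are, in effect, the conditions under which a Memstore flush is triggered.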
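The arithmetic of this example can be checked with a few lines of Java. This is a sketch only: `GlobalMemstoreExample` and its methods are hypothetical names, and it uses `double` arithmetic for readability rather than whatever types HBase uses internally.

```java
// Hypothetical re-computation of the worked example; not HBase code.
class GlobalMemstoreExample {
    static final long MB = 1024L * 1024L;

    // Global limit = heap * hbase.regionserver.global.memstore.size
    static long globalLimit(long heapBytes, double globalPercent) {
        return (long) (heapBytes * globalPercent);
    }

    // Low-water mark = global limit * hbase.regionserver.global.memstore.size.lower.limit
    static long lowMark(long globalLimitBytes, double lowerLimitPercent) {
        return (long) (globalLimitBytes * lowerLimitPercent);
    }

    static long total(long[] memstoreBytes) {
        long sum = 0;
        for (long b : memstoreBytes) {
            sum += b;
        }
        return sum;
    }

    public static void main(String[] args) {
        long limit = globalLimit(1024 * MB, 0.4);
        long low = lowMark(limit, 0.95);
        long used = total(new long[] {100 * MB, 110 * MB, 110 * MB, 100 * MB});
        System.out.println("limit (whole MB) = " + limit / MB);
        System.out.println("low mark (whole MB) = " + low / MB);
        System.out.println("used (MB) = " + used / MB);
        System.out.println("above global limit: " + (used > limit));
    }
}
```

The four Memstores add up to 420M, which is above the limit of roughly 410M even though each one is below 128M.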


II. Source analysis of the HRegionServer-level flush


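       When an HRegionServer-level flush is triggered, updates are blocked. After the trigger at the HRegionServer level, the work is still broken down into HRegion-level flushes, because the flush is necessarily carried out by each HRegion on the HRegionServer separately.

       The HRegionServer class has a member variable dedicated to handling flushes: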

     

protected MemStoreFlusher cacheFlusher;
  
  
     Following it leads to the source of the MemStoreFlusher class. Its important fields are defined as follows:

    


  
  
/** A delay queue (DelayQueue implements BlockingQueue) holding the entries waiting to be flushed */
private final BlockingQueue<FlushQueueEntry> flushQueue =
    new DelayQueue<FlushQueueEntry>();
/** Map whose key is an HRegion awaiting flush and whose value is the entry that wraps that HRegion */
private final Map<Region, FlushRegionEntry> regionsInQueue =
    new HashMap<Region, FlushRegionEntry>();
/** Set while a wake-up of the flush thread is pending */
private AtomicBoolean wakeupPending = new AtomicBoolean();
/** How often the flush threads wake up */
private final long threadWakeFrequency;
// Reference to the owning HRegionServer
private final HRegionServer server;
/** Read-write lock */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/** Monitor object used for thread wait and notify */
private final Object blockSignal = new Object();
/** Global Memstore size limit */
protected long globalMemStoreLimit;
/** Low-water mark as a fraction of the global limit */
protected float globalMemStoreLimitLowMarkPercent;
/** Low-water mark in bytes */
protected long globalMemStoreLimitLowMark;
/** Blocking wait time */
private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter();
/** The threads that process flushes */
private final FlushHandler[] flushHandlers;
private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
     These fields are initialized in the constructor:

   


  
  
public MemStoreFlusher(final Configuration conf,
    final HRegionServer server) {
  super();
  this.conf = conf;
  this.server = server;
  /** Thread wake frequency, 10 s by default; mainly keeps the threads that flush HRegions from sleeping indefinitely */
  this.threadWakeFrequency =
      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  /** Maximum heap size */
  long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
  /** Fraction of the heap the global memstore may occupy (globalMemStorePercent), 0.4f by default */
  float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
  /** Global Memstore size limit, 40% of the heap by default */
  this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
  /** Low-water mark as a fraction of the global limit, 0.95f by default */
  this.globalMemStoreLimitLowMarkPercent =
      HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
  /** Low-water mark in bytes, by default heap size * 0.4 * 0.95 */
  this.globalMemStoreLimitLowMark =
      (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
  /** Blocking wait time */
  this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
  /** Number of threads that process the HRegions queued for flush, 2 by default */
  int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
  this.flushHandlers = new FlushHandler[handlerCount];
  LOG.info("globalMemStoreLimit="
      + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
      + ", globalMemStoreLimitLowMark="
      + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
      + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
}
    
      Adding an HRegion that needs a flush to the queue:

  


  
  
@Override
public void requestFlush(Region r, boolean forceFlushAllStores) {
  synchronized (regionsInQueue) {
    /** Only enqueue if this Region is not already queued */
    if (!regionsInQueue.containsKey(r)) {
      /** Wrap the Region in a FlushRegionEntry. No delay is set here, so once enqueued the entry can be dequeued and flushed right away */
      FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
      /** Record it in the map */
      this.regionsInQueue.put(r, fqe);
      /** Add it to the queue of pending flushes */
      this.flushQueue.add(fqe);
    }
  }
}
    Enqueueing with a delay:

    


  
  
@Override
public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
  synchronized (regionsInQueue) {
    if (!regionsInQueue.containsKey(r)) {
      // This entry has some delay
      FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
      /** Set the time at which the entry expires and becomes available */
      fqe.requeue(delay);
      this.regionsInQueue.put(r, fqe);
      this.flushQueue.add(fqe);
    }
  }
}
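The difference between the two enqueue paths comes down to how `java.util.concurrent.DelayQueue` treats an element's remaining delay. The following is a minimal sketch of that behavior: `ToyFlushEntry` is a hypothetical stand-in for FlushRegionEntry that models only the delay handling, and its field names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for FlushRegionEntry; only the delay handling is modeled.
class ToyFlushEntry implements Delayed {
    final String regionName;
    private long whenToExpireMs;

    ToyFlushEntry(String regionName) {
        this.regionName = regionName;
        this.whenToExpireMs = System.currentTimeMillis(); // no delay: available at once
    }

    // Pushes the expiry time into the future, as a delayed request would.
    ToyFlushEntry requeue(long delayMs) {
        this.whenToExpireMs = System.currentTimeMillis() + delayMs;
        return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpireMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    // Enqueues a delayed entry first and an immediate entry second,
    // then returns the order in which the queue hands them out.
    static List<String> demoOrder(long delayMs) {
        DelayQueue<ToyFlushEntry> queue = new DelayQueue<>();
        queue.add(new ToyFlushEntry("delayed").requeue(delayMs));
        queue.add(new ToyFlushEntry("immediate"));
        List<String> order = new ArrayList<>();
        try {
            order.add(queue.take().regionName);
            order.add(queue.take().regionName); // waits until the delay has elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demoOrder(200)); // [immediate, delayed]
    }
}
```

An entry with no delay is handed out as soon as a consumer asks for it, while a delayed entry stays invisible to consumers until its delay has elapsed, regardless of insertion order.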
    The flush that runs once the periodic handler has picked up an entry:

   

  
  
private boolean flushRegion(final FlushRegionEntry fqe) {
  Region region = fqe.region;
  /** A region that is not a meta region and has too many HFiles gets its flush delayed; meta regions are never delayed */
  if (!region.getRegionInfo().isMetaRegion() &&
      isTooManyStoreFiles(region)) {
    /** Too many files, but the blocking wait time has already elapsed: stop waiting and go ahead with the flush */
    if (fqe.isMaximumWait(this.blockingWaitTime)) {
      LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionInfo().getRegionNameAsString());
    } else {
      // If this is first time we've been put off, then emit a log message.
      /** First time this flush is put off (still within the blocking window): the region probably has too many files. With too many HFiles in an HRegion a flush takes a long time, and an HRegionServer-level flush blocks updates, so this check is here to avoid a long block */
      if (fqe.getRequeueCount() <= 0) {
        // Note: We don't impose blockingStoreFiles constraint on meta regions
        LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
        /** Request a split of this HRegion; if no split is requested, request a compaction of its HFiles instead */
        if (!this.server.compactSplitThread.requestSplit(region)) {
          try {
            this.server.compactSplitThread.requestSystemCompaction(
                region, Thread.currentThread().getName());
          } catch (IOException e) {
            LOG.error("Cache flush failed for region " +
                Bytes.toStringBinary(region.getRegionInfo().getRegionName()),
                RemoteExceptionHandler.checkIOException(e));
          }
        }
      }
      // Put back on the queue. Have it come back out of the queue
      // after a delay of this.blockingWaitTime / 100 ms.
      /** Put the entry back on the queue with a delay */
      this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
      // Tell a lie, it's not flushed but it's ok
      return true;
    }
  }
  /** In every other case, perform the actual flush */
  return flushRegion(region, false, fqe.isForceFlushAllStores());
}
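The delay logic above reduces to a small decision function. The sketch below is an illustration, not HBase code: `FlushDelayPolicy` and `Action` are hypothetical names, and the "waited long enough" test is assumed to mean that the time since the entry was created exceeds blockingWaitTime.

```java
// Hypothetical condensation of the requeue decision; not HBase code.
class FlushDelayPolicy {
    enum Action { FLUSH_NOW, REQUEUE }

    // Assumption: "waited long enough" means (now - createTime) > blockingWaitTime.
    static Action decide(boolean isMetaRegion, boolean tooManyStoreFiles,
                         long createTimeMs, long nowMs, long blockingWaitTimeMs) {
        if (!isMetaRegion && tooManyStoreFiles) {
            boolean waitedLongEnough = (nowMs - createTimeMs) > blockingWaitTimeMs;
            if (!waitedLongEnough) {
                return Action.REQUEUE; // come back later, after a compaction has had time to run
            }
        }
        return Action.FLUSH_NOW;
    }

    // The entry is put back with a delay of blockingWaitTime / 100.
    static long requeueDelayMs(long blockingWaitTimeMs) {
        return blockingWaitTimeMs / 100;
    }

    public static void main(String[] args) {
        long wait = 90000; // default of hbase.hstore.blockingWaitTime in the constructor above
        System.out.println(decide(false, true, 0, 1000, wait));  // REQUEUE
        System.out.println(decide(false, true, 0, 90001, wait)); // FLUSH_NOW
        System.out.println(decide(true, true, 0, 1000, wait));   // FLUSH_NOW
        System.out.println(requeueDelayMs(wait));                // 900
    }
}
```

With the default of 90000 ms, a region with too many store files is re-checked every 900 ms and is flushed regardless once 90 seconds have passed.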

       The actual flush:

  


  
  
private boolean flushRegion(final Region region, final boolean emergencyFlush,
    boolean forceFlushAllStores) {
  long startTime = 0;
  synchronized (this.regionsInQueue) {
    /** First remove the region from regionsInQueue */
    FlushRegionEntry fqe = this.regionsInQueue.remove(region);
    // Use the start time of the FlushRegionEntry if available
    if (fqe != null) {
      /** Use the entry's creation time as the start time of the flush */
      startTime = fqe.createTime;
    }
    /** For an emergency flush (the call made from flushOneForGlobalPressure), the entry was not taken off the queue by flushQueue.poll, so remove it from flushQueue directly */
    if (fqe != null && emergencyFlush) {
      // Need to remove from region from delay queue. When NOT an
      // emergencyFlush, then item was removed via a flushQueue.poll.
      flushQueue.remove(fqe);
    }
  }
  if (startTime == 0) {
    // Avoid getting the system time unless we don't have a FlushRegionEntry;
    // shame we can't capture the time also spent in the above synchronized
    // block
    startTime = EnvironmentEdgeManager.currentTime();
  }
  /** Take the read lock, which blocks threads waiting for the write lock */
  lock.readLock().lock();
  try {
    /** Notify the flush-request listeners of the type of this flush: NORMAL, ABOVE_LOWER_MARK, or ABOVE_HIGHER_MARK */
    notifyFlushRequest(region, emergencyFlush);
    /** Perform the flush */
    FlushResult flushResult = region.flush(forceFlushAllStores);
    /** Check whether the HFiles need compacting after the flush */
    boolean shouldCompact = flushResult.isCompactionNeeded();
    // We just want to check the size
    /** Check whether the HRegion needs to be split */
    boolean shouldSplit = ((HRegion)region).checkSplit() != null;
    if (shouldSplit) {
      this.server.compactSplitThread.requestSplit(region);
    } else if (shouldCompact) {
      server.compactSplitThread.requestSystemCompaction(
          region, Thread.currentThread().getName());
    }
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      server.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
  } catch (DroppedSnapshotException ex) {
    // Cache flush can fail in a few places. If it fails in a critical
    // section, we get a DroppedSnapshotException and a replay of wal
    // is required. Currently the only way to do this is a restart of
    // the server. Abort because hdfs is probably bad (HBASE-644 is a case
    // where hdfs was bad but passed the hdfs check).
    server.abort("Replay of WAL required. Forcing server shutdown", ex);
    return false;
  } catch (IOException ex) {
    LOG.error("Cache flush failed" + (region != null ? (" for region " +
        Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
    if (!server.checkFileSystem()) {
      return false;
    }
  } finally {
    /** Release the read lock once the flush is done, and wake up any blocked threads */
    lock.readLock().unlock();
    wakeUpIfBlocking();
  }
  return true;
}
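   The methods above basically cover enqueueing and executing a flush. Next, when is a flush triggered? There are many trigger points (any operation that pushes usage past one of the thresholds described earlier, or a manual trigger from the hbase shell); here we focus on the periodic trigger from HBase's built-in FlushHandler thread.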

 


  
  
private class FlushHandler extends HasThread {
  private FlushHandler(String name) {
    super(name);
  }
  @Override
  public void run() {
    while (!server.isStopped()) {
      FlushQueueEntry fqe = null;
      try {
        wakeupPending.set(false); // allow someone to wake us up again
        /** Take one entry awaiting flush from the queue, waiting at most threadWakeFrequency */
        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
        /** Either null (the poll timed out) or a WakeupFlushThread. WakeupFlushThread is a token placed in the queue: each time one is seen, check whether the memstore limit has been exceeded and, if so, pick an HRegion to flush to bring memstore usage down. Its second purpose is to wake the flush thread so that the FlushHandler does not stay asleep */
        if (fqe == null || fqe instanceof WakeupFlushThread) {
          /** If total memstore size on this RS is above the low-water mark */
          if (isAboveLowWaterMark()) {
            LOG.debug("Flush thread woke up because memory above low water="
                + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
            /** Flush the Memstore of one HRegion to reduce the total memstore size */
            if (!flushOneForGlobalPressure()) {
              // Wasn't able to flush any region, but we're above low water mark
              // This is unlikely to happen, but might happen when closing the
              // entire server - another thread is flushing regions. We'll just
              // sleep a little bit to avoid spinning, and then pretend that
              // we flushed one, so anyone blocked will check again
              Thread.sleep(1000);
              wakeUpIfBlocking();
            }
            // Enqueue another one of these tokens so we'll wake up again
            wakeupFlushThread();
          }
          continue;
        }
        FlushRegionEntry fre = (FlushRegionEntry) fqe;
        /** A regular HRegion awaiting flush: run flushRegion */
        if (!flushRegion(fre)) {
          break;
        }
      } catch (InterruptedException ex) {
        continue;
      } catch (ConcurrentModificationException ex) {
        continue;
      } catch (Exception ex) {
        LOG.error("Cache flusher failed for entry " + fqe, ex);
        if (!server.checkFileSystem()) {
          break;
        }
      }
    }
    /** The loop has ended: clear whatever is left in the queues */
    synchronized (regionsInQueue) {
      regionsInQueue.clear();
      flushQueue.clear();
    }
    // Signal anyone waiting, so they see the close flag
    /** Wake up waiting threads */
    wakeUpIfBlocking();
    LOG.info(getName() + " exiting");
  }
}
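The listing does not show wakeupFlushThread(), but the interplay between the wakeupPending flag and a token in the queue can be sketched as follows. Everything here is an assumption made for illustration: `WakeupTokenDemo`, the use of compareAndSet on the producer side, and the plain LinkedBlockingQueue are not taken from the HBase source.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a "wake-up token" guarded by an atomic flag.
class WakeupTokenDemo {
    static final Object WAKEUP_TOKEN = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean wakeupPending = new AtomicBoolean();

    // Producer side: enqueue at most one token until the consumer resets the flag.
    boolean wakeup() {
        if (wakeupPending.compareAndSet(false, true)) {
            queue.add(WAKEUP_TOKEN);
            return true;
        }
        return false; // a token is already pending, so do not add another
    }

    // Consumer side: reset the flag first (as run() does), then take the next item or null.
    Object pollOnce() {
        wakeupPending.set(false);
        return queue.poll();
    }

    int queued() {
        return queue.size();
    }

    public static void main(String[] args) {
        WakeupTokenDemo demo = new WakeupTokenDemo();
        System.out.println(demo.wakeup());  // true
        System.out.println(demo.wakeup());  // false
        System.out.println(demo.queued());  // 1
        System.out.println(demo.pollOnce() == WAKEUP_TOKEN); // true
        System.out.println(demo.wakeup());  // true
    }
}
```

The flag keeps many concurrent callers from flooding the queue with tokens: only the first request after each consumer pass actually enqueues one.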
   Now a closer look at flushOneForGlobalPressure:

  


  
  
private boolean flushOneForGlobalPressure() {
  /** Get the HRegions currently on this RS, sorted by Memstore size from largest to smallest, as a size-to-region map */
  SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
  /** Regions that failed to flush or refresh and are excluded from later selection rounds */
  Set<Region> excludedRegions = new HashSet<Region>();
  double secondaryMultiplier
      = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
  boolean flushedOne = false;
  while (!flushedOne) {
    // Find the biggest region that doesn't have too many storefiles
    // (might be null!)
    /** The best flush candidate: the biggest-memstore region that does not have too many HFiles */
    Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
    // Find the biggest region, total, even if it might have too many flushes.
    /** The HRegion with the biggest memstore, regardless of how many HFiles it has */
    Region bestAnyRegion = getBiggestMemstoreRegion(
        regionsBySize, excludedRegions, false);
    // Find the biggest region that is a secondary region
    /** The biggest-memstore HRegion that is a secondary region replica */
    Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
        excludedRegions);
    if (bestAnyRegion == null && bestRegionReplica == null) {
      LOG.error("Above memory mark but there are no flushable regions!");
      return false;
    }
    Region regionToFlush;
    /** If the biggest memstore overall is more than 2x the memstore of the best flushable region (decent memstore, not too many HFiles) */
    if (bestFlushableRegion != null &&
        bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
      // Even if it's not supposed to be flushed, pick a region if it's more than twice
      // as big as the best flushable one - otherwise when we're under pressure we make
      // lots of little flushes and cause lots of compactions, etc, which just makes
      // life worse!
      if (LOG.isDebugEnabled()) {
        LOG.debug("Under global heap pressure: " + "Region "
            + bestAnyRegion.getRegionInfo().getRegionNameAsString()
            + " has too many " + "store files, but is "
            + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
            + " vs best flushable region's "
            + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
            + ". Choosing the bigger.");
      }
      /** Choose the HRegion with the biggest memstore, even though it has too many HFiles */
      regionToFlush = bestAnyRegion;
    } else {
      if (bestFlushableRegion == null) {
        regionToFlush = bestAnyRegion;
      } else {
        regionToFlush = bestFlushableRegion;
      }
    }
    Preconditions.checkState(
        (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
        (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));
    /** If no region was chosen, or the biggest secondary replica's memstore is larger than secondaryMultiplier times the chosen region's memstore (with store file refresh enabled), refresh the replica's store files instead of flushing */
    if (regionToFlush == null ||
        (bestRegionReplica != null &&
        ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
        (bestRegionReplica.getMemstoreSize()
            > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
      LOG.info("Refreshing storefiles of region " + bestRegionReplica +
          " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
              server.getRegionServerAccounting().getGlobalMemstoreSize()));
      flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
      if (!flushedOne) {
        LOG.info("Excluding secondary region " + bestRegionReplica +
            " - trying to find a different region to refresh files.");
        excludedRegions.add(bestRegionReplica);
      }
    } else {
      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
          + "Total Memstore size="
          + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
          + ", Region memstore size="
          + humanReadableInt(regionToFlush.getMemstoreSize()));
      /** Force a flush of the memstores of all HStores in this region */
      flushedOne = flushRegion(regionToFlush, true, true);
      if (!flushedOne) {
        LOG.info("Excluding unflushable region " + regionToFlush +
            " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
  }
  return true;
}
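Leaving region replicas aside, the choice between the two candidates follows a simple rule that can be isolated in a few lines. This is a sketch only: `FlushCandidatePicker` and `Candidate` are hypothetical types, and a plain list stands in for the sorted map of online regions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the candidate selection; region replicas are ignored.
class FlushCandidatePicker {
    static final class Candidate {
        final String name;
        final long memstoreBytes;
        final boolean tooManyStoreFiles;

        Candidate(String name, long memstoreBytes, boolean tooManyStoreFiles) {
            this.name = name;
            this.memstoreBytes = memstoreBytes;
            this.tooManyStoreFiles = tooManyStoreFiles;
        }
    }

    // Biggest memstore, optionally skipping regions that have too many store files.
    static Candidate biggest(List<Candidate> all, boolean skipTooManyStoreFiles) {
        Candidate best = null;
        for (Candidate c : all) {
            if (skipTooManyStoreFiles && c.tooManyStoreFiles) {
                continue;
            }
            if (best == null || c.memstoreBytes > best.memstoreBytes) {
                best = c;
            }
        }
        return best;
    }

    // Prefer the best flushable region unless the overall biggest is more than twice its size.
    static Candidate pick(List<Candidate> all) {
        Candidate bestFlushable = biggest(all, true);
        Candidate bestAny = biggest(all, false);
        if (bestAny == null) {
            return null;
        }
        if (bestFlushable != null && bestAny.memstoreBytes > 2 * bestFlushable.memstoreBytes) {
            return bestAny;
        }
        return bestFlushable != null ? bestFlushable : bestAny;
    }

    public static void main(String[] args) {
        List<Candidate> regions = Arrays.asList(
            new Candidate("r1", 200, true),
            new Candidate("r2", 60, false));
        System.out.println(pick(regions).name); // r1, because 200 > 2 * 60
    }
}
```

The 2x rule trades a flush into an already crowded region for fewer, larger flushes, which matches the reasoning in the source comment about avoiding many small flushes and the compactions they cause.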

   That completes the rough walk-through of how an HRegionServer-level memstore flush is triggered. If anything here is wrong, corrections are welcome.


Reposted from blog.csdn.net/qq_38025219/article/details/88683269