HiveMetaStore模块代码分析及多个节点压力测试

 

从package结构来看,主要的5个package,让我们来看看这几个package的内容
(1)m有点etastore:是metastore模块的入口,也是整个metastore模块的核心所在,里面包含了HiveMetaStore类作为整个模块的核心,接收来自hive的请求,返回需要的信息。HiveMetaStore模块代码分析

从package结构来看,主要的5个package,让我们来看看这几个package的内容

(2)metastore.api:包含了调用和访问metastore模块的接口以及接口参数和返回值类型,metastore模块的用户可以通过api对metastore模块进行访问。

(3)metastore.events:用于metastore模块内部的观察者模式。因为metastore模块是支持notification通知机制和一些其他的后续处理的。通过观察者模式,当metastore对元信息进行一些操作以后,会同时产生一些event,这些event会被它们的listener捕获,并作出一些相应的处理,如发出一些通知等。

(4)metastore.model:与数据持久化相关,metastore模块通过datanucleus库将model持久化到数据库,这里的model与数据库中的表是对应的。

(5)metastore.tools:是供后台的元数据管理员对元信息进行查看和修改的工具。

原理结构:

HiveMetaStore模块代码分析
 

HiveMetaStore模块代码分析

Hive

     hive内所有的对于元数据的操作都通过生成一个DDL Task,在task中完成。Hive 是对Hive元数据的最顶层的抽象。DDL Task通过操作Hive类,完成各种DDL操作,例如createTasb,dropTable等。Hive中提供的接口几乎与DDL的操作一一对应。

HiveMetaStoreClient

     是Hive连接MetaStore的客户端。HiveMetaStoreClient首先会检查hive.metastore.local是否为true,就是说metastore是否是在本地。如果在本地,则生成一个服务端,并通过函数调用的方式连接。如果为false,metastore不再本地,则生成一个Thrift的客户端,并连接Thrift的服务端(服务端必须首先在远程被启动,通过hive --server metastore命令)。HiveMetaStoreClient继承自IMetaStoreClient接口,HiveMetaStoreClient可以通过本地和远程两种方式访问和调用HiveMetaStore的Server。

     如果是远程连接,用户可以指定多个uri,用户连接thrift server。HiveMetaStoreClient会依次尝试连接,直到找到一个可用连接。

     无论是本地还是远程,接口都是统一的。

HiveMetaHook

     MetaStoreHook提供了从HiveMetaStoreClient向StorageHandler通知的机制,每当HiveMetaStoreClient对MetaStore进行操作时,会首先调用Hook中的接口,通知StorageHandler采取相应的准备操作。这样当数据存储方式不是文件时,可以保持数据结构和元数据信息是一致的。

HiveMetaStore

     HiveMetaStore是HiveMetaStoreClient的服务端,可能与HiveMetaStoreClient在同一台服务器上,也可以不再统一服务器上。但是HiveMetaStore一定与实际存储Meta信息的数据库(或其他存储介质,这里以数据库为例)在一台服务器上。

     核心部分是HiveMetaStore的内部类HMSHandler,它继承自IHMSHandler接口,IHMSHandler又继承自ThrifHiveMetastore.Iface接口,提供通过Thrift方式进行远程的调用。在HiveMetaStore.HMSHandler内实现了接口的所有metastore模块对外的方法。

     HiveMetaStore将Hive的业务逻辑转换为实际的数据操作。并调用RawStore的接口进行存储和读写操作。

     因为HiveMetaStore与数据在同一端,所以可以进行错误检测以及重试操作。例如要创建的表是否已经存在,等等。HiveMetaStore也会利用RawStore提供的transaction接口,在操作前都要open一个transaction。

RawStore

     RawStore定义了一套接口,定义了对元数据操作的一套机制。每一个接口可以看作是一个原子操作。RawStore提供了transaction机制。确保数据的一致性的同时,保证了性能。

     RawStore可以有多种实现,Hive目前实现的是ObjectStore类,即对象存储。通过JDO技术,把每一个要存储数据通过Model定义成一个对象。JDO负责实际存储一个对象。JDO底层可以连接多种存储方式,存储方式可以通过配置参数的javax.jdo.option直接传递给JDO。目前常用的是derby和mysql。

     JDO会将一个对象当作表进行存储。ObjectStore中的transaction机制也是通过JDO提供的transaction实现的。当commit失败时,将rollback所有操作。

ObjectStore

     继承自RowStore接口,是用于对数据进行持久化的部分,Object可以从数据库中获取数据并映射到model的对象中,或者将model中的对象存入数据库。

HiveAlterHandler

     继承自AlterHandler接口,从HMSHandler分享出来专门进行Alter一类操作。

Warehouse

     主要作用是对HDFS上的文件进行操作。因为在修改元信息的同时可能会涉及到HDFS的一些文件操作,如mkdir,delteDir等操作。

MetaStorePreEventListener,MetaStoreEventListener,MetaStoreEndFunctionListener

     通过观察者模式对产生的event进行相应的处理的观察者。这三个类都是抽象类,由其他一些具体的类来继承和实现。

MetaStore模块与其他模块间的耦合

     MetaStore是一个独立的模块,但由于Hive的逻辑,在进行DDL操作时,不仅会对元数据操作,同时也需要存储数据的系统进行配合。

     问题在于,Hive目前默认数据都是存储在文件系统中(虽然没有定义具体的文件格式),也利用了文件系统的中的一些特性。例如,元数据中的DataBase,Table,Partition都是通过目录实现的。任何一个对DataBase,Table,Partition的操作,除了需要对元数据进行修改意外,可能还需要对文件系统进行操作。例如创建一个partition,同时需要在文件系统中创建一个目录。这是MetaStore与其他模块耦合的地方。

     目前,HiveMetaHook实现了对部分耦合的剥离,允许通过hook,减少MetaStore与数据存储方式的依赖。但是目前并没有实现完全剥离。在Hive.java,HiveMetaStore.java中依然会调用FileSystem和WareHouse的接口(WareHouse是数据存储的抽象,假设数据全部存储在文件中,WareHouse也会调用FileSystem接口)。

     如果要支持其他数据存储方式,需要将这些耦合全部剥离,或者提供一个临时文件目录,模拟数据存储的文件系统。

 

 

 

其中PersistenceManager负责控制一组持久化对象包括创建持久化对象和查询对象,它是ObjectStore的一个实例变量,每个ObjectStore拥有一个pm,RawStore是metastore逻辑层和物理底层元数据库(比如derby)交互的接口类,ObjectStore是RawStore的默认实现类。Hive Metastore Server启动的时候会指定一个TProcessor,包装了一个HMSHandler,内部有一个ThreadLocal threadLocalMS实例变量,每个thread维护一个RawStore

 

  private final ThreadLocal threadLocalMS =
        new ThreadLocal() {
          @Override
          protected synchronized RawStore initialValue() {
            return null;
          }
        };


每一个从hive metastore client过来的请求都会从线程池中分配一个WorkerProcess来处理,在HMSHandler中每一个方法都会通过getMS()获取rawstore instance来做具体操作

 

public RawStore getMS() throws MetaException {
RawStore ms = threadLocalMS.get();
      if (ms == null) {
        ms = newRawStore();
        ms.verifySchema();
        threadLocalMS.set(ms);
        ms = threadLocalMS.get();
      }
      return ms;
}

看得出来RawStore是延迟加载,初始化后绑定到threadlocal变量中可以为以后复用

private RawStore newRawStore() throws MetaException {   LOG.info(addPrefix("Opening raw store with implemenation class:"
          + rawStoreClassName));
      Configuration conf = getConf();
      return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
}


RawStore使用了动态代理模式(继承InvocationHandler接口),内部实现了invoke函数,通过method.invoke()执行真正的逻辑,这样的好处是可以在method.invoke()上下文中添加自己其他的逻辑,RetryingRawStore就是在通过捕捉invoke函数抛出的异常,来达到重试的效果。由于使用reflection机制,异常是wrap在InvocationTargetException中的,不过在hive 0.9中竟然在捕捉到此异常后直接throw出来了,而不是retry,明显不对啊。我对它修改了下,拿出wrap的target exception,判断是不是instance of jdoexception的,再做相应的处理

@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; boolean gotNewConnectUrl = false; boolean reloadConf = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.METASTOREFORCERELOADCONF); boolean reloadConfOnJdoException = false; if (reloadConf) { updateConnectionURL(getConf(), null); } int retryCount = 0; Exception caughtException = null; while (true) { try { if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException) { initMS(); } ret = method.invoke(base, args); break; } catch (javax.jdo.JDOException e) { caughtException = (javax.jdo.JDOException) e.getCause(); } catch (UndeclaredThrowableException e) { throw e.getCause(); } catch (InvocationTargetException e) { Throwable t = e.getTargetException(); if (t instanceof JDOException){ caughtException = (JDOException) e.getTargetException(); reloadConfOnJdoException = true; LOG.error("rawstore jdoexception:" + caughtException.toString()); }else { throw e.getCause(); } } if (retryCount >= retryLimit) { throw caughtException; } assert (retryInterval >= 0); retryCount++; LOG.error( String.format( "JDO datastore error. Retrying metastore command " + "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); Thread.sleep(retryInterval); // If we have a connection error, the JDO connection URL hook might // provide us with a new URL to access the datastore. String lastUrl = getConnectionURL(getConf()); gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); } return ret; }


初始化RawStore有两种方式,一种是在RetryingRawStore的构造函数中调用"this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf);" 因为ObjectStore实现了Configurable,在newInstance方法中主动调用里面的setConf(conf)方法初始化RawStore,还有一种情况是在捕捉到异常后retry,也会调用base.setConf(getConf());

private void initMS() { base.setConf(getConf()); }

 

ObjectStore的setConf方法中,先将PersistenceManagerFactory锁住,pm close掉,设置成NULL,再初始化pm

public void setConf(Configuration conf) { // Although an instance of ObjectStore is accessed by one thread, there may // be many threads with ObjectStore instances. So the static variables // pmf and prop need to be protected with locks. pmfPropLock.lock(); try { isInitialized = false; hiveConf = conf; Properties propsFromConf = getDataSourceProps(conf); boolean propsChanged = !propsFromConf.equals(prop); if (propsChanged) { pmf = null; prop = null; } assert(!isActiveTransaction()); shutdown(); // Always want to re-create pm as we don't know if it were created by the // most recent instance of the pmf pm = null; openTrasactionCalls = 0; currentTransaction = null; transactionStatus = TXN_STATUS.NO_STATE; initialize(propsFromConf); if (!isInitialized) { throw new RuntimeException( "Unable to create persistence manager. Check dss.log for details"); } else { LOG.info("Initialized ObjectStore"); } } finally { pmfPropLock.unlock(); } }
private void initialize(Properties dsProps) { LOG.info("ObjectStore, initialize called"); prop = dsProps; pm = getPersistenceManager(); isInitialized = pm != null; return; }


回到一开始报错的那段信息,怎么会Persistence Manager会被关闭呢,仔细排查后才发现是由于HCatalog使用HiveMetastoreClient用完后主动调用了close方法,而一般Hive里面内部不会调这个方法.

HiveMetaStoreClient.java

public void close() { isConnected = false; try { if (null != client) { client.shutdown(); } } catch (TException e) { LOG.error("Unable to shutdown local metastore client", e); } // Transport would have got closed via client.shutdown(), so we dont need this, but // just in case, we make this call. if ((transport != null) && transport.isOpen()) { transport.close(); } }


对应server端HMSHandler中的shutdown方法

@Override public void shutdown() { logInfo("Shutting down the object store..."); RawStore ms = threadLocalMS.get(); if (ms != null) { ms.shutdown(); ms = null; } logInfo("Metastore shutdown complete."); }


ObjectStore的shutdown方法

public void shutdown() { if (pm != null) { pm.close(); } }

 

我们看到shutdown方法里面只是把当前thread的ObjectStore拿出来后,做了一个ObjectStore shutdown方法,把pm关闭了。但是并没有把ObjectStore销毁掉,它还是存在于threadLocalMS中,下次还是会被拿出来,下一次这个thread服务于另外一个请求的时候又会被get出ObjectStore来,但是由于里面的pm已经close掉了所以肯定抛异常。正确的做法是应该加上threadLocalMS.remove()或者threadLocalMS.set(null),主动将其从ThreadLocalMap中删除。

修改后的shutdown方法

public void shutdown() { logInfo("Shutting down the object store..."); RawStore ms = threadLocalMS.get(); if (ms != null) { ms.shutdown(); ms = null; threadLocalMS.remove(); } logInfo("Metastore shutdown complete."); }  
一、             简述原理 
    metastore服务端配置metastore相关参数,并启动metastore服务进程.hive客户端配置连接服务端metastore参数。客户端读取hive元数据,请求(HiveMetaStoreClient)发送到metastore服务端(HiveMetaStore),服务端查询(JDO-mysql)并返回元数据信息给客户端。好处是:hive客户端配置文件中无mysql地址及口令信息,提高元数据安全性。
如果第一个MetaSoreServer请求失败,收到回执信息,才会请求下一个MetaSoreServer服务,为非广播形式发送读取请求。
已解决,查看修改后客户端源码。 
二、             测试部署
 
Hive:0.8.1  Hadoop: hadoop-0.20.2-cdh3u3-x  
a)         Mysql元数据IP为:172.XX.XX.137  Hive metastore service: 172.XX.XX.137,172.XX.XX.136     Hive client: 172.XX.XX.134
b)         在172.XX.XX.137(部署方式和部署Hive客户端一致)
监听端口(默认是 9083)
通过环境变量METASTORE_PORT指定或者通过-p指定
c)         启动metastoreServer :nohup hive –service metastore & (注意service前是2个-)
d)         修改Hive客户端134配置文件,指向metastore Server提供服务的地址
相关配置参数
 
  hive.metastore.warehouse.dir 
  hdfs://BJ-YW-test-HA-6126.jd.com:54310/user/user/warehouse 
 
 
 hive.metastore.local 
  false 
 
 
 
  hive.metastore.uris 
    thrift://172.XX.XX.XX:9083, thrift://172.17.6.136:9083 
 
 
测试内容
   
     客户端134正常连接hive,读取表元数据信息
     断开137 metastore服务,客户端134正常读取元数据信息
     断开 136 metastore服务,客户端134正常读取元数据信息
 
 
Hive:0.12.0  Hadoop: hadoop-2.0.0-cdh4.3.0   
Mysql元数据IP为:172.XX.XX.116  Hive metastore service: 172.XX.XX.116 ,172.XX.XX.84    Hive client: 172.XX.XX.116
e)         在172.XX.XX.86(部署方式和部署Hive客户端一致)
监听端口(默认是 9083)
通过环境变量METASTORE_PORT指定或者通过-p指定
f)          启动metastoreServer :nohup hive –service metastore & (注意service前是2个-)
g)         修改Hive客户端134配置文件,指向metastore Server提供服务的地址
相关配置参数
  hive.metastore.warehouse.dir 
  hdfs://ns1/user/hadp/yw 
  hive.metastore.client.socket.timeout  
  3600
  hive.metastore.uris 
    thrift://172.XX.XX.116:9083, thrift://172.XX.XX.84:9083
测试内容
   
        客户端86正常连接hive,读取表元数据信息
        断开116 metastore服务,客户端86正常读取元数据信息
        断开 84 metastore服务,客户端86正常读取元数据信息
        断开所有metastore服务,客户端86无法读取元数据信息
        恢复其中一台metastore服务,客户端正常读取元数据信息
        客户端调用metastore服务新建表、删表成功
metastore相关参数(红色字段为本次使用属性):
服务端
 
hive.metastore.event.listeners:metastore的事件监听器列表,逗号隔开,默认是空;
hive.metastore.authorization.storage.checks:在做类似drop partition操作时,metastore是否要认证权限,默认是false;
hive.metastore.event.expiry.duration:事件表中事件的过期时间,默认是0;
hive.metastore.event.clean.freq:metastore中清理过期事件的定时器的运行周期,默认是0;
javax.jdo.option.Multithreaded:是否支持并发访问metastore,默认是true;
 
hive.metastore.server.min.threads:在thrift服务池中最小的工作线程数,默认是200;
hive.metastore.server.max.threads:最大线程数,默认是100000;
hive.metastore.server.tcp.keepalive:metastore的server是否开启长连接,长连可以预防半连接的积累,默认是true;
hive.metastore.sasl.enabled:metastore thrift接口的安全策略,开启则用SASL加密接口,客户端必须要用Kerberos机制鉴权,默认是不开启false;
hive.metastore.kerberos.keytab.file:在开启sasl后kerberos的keytab文件存放路径,默认是空;
hive.metastore.ds.connection.url.hook:查找JDO连接url时hook的名字,默认是javax.jdo.option.ConnectionURL;
hive.metastore.kerberos.principal:kerberos的principal,_HOST部分会动态替换,默认是hive-metastore/[email protected]
hive.metastore.batch.retrieve.max:在一个批处理获取中,能从metastore里取出的最大记录数,默认是300
hive.metastore.cache.pinobjtypes:在cache中支持的metastore的对象类型,由逗号分隔,默认是Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order;
hive.metastore.schema.verification:强制metastore的schema一致性,开启的话会校验在metastore中存储的信息的版本和hive的jar包中的版本一致性,并且关闭自动schema迁移,用户必须手动的升级hive并且迁移schema,关闭的话只会在版本不一致时给出警告,默认是false不开启;
hive.metastore.archive.intermediate.original :用于归档压缩的原始中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。
hive.metastore.archive.intermediate.archived :用于归档压缩的压缩后的中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。
hive.metastore.archive.intermediate.extracted :用于归档压缩的解压后的中间目录的后缀,这些目录是什么并不重要,只要能够避免冲突即可。
hive.metastore.partition.inherit.table.properties:当新建分区时自动继承的key列表,默认是空;
hive.metastore.end.function.listeners:metastore函数执行结束时的监听器列表,默认是空;
hive.metastore.execute.setugi:非安全模式,设置为true会令metastore以客户端的用户和组权限执行DFS操作,默认是false,这个属性需要服务端和客户端同时设置;
hive.metastore.rawstore.impl:原始metastore的存储实现类,默认是org.apache.hadoop.hive.metastore.ObjectStore;
 
 
客户端
hive.metastore.uris: 客户端连接远程metastore服务地址端口
hive.metastore.local:控制hive是否连接一个远程metastore服务器还是开启一个本地客户端jvm,默认是true,Hive0.10已经取消了该配置项;
hive.metastore.warehouse.dir:指定Hive的存储目录
hive.metastore.client.socket.timeout:客户端socket超时时间,默认20秒;
hive.metastore.ds.retry.attempts:当出现连接错误时重试连接的次数,默认是1次;
hive.metastore.ds.retry.interval:metastore重试连接的间隔时间,默认1000毫秒
datanucleus.connectionPoolingType:使用连接池来访问JDBC metastore,默认是DBCP
hive.metastore.execute.setugi:非安全模式,设置为true会令metastore以客户端的用户和组权限执行DFS操作,默认是false,这个属性需要服务端和客户端同时设置;
hive.metastore.client.connect.retry.delay:客户端在连续的重试连接等待的时间,默认1;
hive.metastore.connect.retries:创建metastore连接时的重试次数,默认是5;
可能风险:
服务端有可能出现宕机,原因当获取hive表相关partition太多,返回partition信息太大,会导致JVM heap space ,这里的partition为5W。实际生产中按天做partition,partition不会太多
压力测试:详见(org.apache.hadoop.hive.metastore.test.MetaStoreTest)
前置:
 
1个Metastore service(116)
 
测试功能:多线程访问service,每个线程里都新建client,调用getDatabase(mydb)
返回Database对象,判断对象getName().equals("mydb"),如果是,调用同步方法,
让外界变量+1,最后得出结果:所有线程执行总时间+外界变量数值(成功返回个数)
 
结果:
单台机器116(本地)
service5000线程 time:176368毫秒 OK:5000
单台机器 84(远程)
13:57:09-13:57:52 time:280382 OK:5000
 
 
双台机器86\84访问 service 10000*2 共4万getDatabase请求
Client 84:
12:51:07-12:52:09 time:277613 OK:10000
12:47:53-12:49:32 time:114669 OK:10000
Client 86:
12:50:36-12:51:36 time:251133 OK:10000
12:47:57-12:49:06 time:83487  OK:10000
 
经日志验证 116 服务端 84:20000 86:20000 全部正常
 
 
双台机器86\136\137访问 service 10000*2 共6万getDatabase请求
Client 86:
15:08:43-15:09:59 time: 90461    OK:10000
15:09:04-15:10:12 time: 88821    OK:10000
Client 136:
15:09:01-15:10:27 time: 117757    OK:10000
15:09:15-15:10:46 time: 120695    OK:10000
Client 137:
15:14:26-15:15:46 time: 435969    OK:10000
15:16:33-15:17:48 time: 544234    OK:10000
 
经日志验证 116 服务端 86:20000 136:20000 137:20000 全部正常
 
 
测试功能:多线程访问service,每个线程里都新建client,调用listPartitions (“gdm”,” gdm_online_log”),每个线程listPartitions会生成10 条mysql select 语句
返回List对象,判断对象个数(60)>0,如果是,调用同步方法,
让外界变量+1,最后得出结果:所有线程执行总时间+外界变量数值(成功返回个数)
 
一台机器136访问 service 1500 listPartitions请求
Client 136:
14:09:24-14:15:44 time :379958    OK:1500
三台机器86\136\137访问 service 1000*2 共6000 listPartitions请求
Client 86:当时该机器正在执行MapReduce,所以速度慢
15:30:38-15:50:58 time: 1221170    OK:1000
15:18:20-15:51:55 time: 1399362    OK:1000
Client 136:
15:39:44-15:47:48 time: 483550     OK:1000
15:35:30-15:48:02 time: 741888     OK:1000
Client 137:
15:39:36-15:47:48 time: 494850     OK:1000
15-32:30-15:48:04 time: 934510     OK:1000 

分享:

发布了62 篇原创文章 · 获赞 235 · 访问量 169万+

猜你喜欢

转载自blog.csdn.net/javastart/article/details/99853373