Integrating Hadoop and MongoDB (Hive)



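Our company wants to use MongoDB as the back-end business database and Hadoop as the data platform. The initial approach was to export the data from MongoDB, upload it to HDFS, and then process it with Hive/MapReduce. That felt far too cumbersome, and surely someone had already run into this problem. So I searched around and, sure enough, found the MongoDB Connector for Hadoop.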



3. Hadoop HA cluster setup and Hive installation

Hadoop HA (high-availability) cluster setup (2.7.2)



Obtain the MongoDB Hadoop Connector. You can either build it or download the jars. For Hive, you'll need the "core" jar and the "hive" jar.
Get a JAR for the MongoDB Java Driver. The connector requires at least version 3.0.0 of the driver "uber" jar (called "mongo-java-driver.jar").
In your Hive script, use ADD JAR commands to include these JARs (core, hive, and the Java driver), e.g., ADD JAR /path-to/mongo-hadoop-hive-<version>.jar;.
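Before you write the ADD JAR lines, you can check the requirements above mechanically: the core jar, the hive jar, and a Java driver of at least 3.0.0. The following is a minimal Java sketch. It assumes jar file names end in -<version>.jar, as with the mongo-hadoop-core-1.5.1.jar used later in this post. The class MongoHiveJars, its methods, and the driver file name mongo-java-driver-3.2.2.jar are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: checks the driver version and builds Hive ADD JAR statements.
final class MongoHiveJars {

    // Extracts "1.5.1" from "mongo-hadoop-core-1.5.1.jar".
    // Assumption: the version is everything after the last '-' and before ".jar".
    static String versionOf(String jarName) {
        if (!jarName.endsWith(".jar")) {
            throw new IllegalArgumentException("not a jar: " + jarName);
        }
        String base = jarName.substring(0, jarName.length() - ".jar".length());
        return base.substring(base.lastIndexOf('-') + 1);
    }

    // Compares dotted numeric versions component by component; missing parts count as 0.
    static boolean isAtLeast(String version, String minimum) {
        String[] a = version.split("\\.");
        String[] b = minimum.split("\\.");
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            int x = i < a.length ? Integer.parseInt(a[i]) : 0;
            int y = i < b.length ? Integer.parseInt(b[i]) : 0;
            if (x != y) {
                return x > y;
            }
        }
        return true; // equal versions satisfy the minimum
    }

    // Builds one "ADD JAR <dir>/<jar>;" statement per jar.
    static List<String> addJarStatements(String dir, List<String> jars) {
        List<String> lines = new ArrayList<>();
        for (String jar : jars) {
            lines.add("ADD JAR " + dir + "/" + jar + ";");
        }
        return lines;
    }

    public static void main(String[] args) {
        String driver = "mongo-java-driver-3.2.2.jar"; // hypothetical file name
        if (!isAtLeast(versionOf(driver), "3.0.0")) {
            throw new IllegalStateException("the connector needs driver >= 3.0.0");
        }
        addJarStatements("/path-to", List.of(
                "mongo-hadoop-core-1.5.1.jar",
                "mongo-hadoop-hive-1.5.1.jar",
                driver)).forEach(System.out::println);
    }
}
```

You can paste the printed lines into a Hive script. /path-to is the placeholder from the step above.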

Supported Hadoop and Hive versions

As of August 2013, only Hive versions <= 0.10 are stable. Mongo-Hadoop currently supports Hive versions >= 0.9. Some classes and functions are deprecated in Hive 0.11, but they’re still functional.

Hadoop versions later than 0.20.x are supported. CDH4 is supported, but CDH3 with its native Hive 0.7 is not. However, CDH3 is compatible with newer versions of Hive, so a non-native Hive installed on CDH3 can be used with Mongo-Hadoop.
2. Copy the jars into ${HADOOP_HOME}/lib and ${HIVE_HOME}/lib, then start Hive and add the jars:

[hadoop@DEV21 ~]$ hive

Logging initialized using configuration in jar:file:/home/hadoop/opt/apache-hive-1.2.1-bin/lib/hive-common-1.2.1.jar!/hive-log4j.properties
hive> add jar /home/hadoop/opt/hive/lib/mongo-hadoop-core-1.5.1.jar;
(Add all three jars this way; only the first is shown here.)
3. Hive usage. There are two ways to connect:

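First, MongoDB-based: connect directly to a hidden replica-set node and use com.mongodb.hadoop.hive.MongoStorageHandler as the storage handler (SerDe).
Second, BSON-based: dump the data to BSON files, upload them to HDFS, and read them with com.mongodb.hadoop.hive.BSONSerDe.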


hive> CREATE TABLE eventlog
    > ( 
    >   id string,
    >   userid string,
    >   type string,
    >   objid string,
    >   time string,
    >   source string
    > )
    > STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler' 
    > WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id"}') 
    > TBLPROPERTIES('mongo.uri'='mongodb://username:password@ip:port/xxx.xxxxxx');
hive> select * from eventlog limit 10;
5757c2783d6b243330ec6b25    NULL    shb NULL    2016-06-08 15:00:07 NULL
5757c27a3d6b243330ec6b26    NULL    shb NULL    2016-06-08 15:00:10 NULL
5757c27e3d6b243330ec6b27    NULL    shb NULL    2016-06-08 15:00:14 NULL
5757c2813d6b243330ec6b28    NULL    shb NULL    2016-06-08 15:00:17 NULL
5757ee443d6b242900aead78    NULL    shb NULL    2016-06-08 18:06:59 NULL
5757ee543d6b242900aead79    NULL    smb NULL    2016-06-08 18:07:16 NULL
5757ee553d6b242900aead7a    NULL    cmcs    NULL    2016-06-08 18:07:17 NULL
5757ee593d6b242900aead7b    NULL    vspd    NULL    2016-06-08 18:07:21 NULL
575b73b2de64cc26942c965c    NULL    shb NULL    2016-06-11 10:13:06 NULL
575b73b5de64cc26942c965d    NULL    shb NULL    2016-06-11 10:13:09 NULL
Time taken: 0.101 seconds, Fetched: 10 row(s)
hive> create table qsstest as select * from eventlog limit 10; 
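In the CREATE TABLE above, mongo.columns.mapping is a JSON object that maps Hive column names to MongoDB field names. So {"id":"_id"} fills the Hive column id from the document's _id. As I understand the connector's documentation, columns without a mapping are looked up under the same name. The following is a small hypothetical helper (ColumnMapping is not part of the connector) that renders the property from a Java map. This saves you from hand-escaping JSON inside a quoted Hive literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that renders the mongo.columns.mapping SERDEPROPERTY.
final class ColumnMapping {

    // Escapes backslashes and double quotes so the name is a valid JSON string.
    private static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Renders {"hiveColumn":"mongoField",...} in the map's iteration order.
    static String toJson(Map<String, String> hiveToMongo) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : hiveToMongo.entrySet()) {
            json.add(jsonString(e.getKey()) + ":" + jsonString(e.getValue()));
        }
        return json.toString();
    }

    // Wraps the JSON in the clause used in the CREATE TABLE statement.
    // Assumes no name contains a single quote, which would end the Hive literal.
    static String serdeProperties(Map<String, String> hiveToMongo) {
        return "WITH SERDEPROPERTIES('mongo.columns.mapping'='" + toJson(hiveToMongo) + "')";
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "_id"); // Hive column "id" <- MongoDB field "_id"
        System.out.println(serdeProperties(mapping));
    }
}
```

A LinkedHashMap keeps the columns in the order you add them, which keeps the generated DDL readable.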

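The BSON-based approach requires dumping the data first. Unlike mongoexport, mongodump does not care about the actual data content, so there is no need to specify a fields list.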

mongodump --host=datatask01:29017 --db=test --collection=ldc_test --out=/tmp
hdfs dfs -mkdir -p /dev_test/dli/bson_demo/
hdfs dfs -put /tmp/test/ldc_test.bson /dev_test/dli/bson_demo/
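A .bson file produced by mongodump is a sequence of BSON documents stored back to back. Each document starts with a little-endian int32 that holds its total size in bytes (including those four bytes) and ends with a 0x00 byte. That length prefix is what lets a reader walk the file without a schema. The following is a minimal sketch that counts the documents in such a file. It checks only the prefix and the terminator, not the full BSON grammar. BsonDumpScanner is a hypothetical name.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Counts the BSON documents in a mongodump-style .bson stream.
final class BsonDumpScanner {

    static int countDocuments(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] prefix = new byte[4];
        int count = 0;
        while (true) {
            int first = data.read();
            if (first == -1) {
                return count; // clean end of file between documents
            }
            prefix[0] = (byte) first;
            data.readFully(prefix, 1, 3); // EOFException if the prefix is cut short
            // BSON lengths are little-endian and include the 4-byte prefix itself.
            int length = ByteBuffer.wrap(prefix).order(ByteOrder.LITTLE_ENDIAN).getInt();
            if (length < 5) {
                throw new IOException("invalid BSON document length: " + length);
            }
            byte[] body = new byte[length - 4];
            data.readFully(body); // EOFException on a truncated document
            if (body[body.length - 1] != 0) {
                throw new IOException("BSON document does not end with 0x00");
            }
            count++;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0) {
            try (InputStream in = new BufferedInputStream(new FileInputStream(args[0]))) {
                System.out.println(countDocuments(in));
            }
        } else {
            // Two empty documents: each is 05 00 00 00 00 (length 5, then the terminator).
            byte[] twoEmptyDocs = {5, 0, 0, 0, 0, 5, 0, 0, 0, 0};
            System.out.println(countDocuments(new ByteArrayInputStream(twoEmptyDocs)));
        }
    }
}
```

For example, if you pass /tmp/test/ldc_test.bson as the argument, it should report the number of documents dumped from ldc_test.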
-- Create the mapping table
create external table temp.ldc_test_mongo (
  id string,
  fav_id array<int>,
  info struct<github:string, location:string>
)
ROW FORMAT SERDE "com.mongodb.hadoop.hive.BSONSerDe"
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"id","fav_id":"fav_id","info.github":"info.github","info.location":"info.location"}')
STORED AS INPUTFORMAT "com.mongodb.hadoop.mapred.BSONFileInputFormat"
OUTPUTFORMAT "com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat"
location '/dev_test/dli/bson_demo/';
0: jdbc:hive2://hd-cosmos-01:10000/default> select * from temp.ldc_test_mongo;
| ldc_test_mongo.id  | ldc_test_mongo.fav_id  |                       ldc_test_mongo.info                       |
| @Tony_老七           | [3,33,333,3333,33333]  | {"github":"https://github.com/tonylee0329","location":"SH/CN"}  |
1 row selected (0.345 seconds)
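The dotted keys in the mapping (info.github, info.location) name fields nested inside the info subdocument. The following sketch shows the idea of resolving such a path, with java.util.Map standing in for a BSON document. DottedPath is illustrative and is not the connector's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: resolves a dotted path like "info.github" in a nested document.
final class DottedPath {

    // Returns the value at the path, or null as soon as a segment is missing
    // or an intermediate value is not itself a document.
    static Object resolve(Map<String, ?> document, String path) {
        Object current = document;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> info = new LinkedHashMap<>();
        info.put("github", "https://github.com/tonylee0329");
        info.put("location", "SH/CN");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("info", info);
        System.out.println(resolve(doc, "info.location")); // SH/CN
    }
}
```

Returning null for a missing path fits how absent fields showed up as NULL in the eventlog query earlier.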
// Excerpt from com.mongodb.hadoop.hive.BSONSerDe: dispatch on the declared Hive type.
public Object deserializeField(final Object value, final TypeInfo valueTypeInfo, final String ext) {
    if (value != null) {
        switch (valueTypeInfo.getCategory()) {
            case LIST:
                return deserializeList(value, (ListTypeInfo) valueTypeInfo, ext);
            case MAP:
                return deserializeMap(value, (MapTypeInfo) valueTypeInfo, ext);
            case PRIMITIVE:
                return deserializePrimitive(value, (PrimitiveTypeInfo) valueTypeInfo);
            case STRUCT:
                // Supports both struct and map, but should use struct
                return deserializeStruct(value, (StructTypeInfo) valueTypeInfo, ext);
            case UNION:
                // Mongo also has no union
                LOG.warn("BSONSerDe does not support unions.");
                return null;
            default:
                // Must be an unknown (a Mongo specific type)
                return deserializeMongoType(value);
        }
    }
    return null;
}
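Take an array<int> column such as fav_id. In outline, the LIST case converts each element according to the element type, which ends up in the INT case of deserializePrimitive: ((Number) value).intValue(). This matters because the mongo shell stores numeric literals as doubles unless NumberInt() is used, so [3,33,333] inserted from the shell arrives as doubles. The following is a dependency-free sketch of that path, with java.util.List standing in for the driver's BSON list type. IntArrayColumn is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the LIST -> PRIMITIVE(INT) path for an array<int> column.
final class IntArrayColumn {

    static List<Integer> toIntList(Object value) {
        if (value == null) {
            return null; // deserializeField also returns null for a null value
        }
        List<?> elements = (List<?>) value;
        List<Integer> out = new ArrayList<>(elements.size());
        for (Object element : elements) {
            // Same conversion as the INT case: Double, Long and Integer all become int.
            out.add(element == null ? null : ((Number) element).intValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // The shape the mongo shell produces for [3,33,333]: doubles.
        List<Object> stored = List.of(3.0, 33.0, 333.0);
        System.out.println(toIntList(stored)); // [3, 33, 333]
    }
}
```

Note that intValue() truncates, so a stored 3.7 comes back as 3 without any warning.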

// Converts the value to the matching Java type for storage.
private Object deserializePrimitive(final Object value, final PrimitiveTypeInfo valueTypeInfo) {
    switch (valueTypeInfo.getPrimitiveCategory()) {
        case BINARY:
            return value;
        case BOOLEAN:
            return value;
        case DOUBLE:
            return ((Number) value).doubleValue();
        case FLOAT:
            return ((Number) value).floatValue();
        case INT:
            return ((Number) value).intValue();
        case LONG:
            return ((Number) value).longValue();
        case SHORT:
            return ((Number) value).shortValue();
        case STRING:
            return value.toString();
        case TIMESTAMP:
            if (value instanceof Date) {
                return new Timestamp(((Date) value).getTime());
            } else if (value instanceof BSONTimestamp) {
                // BSONTimestamp.getTime() is in seconds; Timestamp expects milliseconds
                return new Timestamp(((BSONTimestamp) value).getTime() * 1000L);
            } else if (value instanceof String) {
                return Timestamp.valueOf((String) value);
            } else {
                return value;
            }
        default:
            return deserializeMongoType(value);
    }
}
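In the TIMESTAMP case, the units differ by source. java.util.Date already carries milliseconds, while BSONTimestamp.getTime() returns seconds since the epoch, hence the * 1000L. The following is a standalone sketch of the same branch. SecondsTimestamp is a hypothetical stand-in for org.bson.types.BSONTimestamp, so the example compiles without the driver jar.

```java
import java.sql.Timestamp;
import java.util.Date;

final class TimestampField {

    // Hypothetical stand-in for org.bson.types.BSONTimestamp: whole seconds since the epoch.
    static final class SecondsTimestamp {
        final int seconds;

        SecondsTimestamp(int seconds) {
            this.seconds = seconds;
        }
    }

    static Object toHiveTimestamp(Object value) {
        if (value instanceof Date) {
            return new Timestamp(((Date) value).getTime()); // already milliseconds
        } else if (value instanceof SecondsTimestamp) {
            // Widen to long before multiplying so the result cannot overflow int.
            return new Timestamp(((SecondsTimestamp) value).seconds * 1000L);
        } else if (value instanceof String) {
            // Expects the JDBC escape format yyyy-[m]m-[d]d hh:mm:ss[.f...]
            return Timestamp.valueOf((String) value);
        } else {
            return value; // passed through unchanged, as in the excerpt
        }
    }

    public static void main(String[] args) {
        System.out.println(toHiveTimestamp("2016-06-08 15:00:07")); // 2016-06-08 15:00:07.0
        System.out.println(toHiveTimestamp(new SecondsTimestamp(0)).equals(new Timestamp(0L))); // true
    }
}
```

The L suffix matters. With a plain int multiplication, seconds * 1000 would overflow for any time later than late January 1970.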

