Titan是目前应用比较广泛的图数据库,相信大家在使用时都遇到过数据验证的问题,就是将数据导入Titan后,如何验证导入数据的正确性。把每一个值都检验一下显然是不现实的,一种简单的校对方法就是查看图中的点数和边数是否正确。在Titan的gremlin console中可以统计图的点数和边数,但由于其是单进程的,统计大图时就显得力不从心。这里我们要介绍一种基于MapReduce的统计方法,充分利用集群资源,将统计点数边数这件事情做到极致。
本文讨论的是Titan-0.5.4版本,部署在HBase上,即存储后端使用HBase. 其实在Titan-0.4.4及更前的版本中有个Faunus项目,通过MapReduce做图上的OLAP,当然也就包括统计点数和边数这种小功能。此后Faunus被并入Titan-Hadoop模块,但并不好用。关于Titan-1.0中,由于其目前还不支持Hadoop-2.x,可能生产环境用的还是Titan-0.5.4为多吧。
简单的边数统计
先回顾一下gremlin中的点数和边数统计:
gremlin> g = TitanFactory.open('myTitan.properties')
gremlin> g.V.count()
gremlin> g.E.count()
我简单测试了下,点数统计的速度是每秒2.5万个,边数统计的速度是每秒2500个,是前者的1/10。
具体怎么统计的,可以参考下面的指令,每统计50万个点就打印一下时间。
gremlin> cnt = 0; it = g.V.iterator(); while (it.hasNext()) { cnt++; it.next(); if (cnt % 500000 == 0) println(new Date().toString() + ' ' + cnt); }; println cnt
Sat Nov 12 10:31:03 CST 2016 500000
Sat Nov 12 10:31:23 CST 2016 1000000
Sat Nov 12 10:31:45 CST 2016 1500000
……
Sat Nov 12 10:54:03 CST 2016 28500000
Sat Nov 12 10:54:27 CST 2016 29000000
Sat Nov 12 10:54:46 CST 2016 29500000
29993474
==>null
边数统计的话类似:
gremlin> vCnt = 0; eCnt = 0; it = g.V.iterator(); while (it.hasNext()) { vCnt++; eCnt += it.next().getEdgeCount(); if (vCnt % 50000 == 0) println(new Date().toString() + ' ' + vCnt + ' ' + eCnt); }; println(new Date().toString() + ' ' + vCnt + ' ' + eCnt)
Sat Nov 12 12:57:26 CST 2016 50000 9089
Sat Nov 12 12:57:45 CST 2016 100000 9089
……
Sat Nov 12 16:28:12 CST 2016 29900000 466194
Sat Nov 12 16:28:34 CST 2016 29950000 466194
Sat Nov 12 16:28:54 CST 2016 29993474 466194
==>null
(可能有人问为什么要把代码写成一行,因为这样好复制粘贴呀 :P)
利用MapReduce做边数统计
上面的程序之所以慢,主要有两方面原因:
- 程序是单进程的,无法充分利用集群资源;
- 程序从HBase RegionServer读入数据,而且把整个表的数据都扫了一遍。RegionServer的Block Cache基本没起作用,数据要从HDFS读入RegionServer(这里可能有网络开销),再返回给我们的Titan客户端(即TitanGraph对象),这里又有网络开销。而且这种全量的扫描必然引发RegionServer的各种GC.
第一点启发我们要用分布式计算来做统计,可以用MapReduce;第二点启发我们在做HBase的全表扫描时,要绕过RegionServer,可以对该表打一个Snapshot,然后在Snapshot上跑MapReduce,即直接从HFile读入数据。HBase已经提供了TableSnapshotInputFormat来解析Snapshot数据。
我们知道Titan的图是以邻接表的形式存储在HBase中的(详见Titan Data Layout),一个顶点在HBase中占据一行,该点的所有边和属性都存在这一行中。HBase已经提供了统计行数的MapReduce程序(RowCounter,在hbase-server-${VERSION}.jar中),那么是不是把HBase数据表中的行数统计出来,就是Titan图中的点数呢?并不是的。我们说的Titan图中的点数,是指用户插入的点(User Created Vertex)的数目,除此之外还有很多Titan系统内部创建的点,这些点是用户不可见的。比如说一个VertexLabel实际是表示为一个点,拥有该Label的点会连一条单向边过来,这条边也是用户不可见的(系统内部点具体有哪几类,可以看这里)。另外还有一些行存储的是索引数据,Composite Index的数据都存储在HBase中,其中Row Key是索引的Value,该行的各列存储拥有该值元素的id。因此要统计真正的点数,我们需要把系统数据过滤掉。同样的,要统计真正的边数,我们也需要把系统边过滤掉。
说了这么多,可能你要说 “Talk is cheap, show me the code!” 了,接下来我就直接上代码吧 :P
我们的MapReduce只需要Mapper就够了,点数边数用MapReduce框架的Counter来记录。为了避免频繁更新Counter,我们在Mapper最后的cleanup再把本Mapper统计到的值加到Counter里去。下面是Mapper的map函数代码:
public void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
long vid = idManager.getKeyID(new StaticArrayBuffer(key.get()));
if (!IDManager.VertexIDType.NormalVertex.is(vid))
return;
vertexCount++;
// See com.thinkaurelius.titan.diskstorage.hbase.HBaseKeyColumnValueStore#getHelper
EntryList entryList = StaticArrayEntryList.ofBytes(
value.getMap().get(Bytes.toBytes("e")).entrySet(),
entryGetter);
for (Entry entry : entryList) {
RelationCache relation = graph.getEdgeSerializer().readRelation(entry, false, tx);
RelationType type = tx.getExistingRelationType(relation.typeId);
if (type.isEdgeLabel()
&& !inspector.isEdgeLabelId(relation.relationId)
&& !inspector.isSystemRelationTypeId(type.getLongId()))
edgeCount++;
}
}
map函数每次处理HBase表中的一行,函数分两部分,第一部分判断该行是否表示一个UserVertex;第二部分读出该行中的所有Entry(即HBase的key/value对),判断其是否是一条用户可见的边。注意这样会把图中的每条边都统计两次(因为Titan本身就是存两份的),最后我们要把edgeCount除以2. 上述代码有几个类的成员变量,具体可见完整代码。
最后再说一下Job的配置,前面说了我们要绕过HBase的RegionServer,因此在统计点数边数前要先对Titan数据表建一个Snapshot。Snapshot的名字将是我们的一个输入。Mapper的运行需要依赖Titan的jar包,可能不是所有机器都有。这里推荐使用MapReduce框架的Distributed Cache来处理,只需要事先把titan的lib目录上传到HDFS,把该路径作为本程序的另一个输入。最后本程序还需要提供一个titan配置文件作为输入,这样才能找到我们要处理的图在HBase中是哪张表。即程序有三个输入:
- snapshotName
- libDir
- titanConf
当然程序还有一个隐性的要求,即HBase的jar包都在classpath里,且classpath里有一个可用的hbase-site.xml,这样 TableSnapshotInputFormat 才能找到HBase的snapshot存放路径。
Job的配置代码如下:
Job job = Job.getInstance(getConf(), "Titan vertices & edges counter");
job.setJarByClass(SnapshotCounter.class);
job.setMapperClass(CounterMap.class);
job.setNumReduceTasks(0);
job.setSpeculativeExecution(false); // 关掉推测式执行
job.setInputFormatClass(TableSnapshotInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableSnapshotMapperJob(
snapshotName,
new Scan().addFamily(Bytes.toBytes("e")),
CounterMap.class,
NullWritable.class,
NullWritable.class,
job,
false, // addDependencyJars,不需要
new Path("/tmp/snapshot_counter" + new Random().nextInt()));
Util.setupClassPath(job, libDir);
// upload titanConf and add to distributed cache
File file = new File(titanConf);
if (!file.exists())
throw new FileNotFoundException(titanConf);
String baseName = file.getName();
FileSystem fs = FileSystem.get(conf);
Path src = new Path(file.toURI());
Path dst = new Path("/tmp/LoaderMR/" + baseName); // 把配置文件上传到这里
fs.copyFromLocalFile(src, dst);
job.addCacheFile(dst.toUri());
fs.deleteOnExit(dst); // DO Not close this fs!
// It will be closed When JVM exit.
// These tmp files will be deleted at that time.
job.getConfiguration().set(TITAN_CONF_KEY, baseName);
启动方式:
export HADOOP_USER_CLASSPATH_FIRST=true
export HADOOP_CLASSPATH=`pwd`:`hbase classpath`"$TITAN_HOME/lib/*"
hadoop jar titan-utils-1.0-SNAPSHOT.jar cn.edu.pku.hql.titan.mapreduce.SnapshotCounter snapshotName titanConf hdfsTitanLibDir
事先最好对原表做个major compation,这样再打snapshot数据量会更小一点。
简单实验了一下一张29993474个点,524348736条边的图,snapshot的大小是69.8 G。在2台机器上,花了15分钟统计完;在5台机器上,花了8分钟。算了一下每个task统计边数的速度,大概是每秒7万条边。原来的单线程边数每秒只能统计2500条边,这主要是因为我们的程序直接统计的是HBase的snapshot文件,绕开了RegionServer。
总结
本文提出了一种快速统计Titan图中点数和边数的方法,通过手工对原图在HBase的数据表打一个snapshot后,直接用MapReduce解析snapshot里的HFile文件,统计得出点数和边数。精华主要在上文中Map函数里的小片代码,这是参考Faunus写的。本方法一方面是分布式的,能充分利用集群资源;另一方面直接使用HBase Snapshot,绕开了RegionServer,使得读取速度又有一个级别的提升。