[目录]
- 第一章:概述
- 第二章:整体数据分层
- 第三章:整体实现框架
- 第四章:元数据
- 第五章:ETL
- 第六章:数据校验
- 第七章:数据标准化
- 第八章:去重
- 第九章:增量/全量
- 第十章:拉链处理
- 第十一章:分布式处理增量
- 第十二章:列式存储
- 第十三章:逻辑数据模型(数仓模型)
- 第十四章:数据模型参考
- 第十五章:维模型
- 第十六章:渐变维
- 第十七章:数据回滚
- 第十八章:关于报表
- 第十九章:数据挖掘
数据仓库实践杂谈(十一)——分布式处理增量
面向大量数据的时候,想极大提高处理效率,最简单的办法就是增加处理的服务器。当然,从根本上来说,优化算法得到的提升可能是指数级的,通过横向扩展计算节点的提升只能是倍数级的,而且远达不到N个节点处理时间是1/N的效果。但目前的分布式计算框架都支持廉价的服务器和存储,因此很可以通过很低的成本获得极大的性能提升。
这里不深入分析什么样的算法是最优化的,我们只谈最合适分布式的设计。
从这个场景来看,这里需要的分布式计算其实是一种对等分布式,具体如下:
- 每个计算节点所进行的程序或者算法是一样的,但处理的数据不一样。
- 数据分布在不同的计算节点,每个节点独立完成一部分数据的处理,此时不需要其他节点的数据,也不影响其他节点的处理。
- 每个节点的计算的结果需要归并成最终的结果。
- 可能需要进行多轮计算。
考虑如何计算之前,我们先讨论一个叫数据分区的概念。
按照前面提到的,全量历史数据存储的时候,按照日期做分区,因此本质上,可以看做一天的数据保存在一个文件里面。
这首先带来一个问题是:用哪个日期来分区?
这个分区的原则,一般来说要尽量保证每个分区数据基本一样多,而且涉及到更新/查询操作,能尽量在本分区完成。举个例子,从全量表里面,客户想知道自己的订单情况,一般客户知道自己下单的日期。通过日期查询订单,找到多条同样订单号的记录(状态变化过程)。也就是,在同一个分区文件里面,看到该订单的整个过程。
从上面提到的订单表结构分析,有两个时间字段可选,一个是下单时间,一个是开始时间。这两个字段前者是业务数据中有,后者是全量表(拉链表)中创建。用这两个字段做分区,有什么区别呢:
- 下单时间:业务原始发生时间,以此为分区的话,同一个订单后续状态变更而产生的新增拉链数据,也都会放在此分区中;因此这个分区保存的数据是当天下单的全部数据加上这批数据的变更拉链数据。
- 开始时间:数据进入仓库的时间,以此为分区的话,每天进入仓库的数据在一个分区,数据包括当天新增订单,以及之前的订单中状态当天发生变化的。
前者同一个订单的所有记录必定在一个分区,但此分区有可能不稳定,会持续更新;后者同一个订单数据会分散在多个分区,但每个分区建立之后不会被修改。
再来考虑全量数据的存储和分布式处理的模式,一般来说有两种:
基于传统关系数据库的存储和处理模式
传统关系数据库往往采用集中存储的方式,数据存放在集中的高速存储设备上。存储设备是专用的,昂贵的。数据库服务器则一般采用小型机,两台主备组成双机热备的高可用性(HA)集群。小型机也是昂贵的。因此一般来说,如果数据量不算大,而且有钱的话,直接在数据库服务器用PLSQL存储过程搞定,是不错的选择。以前银行客户都这么干。随着业务发展,数据越来越多,直接在数据库上处理不过来了——毕竟更换更高级的小型机成本和风险都很大——所以,在数据库之上增加一个PC Server的集群来处理数据。数据库起到一个查询和存储数据的作用,其他处理放到PC Server集群中,组成一个混合的、存储和计算分离的结构。
相当于是计算分布,存储集中的模式。
基于Hadoop分布式平台的存储和处理模式
Hadoop的出现和流行,对于数据处理来说,确实是福音。到现在,即使相对保守的银行客户,也基本都接受采用Hadoop体系了。此模式的特点最典型的有三个:
- 专为大规模数据存储和查询设计,不支持更新;
- 采用廉价PC Server和本地硬盘作为处理和存储介质;
- 存储和计算都分布,但尽量保证本地计算(在哪个节点存储的数据就在哪个节点计算这部分数据)。
这天生就是为了分布式而设计了。
无论哪种模式,全量数据表的分区都是很重要,很有必要的。对于前者而言,一个分区相当于一个独立的存储文件,查找分区内的数据直接在当前分区文件检索即可,不需要所有文件的检索一遍。对于后者,按系统设置,一般64MB是一个文件,一张表的数据都在一个目录内;如果分区了,一个分区就是一个目录。在一个分区内查找数据,不需要找其他目录的文件。
有此,通过分布式来识别变化数据的机制出来了:
- 把当天需要处理的输入文件(订单快照)按某种规则拆分成多份,这里最直观的就是按照按照下单日期;
- 把每份文件分发到不同的处理节点中;
- 根据拿到的数据的特诊(下单日期)把全量表中对应日期的分区数据拿出来;
- 相同下单日期的数据,比较是否有变化。
在分区的问题中,很明显,为了处理高效,最合适的分区字段是“下单时间”。每个处理节点处理同一天下单的记录,互不干扰。
至于同一天的订单数据对比变化的过程,最快的方式就是把两边的数据都装如内存,然后直接比较了。如果一天的订单数据量还是特别大,那就根据业务情况,再增加分区,比如产品类型,让一个分区的数据能装到内存里面去处理。把历史表带9999的装入A,把快照表装入B,遍历一遍B,逐条在A中对比是否新增和有变化。找到的数据拿出来,最后A中还剩下的,则是被删除的。
当然,换个场景,比如用户表。用户表的变化不会有像订单表一样明显的日期特性。如果用注册日期作为分区,极有可能造成分区大小差异巨大。所以可能的做法是通过用户ID除固定值(比如一百万)取整(如果用户ID是顺序增长的数字);或者通过用户ID、用户名等计算Hash然后求模等方式,分成若干份做分区。为了便于扩展,可以采用一致性Hash方式,把分区的编号也计算Hash并映射到环上。
Hadoop的MapReduce处理框架,则更是直接基于集群处理而设计的。主要逻辑是"Map(映射)“和"Reduce(归约)”。简单理解就是把数据拆分成多块分别处理,然后结果合并在一起在处理。在多个服务器节点上经过多次Map和Reduce的循环,实现海量数据的处理。
具体的实现方式,主要取决于对全量表存储的机制:
- 保存在HIVE:把当日快照表也加载入HIVE,然后通过HiveSQL直接在Hive环境中执行,交给Hive来转换为MapReduce。
- 保存在HBase:当日快照表存放在HDFS即可,通过自己编写的
MapReduce程序拆分文件并且从HBase中读出对应分区的数据进行处理。
当然不同的工具有不同的存储、分区特性,需要有针对性的设计。如HBase的分区,则是通过设计Row Key的规则来实现Region的划分。在订单例子中,可以把日期当做Row Key的前缀。
未完待续。