一、简介
- 增量表: 有日期分区,存放增量数据,即新增量和变化量。
- 全量表: 无日期分区(每天覆盖更新),存放截止至当前,数据的最新的状态,所以无法记录数据的历史变化
- 快照表: 有日期分区,每天的数据都是全量的(无论有无变化),缺点是每个分区存储了许多重复的数据,浪费存储空间
- 拉链表: 拉链表是用来维护历史状态,以及最新状态数据的一种表,拉链表根据拉链粒度的不同,实际上相当于快照,只不过做了优化,去除了一部分不变的记录
二、应用场景
拉链表适用于那些大数据量,并且在字段变化的比例和频率不大的情况下需要查看历史快照信息的场景。
比如说有一张客户表,大约有几千万条记录,几百个字段。那么对于这种表,即使采用ORC压缩,单张表每天的数据存储空间也会超过50GB,在HDFS中使用三备份情况下,存储空间的占用会更大。
那么对于这种表我该如何设计呢?下面有几种方案可选:
- 方案一(全量表):每天抽取最新数据覆盖前一天的数据,优点是实现简单,节省空间,但是缺点同样明显,没有历史状态
- 方案二(快照表):做成每天全量的话,我们就可以查看历史数据了,但是缺点是存储空间占用太大了,特别是在客户信息不会频繁变化的情况下,字段的重复存储率太高了
- 方案三(拉链表):采用拉链表设计的话,不仅能查看历史状态,而且存储空间的占用也是极低的(毕竟对于没有变化的数据不重复存储)
三、Hive SQL实践
首先创建一张测试用的客户信息原表
CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_src (
`cust_id` STRING COMMENT '客户编号',
`phone` STRING COMMENT '手机号码'
)PARTITIONED BY (
dt STRING COMMENT 'etldate'
)STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY")
;
然后insert一些测试数据
cust_id | phone | dt |
001 | 1111 | 20210601 |
002 | 2222 | 20210601 |
003 | 3333 | 20210601 |
004 | 4444 | 20210601 |
001 | 1111 | 20210602 |
002 | 2222-1 | 20210602 |
003 | 3333 | 20210602 |
004 | 4444-1 | 20210602 |
005 | 5555 | 20210602 |
001 | 1111-1 | 20210603 |
002 | 2222-2 | 20210603 |
003 | 3333 | 20210603 |
004 | 4444-1 | 20210603 |
005 | 5555-1 | 20210603 |
006 | 6666 | 20210603 |
002 | 2222-3 | 20210604 |
003 | 3333 | 20210604 |
004 | 4444-1 | 20210604 |
005 | 5555-1 | 20210604 |
006 | 6666 | 20210604 |
007 | 7777 | 20210604 |
数据简单说明如下:
- 20210601为起始日期,总共有4个客户
- 20210602更新了002和004客户的信息,并新增了005客户
- 20210603更新了001、002、005客户的信息,并新增了006客户
- 20210604更新了002客户的信息,并新增了007客户,删除了001客户
现在回到正题,拉链表如何设计?
首先,拉链表有两个重要的审计字段:数据生效日期和数据失效日期。顾名思义,数据生效日期记录了这条记录是何时生效的,而数据失效日期则是记录了该条记录的失效时间(9999-12-31表示截至当前一直有效)。那么对数据的操作总共可分为以下几类:
- 新增的记录:数据生效日期为当天, 失效日期为9999-12-31
- 没有变化的记录:数据生效日期需要使用之前的, 失效日期不变
- 有变化的记录:==》对于旧记录:保留,并更改失效日期为当天; ==》对于新记录:新增,生效日期为当天,失效日期为9999-12-31
- 删除的记录:需要闭环,失效日期变为当天
因此拉链表的HQL实现代码如下:
-- 拉链表建表语句
CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_dst (
`cust_id` STRING COMMENT '客户编号',
`phone` STRING COMMENT '手机号码',
`s_date` DATE COMMENT '生效时间',
`e_date` DATE COMMENT '失效时间'
)STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY")
;
-- 拉链表实现代码(含数据回滚刷新)
INSERT OVERWRITE TABLE datadev.zipper_table_test_cust_dst
-- part1: 处理新增的、没有变化的记录,以及有变化的记录中的新记录
select NVL(curr.cust_id, prev.cust_id) as cust_id,
NVL(curr.phone, prev.phone) as phone,
-- 没有变化的记录: s_date需要使用之前的
case when NVL(curr.phone, '') = NVL(prev.phone, '') then prev.s_date
else NVL(curr.s_date, prev.s_date)
end as s_date,
NVL(curr.e_date, prev.e_date) as e_date
from (
select cust_id, phone, DATE(from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd')) as s_date, DATE('9999-12-31') as e_date
from datadev.zipper_table_test_cust_src
where dt = '${etldate}'
) as curr
left join (
select cust_id, phone, s_date, if(e_date > from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'), DATE('9999-12-31'), e_date) as e_date,
row_number() over(partition by cust_id order by e_date desc) as r_num -- 取最新状态
from datadev.zipper_table_test_cust_dst
where regexp_replace(s_date, '-', '') <= '${etldate}' -- 拉链表历史数据回滚
) as prev
on curr.cust_id = prev.cust_id
and prev.r_num = 1
union all
-- part2: 处理删除的记录,以及有变化的记录中的旧记录
select prev_cust.cust_id, prev_cust.phone, prev_cust.s_date,
case when e_date <> '9999-12-31' then e_date
else DATE(from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'))
END as e_date
from (
select cust_id, phone, s_date, if(e_date > from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'), DATE('9999-12-31'), e_date) as e_date
from datadev.zipper_table_test_cust_dst
where regexp_replace(s_date, '-', '') <= '${etldate}' -- 拉链表历史数据回滚
) as prev_cust
left join (
select cust_id, phone
from datadev.zipper_table_test_cust_src
where dt = '${etldate}'
) as curr_cust
on curr_cust.cust_id = prev_cust.cust_id
-- 只要变化量
where NVL(prev_cust.phone, '') <> NVL(curr_cust.phone, '')
;
四、测试
4.1 第一天(20210601):将${etldate}替换成20210601,并执行SQL。此时为初始状态,客户信息没有变化量,因此生效日期为2021-06-01,生效日期为9999-12-31(代表当前有效)
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 9999-12-31 |
002 | 2222 | 2021-06-01 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 9999-12-31 |
4.2 第二天(20210602):将${etldate}替换成20210602,并执行SQL。此时原表修改了002和004的手机号码,因此有将会有两条记录,一条记录记录了数据的历史状态,另外一条记录了数据的当前状态。然后原表还新增了005客户,因此此时的数据生效日期为2021-06-02,失效日期为9999-12-31
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 9999-12-31 |
002 | 2222 | 2021-06-01 | 2021-06-02 |
002 | 2222-1 | 2021-06-02 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 2021-06-02 |
004 | 4444-1 | 2021-06-02 | 9999-12-31 |
005 | 5555 | 2021-06-02 | 9999-12-31 |
4.3 第三天(20210603):将${etldate}替换成20210602,并执行SQL。此时原表修改了001、002、005,并新增006.
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 2021-06-03 |
001 | 1111-1 | 2021-06-03 | 9999-12-31 |
002 | 2222 | 2021-06-01 | 2021-06-02 |
002 | 2222-1 | 2021-06-02 | 2021-06-03 |
002 | 2222-2 | 2021-06-03 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 2021-06-02 |
004 | 4444-1 | 2021-06-02 | 9999-12-31 |
005 | 5555 | 2021-06-02 | 2021-06-03 |
005 | 5555-1 | 2021-06-03 | 9999-12-31 |
006 | 6666 | 2021-06-03 | 9999-12-31 |
4.4 第四天(20210604):将${etldate}替换成20210602,并执行SQL。此时原表更新了002,新增007,并删除了001. 需要注意的是,删除操作时,该条数据失效日期应该改为当天。
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 2021-06-03 |
001 | 1111-1 | 2021-06-03 | 2021-06-04 |
002 | 2222 | 2021-06-01 | 2021-06-02 |
002 | 2222-1 | 2021-06-02 | 2021-06-03 |
002 | 2222-2 | 2021-06-03 | 2021-06-04 |
002 | 2222-3 | 2021-06-04 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 2021-06-02 |
004 | 4444-1 | 2021-06-02 | 9999-12-31 |
005 | 5555 | 2021-06-02 | 2021-06-03 |
005 | 5555-1 | 2021-06-03 | 9999-12-31 |
006 | 6666 | 2021-06-03 | 9999-12-31 |
007 | 7777 | 2021-06-04 | 9999-12-31 |
五、拉链表的数据回滚刷新
可通过以下代码查看拉链表的最新状态
select * from datadev.zipper_table_test_cust_dst where e_date = '9999-12-31';
通过以下代码查看拉链表的历史状态/快照
-- 查看拉链表的20210602的快照
select cust_id, phone, s_date, if(e_date > '2021-06-02', DATE('9999-12-31'), e_date) as e_date
from datadev.zipper_table_test_cust_dst
where s_date <= '2021-06-02';
因此,对于拉链表的数据回滚刷新,我们只要根据上诉代码找到那一天的历史快照,然后进行重刷即可。(注:我上面贴的拉链表insert语句,已经包含了数据回滚刷新功能,读者可自行进行测试——将${etldate}替换成要回滚的日期,然后INSERT OVERWRITE TABLE那行可以注释掉,单跑select查看结果即可)
六、另一种实现
上一种实现方式有一个缺点,随着拉链表数据量的增多,每次执行的时间也会随之增多。因此,需要改进:可采用hive结合ES的方式。
-- 拉链表(hive只存储新增/更新量,全量存储于ES)实现代码
-- 临时表,只存放T-1天的新增以及变化的记录
CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_dst_2 (
`id` STRING COMMENT 'es id',
`cust_id` STRING COMMENT '客户编号',
`phone` STRING COMMENT '手机号码',
`s_date` DATE COMMENT '生效时间',
`e_date` DATE COMMENT '失效时间'
)STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY")
;
drop table datadev.zipper_table_test_cust_dst_2;
select * from datadev.zipper_table_test_cust_dst_2 a;
INSERT OVERWRITE TABLE datadev.zipper_table_test_cust_dst_2
select concat_ws('-', curr.s_date, curr.cust_id) as id,
curr.cust_id as cust_id,
curr.phone as phone,
DATE(curr.s_date) as s_date,
DATE('9999-12-31') as e_date
from (
select cust_id, phone, from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd') as s_date
from datadev.zipper_table_test_cust_src
where dt = '20210603' -- etldate
) as curr
left join (
select *
from datadev.zipper_table_test_cust_src
where dt = '20210602' -- prev_date
) as prev
on prev.cust_id = curr.cust_id
where NVL(curr.phone, '') <> NVL(prev.phone, '')
union all
select concat_ws('-', STRING(prev.s_date), prev.cust_id) as id,
prev.cust_id as cust_id,
prev.phone as phone,
prev.s_date as s_date,
case when NVL(prev.phone, '') = NVL(curr.phone, '') then prev.e_date
else DATE(from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd'))
end as e_date
from (
select cust_id, phone, s_date, e_date,
-- 只更新最新的一条
row_number() over(partition by cust_id order by s_date desc) as r_num
from datadev.zipper_table_test_cust_dst_2
) as prev
inner join (
select *
from datadev.zipper_table_test_cust_src
where dt = '20210603' -- etldate
) as curr
on prev.cust_id = curr.cust_id
where prev.r_num = 1
;
-- mock: load delta data to es
CREATE TABLE IF NOT EXISTS datadev.es_zipper (
`id` STRING COMMENT 'es id',
`cust_id` STRING COMMENT '客户编号',
`phone` STRING COMMENT '手机号码',
`s_date` DATE COMMENT '生效时间',
`e_date` DATE COMMENT '失效时间'
)STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY")
;
drop table datadev.es_zipper;
select * from datadev.es_zipper;
INSERT OVERWRITE TABLE datadev.es_zipper
SELECT nvl(curr.id, prev.id) as id,
nvl(curr.cust_id, prev.cust_id) as cust_id,
nvl(curr.phone, prev.phone) as phone,
nvl(curr.s_date, prev.s_date) as s_date,
nvl(curr.e_date, prev.e_date) as e_date
FROM datadev.es_zipper prev
full join datadev.zipper_table_test_cust_dst_2 curr
on curr.id = prev.id;