本篇文章,主要讲解
1.什么是拉链表 以及 拉链表示例
2.不同原始表情况下,拉链表如何构建
首先介绍下什么是拉链表
1.什么是拉链表 以及 拉链表示例
什么是拉链表
当维度数据发生变化时,将旧数据置为失效,将更改后的数据当作新的记录插入到维度表中,并开始生效,这样能够记录数据在某种粒度上的变化历史。
拉链表的示例
请参见我的另一篇文章
https://blog.csdn.net/u010003835/article/details/104420723
2.不同原始表情况下,拉链表如何构建
1.第一种情况,规范的表
什么是规范的表,规范的意思是原始表的含有 create_time , update_time
假设原始表到数仓的表,每天全量的采集方式
下面我们构建表,并且插入测试数据
use data_warehouse_test;
CREATE TABLE IF NOT EXISTS user_zipper_org (
user_id BIGINT COMMENT '用户id'
,user_name STRING COMMENT '用户姓名'
,create_time DATE COMMENT '创建时间'
,update_time DATE COMMENT '修改时间'
)
PARTITIONED BY(
pt STRING COMMENT '数据分区'
)
STORED AS ORC
;
ALTER TABLE user_zipper_org DROP IF EXISTS PARTITION (pt = '20200320');
ALTER TABLE user_zipper_org DROP IF EXISTS PARTITION (pt = '20200321');
INSERT INTO TABLE user_zipper_org PARTITION (pt = '20200320')
VALUES
(1, 'szh', '2020-01-01', '2020-01-01')
,(2, 'yuqin', '2020-01-01', '2020-01-01')
,(3, 'heping', '2020-01-01', '2020-01-01')
,(4, 'quxingma', '2020-01-01', '2020-01-01')
,(5, 'zhouzhou', '2020-01-01', '2020-01-01')
;
INSERT INTO TABLE user_zipper_org PARTITION (pt = '20200321')
VALUES
(1, 'szh2', '2020-01-01', '2020-03-21')
,(2, 'yuqin', '2020-01-01', '2020-01-01')
,(3, 'heping3', '2020-01-01', '2020-03-21')
,(4, 'quxingma', '2020-01-01', '2020-01-01')
,(5, 'zhouzhou', '2020-01-01', '2020-01-01')
,(6, 'newuser', '2020-03-21', '2020-03-21')
;
构建最终拉链表,这里面为了保证数据安全,我们最终的拉链表是一张分区表
首先,由于是从20200320 开始构建的拉链表,那么原始表所有的数据都相当于当前生效的数据,我们根据这些数据构建20200320的最终拉链表数据。
其次,我们需要通过 最终表 20200320 分区的数据去构建临时表。
上面两个步骤的SQL如下
use data_warehouse_test;
CREATE TABLE IF NOT EXISTS user_zipper_final
(
user_id BIGINT COMMENT '用户id'
,user_name STRING COMMENT '用户姓名'
,create_time DATE COMMENT '创建时间'
,update_time DATE COMMENT '修改时间'
,start_date DATE COMMENT '生效时间'
,end_date DATE COMMENT '失效时间'
)
PARTITIONED BY(
pt STRING COMMENT '数据分区'
);
INSERT OVERWRITE TABLE user_zipper_final PARTITION (pt = '20200320')
SELECT
org.user_id
,org.user_name
,org.create_time
,org.update_time
,'2020-03-20' AS start_date
,'9999-12-31' AS end_date
FROM user_zipper_org AS org
WHERE pt = '20200320'
;
DROP TABLE IF EXISTS tmp_user_zipper_mid;
CREATE TABLE tmp_user_zipper_mid AS
SELECT
org.user_id
,org.user_name
,org.create_time
,org.update_time
,org.start_date
,org.end_date
FROM user_zipper_final AS org
WHERE pt = '20200320'
;
关键的步骤,我们根据 20200321 发生变动的数据 (新增的 与 修改的)
新增的 create_time 为 2020-03-21
修改的 update_time 为 2020-03-21
和 20200320 的全量拉链表数据,去构建 20200321 的拉链表数据 (这里我们先存储为临时表,即中间表)
首先,新增与变动的数据我们可以通过这种方式获得
2020-03-21 发生变动的数据
SELECT
org.user_id
,org.user_name
,org.create_time
,org.update_time
,'2020-03-20' AS start_date
,'9999-12-31' AS end_date
FROM user_zipper_org AS org
WHERE pt = '20200321'
AND (
(
create_time = '2020-03-21'
)
OR
(
update_time = '2020-03-21'
)
)
;
20200321 的全量拉链表数据,可以通过以下方式构建
use data_warehouse_test;
DROP TABLE IF EXISTS tmp_user_zipper_mid;
CREATE TABLE tmp_user_zipper_mid AS
SELECT *
FROM
(
SELECT
final.user_id
,final.user_name
,final.create_time
,final.update_time
,final.start_date
,CAST (
(
CASE
WHEN
(
new_data.user_id IS NOT NULL
AND
final.end_date >= '2020-03-21'
)
THEN '2020-03-20'
ELSE final.end_date
END
)
AS DATE
)
AS end_date
FROM
user_zipper_final AS final
LEFT JOIN (
SELECT
org.user_id
,org.user_name
,org.create_time
,org.update_time
FROM user_zipper_org AS org
WHERE pt = '20200321'
AND (
(
create_time = '2020-03-21'
)
OR
(
update_time = '2020-03-21'
)
)
) AS new_data
ON new_data.user_id = final.user_id
WHERE final.pt = '20200320'
UNION ALL
SELECT
new_data.user_id
,new_data.user_name
,new_data.create_time
,new_data.update_time
,CAST( '2020-03-21' AS DATE ) AS start_date
,CAST ('9999-12-31' AS DATE ) AS end_date
FROM (
SELECT
org.user_id
,org.user_name
,org.create_time
,org.update_time
FROM user_zipper_org AS org
WHERE pt = '20200321'
AND (
(
create_time = '2020-03-21'
)
OR
(
update_time = '2020-03-21'
)
)
) AS new_data
) AS tmp
;
最后,我们把临时表的结果插入到新的拉链表分区。
然后,我们查验下数据 (
1.获取最新的拉链表分区数据
2.通过拉链表,获取2020-03-20的数据
3.通过拉链表,获取2020-03-21的数据
)
use data_warehouse_test;
INSERT OVERWRITE TABLE user_zipper_final PARTITION (pt = '20200321')
SELECT
*
FROM tmp_user_zipper_mid
;
SELECT *
FROM user_zipper_final
WHERE pt = '20200321'
;
SELECT *
FROM user_zipper_final
WHERE pt = '20200321'
AND start_date <= '2020-03-20'
AND end_date >= '2020-03-20'
;
SELECT *
FROM user_zipper_final
WHERE pt = '20200321'
AND start_date <= '2020-03-21'
AND end_date >= '2020-03-21'
;
2.第二种情况,非规范的原始表
什么是非规范的原始表,就是原始表中不存在 create_time 或者 update_time 。 或者更甚者 2者都不存在,这样的情况下,我们该如何去构建拉链表呢?
我们细致的去分析这个问题,如何解决?
首先,我们可以看到通过单一的create_time 或者 update_time ,我们无法分辨发生变动的数据。
由此,我们的思路是构建一列,去标识 发生变动的数据。
我们选择将所有列取 md5 值的方式!!!!
假设我们的原始表有以下几列,
user_id, user_name, create_time
则 MD5(CONCAT(user_id, user_name, create_time))
假设,我们已经有初始化的拉链表了,那么难点就在于如何获取发生变动的数据 (新增的 与 修改的)
获取发生变动的数据SQL 如下:
SELECT
ta.*
FROM
(
SELECT
user_id
,user_name
,create_time
,MD5(CONCAT(user_id,user_name,create_time)) AS user_flag
FROM user_zipper_org
WHERE pt = '20200321'
) AS ta
LEFT JOIN (
SELECT
user_id
,user_flag
FROM user_zipper_final
WHERE pt = '20200320'
AND start_date <= '2020-03-20'
AND end_date >= '2020-03-20'
) AS tb
ON ta.user_id = tb.user_id
AND ta.user_flag != tb.user_flag
;