hive对update和delete的支持不是很好,但是我们可以将这两种操作转化为insert操作,查询时取最新记录来满足hive表数据更新的需求。
思路:
为每一张表增加两个字段:
updated:标记此次变更时间, 每次变更都需要填写。
deleted: 标记此次记录是否为删除操作,只需删除的时候标记为1即可。
举例:
假设我们原来有以下数据(主键为id和id2):
id | name | age | id2 |
---|---|---|---|
111 | 张三 | 23 | 1 |
222 | 李四 | 44 | 1 |
333 | 王五 | 10 | 1 |
333 | 赵六 | 30 | 2 |
然后做了以下变更:
- 在2017/07/01 00:02:00
- 将张三年龄改为
24
- 将李四姓名改为
李思
,年龄改为45
- 将张三年龄改为
- 在2018/05/25 09:00:10
- 将张三年龄改为
25
删除
了李四的记录
- 将张三年龄改为
经过变更后我们的数据变成了这样:
id | name | age | id2 |
---|---|---|---|
111 | 张三 | 25 | 1 |
333 | 王五 | 10 | 1 |
333 | 赵六 | 30 | 2 |
hive表的最终数据是这样(包含了所有历史数据和每次变更记录):
id | name | age | id2 | updated | deleted |
---|---|---|---|---|---|
111 | 张三 | 23 | 1 | null | null |
111 | 张三 | 24 | 1 | 2017/07/01 00:02:00 | 0 |
111 | 张三 | 25 | 1 | 2018/05/25 09:00:10 | 0 |
222 | 李四 | 44 | 1 | null | null |
222 | 李思 | 45 | 1 | 2017/07/01 00:02:00 | 0 |
222 | 李思 | 45 | 1 | 2018/05/25 09:00:10 | 1 |
333 | 王五 | 10 | 1 | null | null |
333 | 赵六 | 30 | 2 | null | null |
如果想从hive获取最新数据,可以先按照主键进行分窗,对每组主键相同的记录取updated最新的且deleted不为1的记录即可。
代码(hive on spark):
val createsql = """create table adata
(id string, name string, age int, id2 string, updated string, deleted int)"""
spark.sql(createsql)
val insertsql = """
insert into adata values
('111', '张三', 23, '1', null, null),
('111', '张三', 24, '1', '2017/07/01 00:02:00', 0),
('111', '张三', 25, '1', '2018/05/25 09:00:10', 0),
('222', '李四', 44, '1', null, null),
('222', '李思', 45, '1', '2017/07/01 00:02:00', 0),
('222', '李思', 45, '1', '2018/05/25 09:00:10', 1),
('333', '王五', 10, '1', null, null),
('333', '赵六', 30, '2', null, null)
"""
spark.sql(insertsql)
spark.table("adata").show
/*
+---+------+---+---+-------------------+-------+
| id| name|age|id2| updated|deleted|
+---+------+---+---+-------------------+-------+
|111| 张三| 23| 1| null| null|
|111| 张三| 24| 1|2017/07/01 00:02:00| 0|
|111| 张三| 25| 1|2018/05/25 09:00:10| 0|
|222| 李四| 44| 1| null| null|
|222| 李思| 45| 1|2017/07/01 00:02:00| 0|
|222| 李思| 45| 1|2018/05/25 09:00:10| 1|
|333| 王五| 10| 1| null| null|
|333| 赵六| 30| 2| null| null|
+---+------+---+---+-------------------+-------+
*/
val primary_key = "id, id2"
val selectsql = s"""
select * from
(select *,
row_number() over (partition by ${primary_key} order by updated desc) as updated_n
from adata
) tmp
where updated_n = 1
and (deleted is null or deleted != 1)
"""
spark.sql(selectsql).show
/*
+---+------+---+---+-------------------+-------+---------+
| id| name|age|id2| updated|deleted|updated_n|
+---+------+---+---+-------------------+-------+---------+
|333| 王五| 10| 1| null| null| 1|
|333| 赵六| 30| 2| null| null| 1|
|111| 张三| 25| 1|2018/05/25 09:00:10| 0| 1|
+---+------+---+---+-------------------+-------+---------+
*/