本人菜鸡一只,正在努力学习提升自己,在工作中遇到了这个问题,因此记录下来!
前言:
提到窗口函数,我会第一个想起ROW_NUMBER()这个函数,实际上他还有两个兄弟,他们三个的区别这里稍微说下(因为我主要不是来介绍他们三个的)
以下三个函数相同点:
新增一列,根据一定规则将数据分区然后按照一定规则排序
三个函数的不同点:
ROW_NUMBER() :当有重复数据的时候,字段按照顺序会一直往下
RANK() :当有重复数据的时候,字段按照顺序会一直往下
DENSE_RANK() :当有重复数据的时候,顺序会一样
举例:
原始数据: | ROW_NUMBER | RANK | DENSE_RANK |
1000 | 1 | 1 | 1 |
1123 | 2 | 2 | 2 |
2322 | 3 | 3 | 3 |
2322 | 4 | 3 | 3 |
2412 | 5 | 5 | 4 |
我相信在做数据处理的时候,窗口函数还是蛮常用的,其实还有些其他的,比如:lead,lag,各种聚合(count,sum,max)等等
闲话不多说,进入正题!
说说场景(以下的数据量级皆为瞎编,但是不影响阅读):
我获得了800W+用户的行为数据,但是我现在要精确的保留500W的用户行为,当然这500W的用户行为也是有要求的,需要的是活跃的用户,那么我通过活跃用户的规则得到了600W+的活跃用户id,接下来,我要将这两份数据join然后获得其中的500W,看乱了?稍微整理下
1、原始日志表,约有800多W的用户行为
2、活跃用户id表,约有600多W的用户行为
3、前两表join得到活跃原始日志表,可以得到600多W的用户行为
4、我要从活跃原始日志表得到精确的500W用户的行为(每天的500W是随机的,不是固定的某500W个人,但是因为join后得到的是600W多,所以每天跑出来的500W不会和昨天的用户偏差特别的大)
因此我要做的有两件事情
第一、如何打随机值?
要满足两个条件:
条件一:相同用户当天应该有多条记录,那相同用户id的每条记录随机值必须是一样的,但是相同用户id每天的随机值又应该是不一样的(我知道有点绕,我解释下,假设用户A今天的随机值是0.2341,那他今天所有行为的值都应该是0.2341,但是明天或者后天就不应该是0.2341,不能一直都是同一个值,否则我就无法做到每天是500W的随机的用户)
条件二:随机出来的值,必须是自然数的(比如,用户A今天随机值是0.2341,用户B是0.5341,用户C是0.3421,那么应该转换成A的值是1,B的值是3,C的值是2),这样我才能通过where 随机值<500W来过滤掉500W活跃用户以外的行为。
因此:我的解决方法是hash(concat(日期,用户id)),得到随机值,然后通过窗口函数的order by 这个随机值,得到排序
第二、打上了随机值,如何过滤行为数据,使得只剩精确的500W用户的行为?
是有几套方案的,让我一一细说
方案一:有些人可能会说,那先把活跃用户id表,随机排序下,做成500W的,然后再去关联原始日志表不就可以了吗?
不好意思,这个不行,为什么,因为活跃用户500W,但是你不能确定这500W的活跃用户在今天一定有访问行为,这就会导致500W活跃表和原始日志关联之后,可能只剩400W+的用户行为,这就不满足一开始的需求,因此方案一pass!
方案二:取出原始日志中的所有用户id(当然是要去重过后的),然后直接和活跃的用户id相join,将这部分join的数据打上随机值,然后limit 500W,再拿这500W用户id去和今天的行为join,这样得到的肯定是500W的用户行为,该方案听起来复杂,但是能够完成需求。
方案三:原始日志表和活跃用户id两张表join,获得所有活跃用户原始日志表,然后通过DENSE_RANK() OVER(ORDER BY 随机值)来为每个用户打上一个唯一的序号,然后通过这个序号来where 500W,看起来方案三更合理(因为步骤少)
所以可用的就是方案二和方案三,我首选方案三,看起来比较简单,步骤少,代码量也少(能用2行代码完成的,千万别写10行来完成,不过代码越简单,越要有清晰明白的注释)
然鹅,我错了!
请看下面三张图,稍微解释下,我在保存数据之前会先count下数据的条数,然后完全count不了
看这种情况,貌似是数据倾斜,也就是说可能很多数据都灌倒了为数不多的分区中处理,而且这种倾斜不是等待就能解决的,基本上算卡死(我试过增加shuffle的分区数,也解决不了这个问题),我尝试去掉了DENSE_RANK()函数,结果这种情况立马消失了。所以,我判断肯定是窗口函数的实现机制,导致数据需要重分区并且导致了数据倾斜,那具体是如何导致的,恕本人才疏学浅,我真没搞明白,如果有大佬知道为什么还希望不吝赐教!
因此方案三看来确实。。。。是我想简单了!
所以我最后用方案二解决了这个问题。
我再描述下我的做法:
1、读入原始日志表,做cache
2、根据原始日志表生成当天的用户id(select 用户id from 原始日志表 group by 用户id)
3、将第二步的表和活跃表join,生成当天活跃用户id,并且通过ROW_NUMBER() OVER(ORDER BY 随机值) as rank,然后根据rank来where rank <= 500W得到当天500W活跃用户id
4、再将这500W的活跃用户id关联一开始做了cache的原始日志表,这样就保留下来500W用户的行为了
5、亲测这种方法,可行,而且不慢(当然大家还得根据自己集群和数据的情况调整资源空间大小)
所以总结下:
1、步骤少的方法不一定是最好的方法
2、join我觉得是相对来说比较常见(不会数据倾斜)的操作了,当然如果数据倾斜,join我们也有一些相应的方法来解决!
比如:spark2.2.0:记录一次数据倾斜的解决(扩容join)!:https://blog.csdn.net/lsr40/article/details/80322553
3、我还是希望以后我有机会弄懂窗口函数的实现原理
4、当有思路的时候,一定要把想法留下来(其实我的方案三和方案二是一起想出来的,然后我pass了方案二,直接采用方案三,后来方案三怎么都走不通,看到代码中有注释写着方案二,因此尝试了下,然后才解决了这个问题)