不论是数据分析工作还是算法建模工作,都难免需要数据预处理,难免会遇到缺失值的处理,话不多说,看下面场景:
+--------+----+----------------+
|province|nums| time|
+--------+----+----------------+
| anhui| 1|2019-06-15 13:20|
| anhui| 2| null|
| anhui| 3| null|
| anhui| 4|2019-06-15 13:40|
| anhui| 5|2019-06-15 14:40|
| anhui| 6| null|
| beijing| 1|2019-06-15 13:42|
| beijing| 2| null|
| beijing| 3| null|
+--------+----+----------------+
如果需要对“province”进行分组,按照“nums”的递增顺序,对“time”中的缺失值进行前向填充,如果用pandas处理,简单,代码如下:
pdf=df.toPandas()
pdf.sort_values("nums").groupby("province").\
apply(lambda group: group.fillna(method='ffill'))
结果如下:
确实用pandas处理很方便,但是在pyspark中,并没有ffill(前向填充这一功能),要实现这个功能就要借助spark的windows函数,代码如下:
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys
# 开窗函数,以id做分组,指定排序方式,设置窗口大小
window = Window.partitionBy("province").orderBy("nums").rowsBetween(-sys.maxsize, 0)
# last函数,返回分组中的最后一个值。ignorenulls为True表示只对null值应用
filled = last(df["time"], ignorenulls=True).over(window)
df_update = df.withColumn("time", filled)
结果如下:
+--------+----+----------------+
|province|nums| time|
+--------+----+----------------+
| beijing| 1|2019-06-15 13:42|
| beijing| 2|2019-06-15 13:42|
| beijing| 3|2019-06-15 13:42|
| anhui| 1|2019-06-15 13:20|
| anhui| 2|2019-06-15 13:20|
| anhui| 3|2019-06-15 13:20|
| anhui| 4|2019-06-15 13:40|
| anhui| 5|2019-06-15 14:40|
| anhui| 6|2019-06-15 14:40|
+--------+----+----------------+
如果需要进行后向填充,写法也比较类似,只需要做出以下改变即可:
- 把 .rowsBetween(-sys.maxsize,0)改成 .rowsBetween(0,sys.maxsize);
- 把last改成first