SparkDataFrame 将多行数据平铺展开

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/dpengwang/article/details/82859059

场景如下,某个user_id在不用的月份在id1~id_12上都有数据,机器学习训练时,这些id对应不同月份上的数据要么在一起进行编码,最终成为一行数据,要么把这些id对应不同月份的数据展开,成为新的feature,比如id1在201805上的数据可以作为一个新的feature–>201805_id1
在这里插入图片描述
变成如下的形式(每个id在不同月份的值变成了新id)

user_id 201703.0_id1 201307.0_id1 201306.0_id and so on …
def gen_expr(feature_array: Array[String]): Seq[Column] = {
      val expr =feature_array.toSeq.filter(x=>x!="user_id" && x!="month")
        .flatMap(x=>Seq(toDouble(first(col(x))).alias(x.toString.replace(".","_"))))
      expr
    }
val mibasic = spark.read.parquet("/user/h_mifi/user/mifi_compete/mibasic_compete_data")
val mibasic_roll =mibasic.groupBy("user_id")
      .pivot("month")
      .agg(expr1.head,expr1.tail:_*)
      .na.fill(-1)

主要使用的是spark dataset的pivot函数,先groupBy("user_id"),将数据按照user_id进行分组,然后pivot我们需要展开的属性,比如我们要在month上展开,那么就要pivot("month"),pivot后要跟agg函数,因为pivot后,对应字段下会生成一列数据,比如pivot("month")后,如果原始数据中id1在201805有n条数据,那么pivot后,对应的201805_id1字段下就会有高度为n的列,在pivot后紧跟的agg函数就要对这些列进行处理,比如使用sum,max,min等等,在这里我用first取第一行,因为原始数据中每个month下每个id只有一行数据。另外要说明的是,pivot后产生新字段的命名规则是:pivot字段名_原始id名,比如代码中的201805.0_id1

猜你喜欢

转载自blog.csdn.net/dpengwang/article/details/82859059