Two Ways to Go from an RDD to a DataFrame in Spark

Copyright notice: this is the author's original article and may not be reproduced without permission. https://blog.csdn.net/eases_stone/article/details/84309096

Method 1: define a case class, then import spark.implicits._ and call toDF on the RDD to build the DataFrame. See below.

package com.sydney.dream.dianshang.create_data

import java.util.Random

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer


object CreateDataFrameDemoV1 {

    // User model: login id, user name, nickname, age,
    // profession, city, sex
    case class User(userId: Long, userName: String, nickName: String,
                    age: Int, professional: String, city: String, sex: String )

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder
            .appName("InitSomeData")
            .master("local[*]")
            .getOrCreate()
        val sc = spark.sparkContext
        
        val random = new Random

        // Simulate 10,000 users
        val sexs = Array("male", "female")
        // Fields: user id, user name, nickname, age, profession, city, sex
        val users = ArrayBuffer[User]()
        var m = 0
        while (m < 10000) {
            val userId = m.toLong
            val userName = "user" + m
            val nickName = "name" + m
            val age = random.nextInt(100)
            val professional = "professional" + random.nextInt(1000)
            val city = "city" + random.nextInt(100)
            val sex = sexs(random.nextInt(2))
            val user = User(userId, userName, nickName, age,
                professional, city, sex)
            users += user
            m += 1
        }

        // Build an RDD from the Scala collection
        val personsRdd = sc.parallelize(users)

        import spark.implicits._
        // The implicits add toDF() to an RDD of case-class instances;
        // the schema is inferred from the User fields by reflection
        val personDF = personsRdd.toDF()

        personDF.show()

        spark.stop()
    }
}
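
With method 1, toDF() infers the schema by reflection from the User case class fields. One handy corollary (a minimal sketch, reusing the SparkSession and the User case class from the code above; the two sample rows are made-up values just to illustrate the conversion): spark.implicits._ also converts a local Seq of case-class instances directly, so small test data does not need sc.parallelize at all.

import spark.implicits._

// Build a DataFrame straight from a local Seq of case-class instances
val quickDF = Seq(
    User(1L, "user1", "name1", 30, "professional1", "city1", "male"),
    User(2L, "user2", "name2", 25, "professional2", "city2", "female")
).toDF()
quickDF.printSchema()   // column names and types come from the User fields
quickDF.show()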

Method 2: call createDataFrame with an explicitly specified StructType schema. See below.

package com.sydney.dream.dianshang.create_data

import java.util.Random

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

import scala.collection.mutable.ArrayBuffer

/**
  * Method 2 for building a Spark DataFrame from an RDD: specify the schema
  * explicitly with StructType.
  */
object CreateDataFrameDemoV2 {

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder
            .appName("InitSomeData")
            .master("local[*]")
            .getOrCreate()
        val sc = spark.sparkContext

        val random = new Random

        // Simulate 10,000 users
        val sexs = Array("male", "female")
        // Tuple fields: user id, user name, nickname, age, profession, city, sex
        val persons = ArrayBuffer[(Long, String, String,
            Int, String, String, String)]()
        var m = 0
        while (m < 10000) {
            val userId = m.toLong
            val userName = "user" + m
            val nickName = "name" + m
            val age = random.nextInt(100)
            val professional = "professional" + random.nextInt(1000)
            val city = "city" + random.nextInt(100)
            val sex = sexs(random.nextInt(2))
            // Note the double parentheses: += takes a single argument,
            // so the tuple literal needs its own pair
            persons += ((userId, userName, nickName, age, professional, city, sex))
            m += 1
        }

        // Build an RDD from the Scala collection
        val personsRdd = sc.parallelize(persons)

        // Specify the schema (column names, types, nullability) by hand;
        // method 1 above inferred it by reflection instead
        val personSchema = StructType(Array(
            StructField("user_id", LongType, nullable = true),
            StructField("user_name", StringType, nullable = true),
            StructField("name", StringType, nullable = true),
            StructField("age", IntegerType, nullable = true),
            StructField("professional", StringType, nullable = true),
            StructField("city", StringType, nullable = true),
            StructField("sex", StringType, nullable = true)
        ))
        val personsDf = spark.createDataFrame(personsRdd.map(line => Row(
            line._1, line._2, line._3, line._4, line._5, line._6, line._7
        )), personSchema)
        // Show the first 100 rows
        println("Sample of the generated data:")
        personsDf.show(100)

        spark.stop()
    }
}
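
A middle ground between the two methods is worth knowing: with spark.implicits._ in scope, toDF on an RDD of tuples accepts column names directly, so the StructType and the Row mapping can be skipped when per-column nullability control is not needed. A minimal sketch, reusing spark and personsRdd from the code above:

import spark.implicits._

// Name the columns directly; the types (LongType, StringType, IntegerType, ...)
// are inferred from the tuple's element types
val personsDf2 = personsRdd.toDF(
    "user_id", "user_name", "name", "age", "professional", "city", "sex")
personsDf2.printSchema()

The trade-off: types and nullability follow the encoder's defaults, whereas the explicit StructType in method 2 gives you full control over both.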
