版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/eases_stone/article/details/84309096
方法1, 利用case class ,然后利用spark.implicits._ 调用toDF 构造 DataFrame,详见下面。
package com.sydney.dream.dianshang.create_data
import java.util.Random
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
object CreateDataFrameDemoV1 {

  /**
   * User model. Fields: login id, user name, nickname, age,
   * profession, city, sex.
   */
  case class User(userId: Long, userName: String, nickName: String,
                  age: Int, professional: String, city: String, sex: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("InitSomeData")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    val random = new Random
    val sexes = Array("male", "female")

    // Generate 10000 mock users. Seq.tabulate replaces the original
    // var/while loop, and m.toLong replaces the pointless
    // Int -> String -> Long round-trip (toString.toLong).
    val users = Seq.tabulate(10000) { m =>
      User(
        userId = m.toLong,
        userName = "user" + m,
        nickName = "name" + m,
        age = random.nextInt(100),
        professional = "professional" + random.nextInt(1000),
        city = "city" + random.nextInt(100),
        sex = sexes(random.nextInt(2))
      )
    }

    // Build an RDD from the local Scala collection.
    val personsRdd = sc.parallelize(users)

    // With spark.implicits._ in scope, an Encoder is derived from the
    // case class, so toDF() converts RDD[User] to a DataFrame whose
    // columns are named after the case-class fields.
    import spark.implicits._
    val personDF = personsRdd.toDF()
    personDF.show()

    // Release the Spark resources (the original leaked the session).
    spark.stop()
  }
}
方法二, 利用 createDataFrame 并手动指定 StructType schema, 见下面
package com.sydney.dream.dianshang.create_data
import java.util.Random
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer
/**
 * Method 2 of building a Spark DataFrame from an RDD: pass an
 * RDD[Row] together with an explicitly constructed StructType schema
 * to spark.createDataFrame.
 */
object CreateDataFrameDemoV2 {

  /**
   * User model. Fields: login id, user name, nickname, age,
   * profession, city, sex. (Kept for API compatibility; this demo
   * builds Rows directly rather than going through the case class.)
   */
  case class User(userId: Long, userName: String, nickName: String,
                  age: Int, professional: String, city: String, sex: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("InitSomeData")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    val random = new Random
    val sexes = Array("male", "female")

    // Generate 10000 mock users directly as Rows. This replaces the
    // original var/while loop, drops the unused per-iteration `val user`
    // case-class allocation, and uses m.toLong instead of the
    // Int -> String -> Long round-trip (toString.toLong).
    val rows = Seq.tabulate(10000) { m =>
      Row(
        m.toLong,
        "user" + m,
        "name" + m,
        random.nextInt(100),
        "professional" + random.nextInt(1000),
        "city" + random.nextInt(100),
        sexes(random.nextInt(2))
      )
    }

    // Build an RDD from the local Scala collection.
    val personsRdd = sc.parallelize(rows)

    // Manually specified schema: field order and types must match the
    // Row values above. (The original comment claimed a reflection-based
    // mapping, but this demo deliberately uses an explicit StructType.)
    val personSchema = StructType(Array(
      StructField("user_id", LongType, nullable = true),
      StructField("user_name", StringType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("professional", StringType, nullable = true),
      StructField("city", StringType, nullable = true),
      StructField("sex", StringType, nullable = true)
    ))

    val personsDf = spark.createDataFrame(personsRdd, personSchema)

    // Show the first 100 rows.
    println("展示所有的数据:")
    personsDf.show(100)

    // Release the Spark resources (the original leaked the session).
    spark.stop()
  }
}