Using Dataset
Following the hints on the official site, I converted the DataFrame into a Dataset. The code is as follows:
package com.doudou.www.sql01

import org.apache.spark.sql.SparkSession

object JDBCApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JDBCApp")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    // Read the person table from MySQL over JDBC
    val person = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://doudou:3306/test")
      .option("user", "doudou")
      .option("password", "123456")
      .option("dbtable", "person")
      .load()
    person.createOrReplaceTempView("person")

    val df = spark.sql("select name,age,gender,case when gender='male' then '男' else '女' end as sex from person")

    import spark.implicits._
    // Defining the case class here, inside main, is what triggers the encoder error below
    case class Person(name: String, age: Int, gender: String)
    val df2 = df.select($"name", $"age", $"gender")
    val df3 = df2.as[Person]
    // val df2 = df.select("name,age,gender").as[Person]
    df3.filter(x => x.name == "doudou" || x.name == "mengyao").map(x => x.name).show()

    spark.stop()
  }
}
The editor showed no errors at this point, but when I ran it, IDEA reported the following compilation errors:
Error:(28, 21) Unable to find encoder for type Person. An implicit Encoder[Person] is needed to store Person instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val df3 = df2.as[Person]
Error:(28, 21) not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[Person])org.apache.spark.sql.Dataset[Person].
Unspecified value parameter evidence$2.
val df3 = df2.as[Person]
The problem is where the case class Person is defined. Because it was declared inside the method body, the compiler cannot materialize the TypeTag that spark.implicits._ needs to derive the implicit Encoder[Person], so the encoder is never found. Moving the definition up to the top level of the file, outside the object, fixes it. The code becomes:
package com.doudou.www.sql01

import org.apache.spark.sql.SparkSession

// Now at top level, so spark.implicits._ can derive an Encoder[Person]
case class Person(name: String, age: Int, gender: String)

object JDBCApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JDBCApp")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    // Read the person table from MySQL over JDBC
    val person = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://doudou:3306/test")
      .option("user", "doudou")
      .option("password", "123456")
      .option("dbtable", "person")
      .load()
    person.createOrReplaceTempView("person")

    val df = spark.sql("select name,age,gender,case when gender='male' then '男' else '女' end as sex from person")

    import spark.implicits._
    val df2 = df.select($"name", $"age", $"gender")
    val df3 = df2.as[Person]
    // val df2 = df.select("name,age,gender").as[Person]
    df3.filter(x => x.name == "doudou" || x.name == "mengyao").map(x => x.name).show()

    spark.stop()
  }
}
Now it runs without errors.
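One more detail worth noting: the line left commented out in the code, df.select("name,age,gender").as[Person], would fail even with the top-level case class, because select interprets a single string as one column name. Each column must be passed as a separate argument. A minimal sketch, reusing the spark, df, and Person definitions from the code above:

```scala
import spark.implicits._  // brings the implicit Encoder[Person] into scope

// Wrong: the whole string is looked up as one column literally named "name,age,gender",
// which does not exist, so Spark throws an AnalysisException
// val bad = df.select("name,age,gender").as[Person]

// Right: one argument per column, then convert to a typed Dataset[Person]
val ds = df.select("name", "age", "gender").as[Person]
ds.show()
```

This string-based select is equivalent to the $"name" column syntax used in the main code; both work, as long as each column name is its own argument.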