(1) Join between Datasets via Spark SQL
import org.apache.spark.sql.SparkSession

object joinDemo {
  // the country table is tiny, so Spark picks a BroadcastHashJoin
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("joinDemo").master("local[*]").getOrCreate()
    import sparkSession.implicits._
    val lines = sparkSession.createDataset(Array("1,hanmeimei,China", "2,tom,USA", "3,Jerry,Japan"))
    // parse the raw lines into (id, name, country) tuples
    val tupleDS = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val country = fields(2)
      (id, name, country)
    })
    val df1 = tupleDS.toDF("id", "name", "country")
    val countries = sparkSession.createDataset(List("China,中国", "USA,美国"))
    val tupleDS2 = countries.map(line => {
      val fields = line.split(",")
      val ename = fields(0)
      val cname = fields(1)
      (ename, cname)
    })
    val df2 = tupleDS2.toDF("ename", "cname")
    // register temporary views so both sides can be queried with SQL
    df1.createTempView("t_user")
    df2.createTempView("t_countrys")
    // join via SQL
    val res = sparkSession.sql("select u.id, u.name, c.cname from t_user as u join t_countrys as c on u.country = c.ename")
    res.show()
    // inspect the physical plan
    res.explain()
    sparkSession.stop()
  }
}
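Because the country table is far below the default 10 MB spark.sql.autoBroadcastJoinThreshold, the explain() output of this query should show a BroadcastHashJoin. You can also request the broadcast explicitly with the broadcast() hint from org.apache.spark.sql.functions; a minimal sketch, reusing df1 and df2 from the demo above (placed before sparkSession.stop()):

import org.apache.spark.sql.functions.broadcast

// explicitly mark the small country table as the broadcast side
val hinted = df1.join(broadcast(df2), df1("country") === df2("ename"))
hinted.explain()  // the plan should again show BroadcastHashJoin
hinted.show()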
(2) Join between DataFrames via the DataFrame API
import org.apache.spark.sql.{DataFrame, SparkSession}

object joinDemo2 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("joinDemo2").master("local[*]").getOrCreate()
    import sparkSession.implicits._
    // setting the broadcast threshold to -1 disables broadcast joins entirely
    // (the default is 10 MB), so Spark falls back to a SortMergeJoin
    sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", true)
    val df1: DataFrame = Seq(
      (0, "tom"),
      (1, "jerry"),
      (2, "kate")
    ).toDF("id", "name")
    val df2 = Seq(
      (0, 18),
      (1, 20),
      (3, 30)
    ).toDF("aid", "age")
    val res = df1.join(df2, $"id" === $"aid")
    res.explain()
    res.show()
    sparkSession.stop()
  }
}
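With broadcasting disabled, res.explain() should show a SortMergeJoin, and since this is an inner join only the matching ids (0 and 1) appear in res.show(). To confirm that the threshold is what drives the strategy choice, here is a minimal sketch, reusing sparkSession, df1, and df2 from above (placed before sparkSession.stop()): restoring the default threshold should flip the plan back to a broadcast join.

// restore the default 10 MB broadcast threshold
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
val res2 = df1.join(df2, $"id" === $"aid")
res2.explain()  // should now show BroadcastHashJoin instead of SortMergeJoin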