./bin/spark-shell
./bin/spark-shell --master yarn --deploy-mode client // launch on YARN in client mode (the old "--master yarn-client" form is deprecated)
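// To pin executor resources explicitly, extra flags can be added (a sketch; the numbers are placeholder values):
./bin/spark-shell --master yarn --deploy-mode client --num-executors 4 --executor-memory 2g --executor-cores 2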
val df = sql("select * from default.orders") // spark-shell pre-imports spark.sql, so sql(...) works directly
df.select("user_id").distinct.count()
// selectExpr accepts Hive SQL expressions
df.selectExpr("max(cast(user_id as int))").show()
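// Roughly the same aggregation through the DataFrame API (a sketch; needs the functions import):
import org.apache.spark.sql.functions.{col, max}
df.agg(max(col("user_id").cast("int"))).show()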
df.groupBy("order_dow").count().show()
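// To see the busiest days first, sort the counts descending (a sketch; $ comes from spark.implicits, auto-imported in spark-shell):
df.groupBy("order_dow").count().orderBy($"count".desc).show()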
val priors = spark.sql("select * from default.order_products_prior")
val df2 = df.join(priors, "order_id").cache() // inner join on order_id; cache() marks df2 for in-memory reuse
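// The two-argument join above defaults to an inner join; other join types can be named explicitly
// (a sketch; dfLeft is an illustrative name):
val dfLeft = df.join(priors, Seq("order_id"), "left_outer")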
val df1 = df.groupBy("order_dow").count().cache()
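// Note that cache() is lazy: nothing is stored until the first action runs (a sketch):
df1.count()        // this action materializes df1 in memory
df1.storageLevel   // inspect the level (Dataset.storageLevel, available since Spark 2.1)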
df2.unpersist() // release the cached data from memory
import org.apache.spark.sql.SparkSession

object TestFunc {
  def main(args: Array[String]): Unit = {
    // Build a SparkSession explicitly; in spark-shell one is created
    // automatically on the client and is available as 'spark'.
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select * from badou.orders")
    val priors = spark.sql("select * from badou.order_products_prior")
"""
|4.每个用户根据order_hour_of_day这列的值对order_dow进行排序
|1 2 08
|1 3 07
|
|1 [(2,08),(3,07)]
|
|=> 1 [(3,07),(2,08)] 一个用户最喜爱购买商品的top3
|rdd: (user_id,(order_number,order_hour_of_day))
""".stripMargin
    import spark.implicits._
    val orderNumberSort = df.select("user_id", "order_number", "order_hour_of_day")
      .rdd.map(x => (x(0).toString, (x(1).toString, x(2).toString))) // DataFrame -> RDD of (user_id, (order_number, hour))
      .groupByKey()
      .mapValues(_.toArray.sortWith(_._2 < _._2).slice(0, 2)) // sort ascending by hour (zero-padded strings compare correctly); keep the first two
      .toDF("user_id", "order_sort_by_hour")
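    // An alternative sketch with a window function, avoiding the RDD round-trip
    // (byHour and topTwoPerUser are illustrative names; row_number/Window are standard Spark SQL):
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number
    val byHour = Window.partitionBy("user_id").orderBy("order_hour_of_day")
    val topTwoPerUser = df.select("user_id", "order_number", "order_hour_of_day")
      .withColumn("rn", row_number().over(byHour))
      .where($"rn" <= 2) // keep the two earliest-hour rows per user, matching slice(0, 2) above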
    // UDF: add two string columns as integers
    import org.apache.spark.sql.functions._
    val plusUDF = udf((col1: String, col2: String) => col1.toInt + col2.toInt)
    df.withColumn("plus", plusUDF(col("order_number"), col("order_dow"))).show()
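    // The same logic can also be registered for use inside SQL strings
    // (a sketch; "plus_udf" is an illustrative name):
    spark.udf.register("plus_udf", (c1: String, c2: String) => c1.toInt + c2.toInt)
    spark.sql("select order_id, plus_udf(order_number, order_dow) as plus from badou.orders").show()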
    spark.stop() // release the session once the job is done
  }
}