Pyspark
注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark_SQL5
#博学谷IT学习技术支持
前言
今天继续分享Pyspark_SQL5。
一、基于Pandas完成UDF函数
自定义Python函数的要求: Series To Series(输入和输出均为 pandas.Series,且长度一致)
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    # Demo: a pandas-based (vectorized) UDF — Series in, Series out.
    print("spark pandas udf")

    spark = SparkSession.builder.appName("spark pandas udf").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()
    # Enable Apache Arrow so data moves between the JVM and Python in
    # columnar batches — this is what lets pandas UDFs run vectorized.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    # Two integer columns; the UDF below adds them element-wise.
    schema = StructType().add("a", IntegerType()) \
        .add("b", IntegerType())
    df = spark.createDataFrame([
        (1, 5),
        (2, 6),
        (3, 7),
        (4, 8),
    ], schema=schema)
    df.createTempView("t1")

    # For the DSL (DataFrame API) style.
    @F.pandas_udf(returnType=IntegerType())
    def sum_ab(a: pd.Series, b: pd.Series) -> pd.Series:
        """Vectorized element-wise sum of two integer columns."""
        return a + b

    # For the SQL style: register the same UDF under a SQL-visible name.
    spark.udf.register("sum_ab", sum_ab)

    spark.sql("""
        select
            *,
            sum_ab(a,b) as sum_ab
        from t1
    """).show()

    df.select("*", sum_ab("a", "b").alias("sum_ab")).show()

    spark.stop()
二、基于Pandas实现自定义UDAF函数
自定义Python函数的要求: Series To 标量(输入为 pandas.Series,输出为单个标量值)
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    # Demo: a pandas-based aggregate UDF (UDAF) — Series in, scalar out,
    # applied here as a window function.
    print("spark pandas udaf")

    spark = SparkSession.builder.appName("spark pandas udaf").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()
    # Arrow-based transfer is required for pandas UDFs/UDAFs to run vectorized.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    schema = StructType().add("a", IntegerType()) \
        .add("b", IntegerType())
    df = spark.createDataFrame([
        (1, 5),
        (2, 6),
        (3, 7),
        (4, 8),
    ], schema=schema)
    df.createTempView("t1")
    df.show()

    # For the DSL (DataFrame API) style.
    # Series -> scalar makes this an aggregate (UDAF-style) pandas UDF.
    @F.pandas_udf(returnType=FloatType())
    def avg_column(a: pd.Series) -> float:
        """Return the mean of the column values in each group/window frame."""
        return a.mean()

    # For the SQL style: register the same UDAF under a SQL-visible name.
    spark.udf.register("avg_column", avg_column)

    # Used as a window function: each row gets the running average over
    # the ordered frame (unbounded preceding to current row).
    spark.sql("""
        select
            *,
            avg_column(a) over(order by a) as a_avg,
            avg_column(b) over(order by b) as b_avg
        from t1
    """).show()

    df.select("*",
              avg_column("a").over(win.orderBy("a")).alias("a_avg"),
              avg_column("b").over(win.orderBy("b")).alias("b_avg")).show()

    spark.stop()
三、基于Pandas实现自定义UDF和UDAF综合案例
数据:
_c0,对手,胜负,主客场,命中,投篮数,投篮命中率,3分命中率,篮板,助攻,得分
0,勇士,胜,客,10,23,0.435,0.444,6,11,27
1,国王,胜,客,8,21,0.381,0.286,3,9,28
2,小牛,胜,主,10,19,0.526,0.462,3,7,29
3,火箭,负,客,8,19,0.526,0.462,7,9,20
4,快船,胜,主,8,21,0.526,0.462,7,9,28
5,热火,负,客,8,19,0.435,0.444,6,11,18
6,骑士,负,客,8,21,0.435,0.444,6,11,28
7,灰熊,负,主,10,20,0.435,0.444,6,11,27
8,活塞,胜,主,8,19,0.526,0.462,7,9,16
9,76人,胜,主,10,21,0.526,0.462,7,9,28
# 需求1:助攻这一列+10
# 需求2:篮板+助攻的次数
# 需求3:统计胜负的平均分
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    # Combined case study over a basketball-stats CSV:
    #   Requirement 1: add 10 to the "assists" (助攻) column.
    #   Requirement 2: sum "rebounds" (篮板) + "assists" (助攻).
    #   Requirement 3: average score (得分) grouped by win/loss (胜负).
    print("spark pandas udf example")

    spark = SparkSession.builder.appName("spark pandas udf example").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 200) \
        .getOrCreate()
    # Arrow-based transfer is required for pandas UDFs to run vectorized.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

    # header=True: first CSV row holds column names;
    # inferSchema=True: let Spark detect numeric column types.
    df = spark.read.format("csv") \
        .option("header", True).option("inferSchema", True) \
        .load("file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/data.csv")
    df.createTempView("t1")
    df.printSchema()
    df.show()

    # Requirement 1: Series -> Series UDF, shifts every value by 10.
    @F.pandas_udf(returnType=IntegerType())
    def method01(score: pd.Series) -> pd.Series:
        """Add 10 to each value of the column."""
        return score + 10

    # Requirement 2: Series -> Series UDF over two columns.
    @F.pandas_udf(returnType=IntegerType())
    def method02(score1: pd.Series, score2: pd.Series) -> pd.Series:
        """Element-wise sum of two columns."""
        return score1 + score2

    # Requirement 3: Series -> scalar, i.e. an aggregate (UDAF) pandas UDF.
    @F.pandas_udf(returnType=FloatType())
    def method03(score: pd.Series) -> float:
        """Mean of the column within each group."""
        return score.mean()

    # Register all three for use in SQL as well as in the DataFrame API.
    spark.udf.register("method01", method01)
    spark.udf.register("method02", method02)
    spark.udf.register("method03", method03)

    # SQL style — requirements 1 and 2.
    spark.sql("""
        select
            *,
            method01(`助攻`) as z_10,
            method02(`助攻`,`篮板`) as z_l_plus
        from t1
    """).show()

    # SQL style — requirement 3.
    spark.sql("""
        select
            `胜负`,
            method03(`得分`) as avg_score
        from t1
        group by `胜负`
    """).show()

    # DSL (DataFrame API) style — requirements 1 and 2.
    df.select("*",
              method01("助攻").alias("z_10"),
              method02("助攻", "篮板").alias("z_l_plus")
              ).show()

    # DSL style — requirement 3.
    df.select("胜负", "得分").groupBy("胜负").agg(
        method03("得分").alias("avg_score")
    ).show()

    spark.stop()
总结
今天主要和大家分享了pandas UDF和UDAF。