# Spark retail project practice code
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
# Point Spark at the local installation and make the driver and the
# executors use the same conda interpreter.
os.environ['SPARK_HOME'] = '/export/server/spark-2.3.0-bin-hadoop2.7'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
spark = SparkSession.builder \
.appName('test') \
.getOrCreate()
sc = spark.sparkContext
    # Read the raw order data. Spark infers the schema of JSON input on its
    # own, so CSV-style header/sep/inferSchema options are not needed here.
    json_df = spark.read.format("json") \
        .load("file:///tmp/pycharm_project_553/EduAnalysis/data/mini.json")
    json_df.printSchema()
    json_df.show(2)
"""
尽管字段很多, 但是我们用到的只需要几个即可:
1. storeProvince 省份
2. storeID 店铺ID
3. receivable 订单营收金额
4. dateTS 订单时间戳
5. payType 订单支付类型
"""
"""
过滤掉:
1. 单价大于10000的订单
2. 省份为字符串null的数据
"""
    # thresh=3 over a three-column subset means a row is kept only when
    # storeProvince, storeCity and storeDistrict are all non-null.
    df = json_df.filter("receivable < 10000 and storeProvince != 'null'") \
        .dropna(thresh=3, subset=['storeProvince', 'storeCity', 'storeDistrict'])
df.show()
    # Requirement 1: total sales per province, rounded to two decimal
    # places and sorted in descending order of revenue.
    df_sale = df.select("storeProvince", "receivable") \
        .groupBy("storeProvince") \
        .sum("receivable") \
        .withColumnRenamed('sum(receivable)', 'money') \
        .withColumn('money', F.round('money', 2)) \
        .orderBy('money', ascending=False)
df_sale.show()
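    # The same aggregation expressed in Spark SQL for comparison (a sketch;
    # the view name 'orders_view' is an assumption, not from the dataset):
    df.createTempView("orders_view")
    spark.sql("""
        select storeProvince, round(sum(receivable), 2) as money
        from orders_view
        group by storeProvince
        order by money desc
    """).show()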
"""
TOP3 销售省份中, 有多少家店铺 日均销售额 1000+
"""
    top3_province_df = df_sale.limit(3).select("storeProvince") \
        .withColumnRenamed("storeProvince", "top3_province")
    # Keep only the orders that fall in one of the TOP 3 provinces.
    top3_province_joined_df = df.join(
        top3_province_df,
        top3_province_df['top3_province'] == df['storeProvince'],
        "inner")
    # substr(0, 10) keeps the first ten digits of dateTS (second-level
    # precision), which from_unixtime then formats as a calendar day.
    # A store is counted once (dropDuplicates on storeID) as soon as any
    # single day's revenue exceeds 1000.
    top3_province_joined_df.groupBy('storeProvince', 'storeID',
            F.from_unixtime(df['dateTS'].substr(0, 10), 'yyyy-MM-dd').alias('day')) \
        .sum('receivable') \
        .filter('sum(receivable) > 1000') \
        .dropDuplicates(subset=['storeID']) \
        .groupBy("storeProvince").count().show()
top3_province_joined_df.groupBy("storeProvince").avg("receivable").show()
    def to_percent(data):
        # Format a 0-1 ratio as a whole-percent string, e.g. 0.1234 -> '12%'.
        return str(round(data * 100)) + '%'

    udf_to_percent = F.udf(to_percent, StringType())
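    # A UDF-free alternative (a sketch): build the percent string with
    # built-in column functions, avoiding Python<->JVM serialization.
    # 'percent_expr' is hypothetical and is not wired into the query below:
    percent_expr = F.concat(
        F.round(F.col('percent') * 100).cast('int').cast('string'),
        F.lit('%'))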
    top3_province_joined_df.select("storeProvince", "payType").createTempView("province_paytype")
    # Payment-type share per province: the window count(*) partitioned by
    # storeProvince gives each province's order total (pnt), and the outer
    # count(*) per (province, payType) group gives the numerator.
    spark.sql("""
        select storeProvince, payType, (count(*) / pnt) as percent
        from (select *,
                     count(*) over(partition by storeProvince) as pnt
              from province_paytype) as t
        group by storeProvince, payType, pnt
    """).select("storeProvince", "payType", udf_to_percent("percent")).show()