Spark Retail Project Practice Code

# -*- coding: utf-8 -*-
# Program function: read the retail order data and run the sales analysis
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Import data types
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# The Spark code can run against a local PySpark environment or the PySpark
# environment inside the VM; both are configured here through os.environ.
os.environ['SPARK_HOME'] = '/export/server/spark-2.3.0-bin-hadoop2.7'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
# When several Python versions are installed, not pinning the interpreter is
# likely to cause errors.
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON


if __name__ == '__main__':
    # 1. Create the SparkSession (entry point to the Spark environment)
    spark = SparkSession.builder \
        .appName('test') \
        .getOrCreate()
    sc = spark.sparkContext
    # 2. Load the source data (JSON; Spark infers the schema automatically)
    jdbcDF = spark.read.format("json") \
        .load("file:///tmp/pycharm_project_553/EduAnalysis/data/mini.json")

    jdbcDF.printSchema()
    jdbcDF.show(2)

    """
    尽管字段很多, 但是我们用到的只需要几个即可:
    1. storeProvince 省份
    2. storeID 店铺ID
    3. receivable 订单营收金额
    4. dateTS 订单时间戳
    5. payType 订单支付类型
    """

    """
    过滤掉:
    1. 单价大于10000的订单
    2. 省份为字符串null的数据
    """
    df = jdbcDF.filter("receivable<10000 and storeProvince!= 'null'").\
        dropna(thresh=3, subset=['storeProvince', 'storeCity', 'storeDistrict'])

    df.show()

    # Requirement 1: group by province, sum the sales amount, and sort by amount
    df_sale = df.select("storeProvince", "receivable"). \
        groupBy("storeProvince"). \
        sum("receivable"). \
        withColumnRenamed('sum(receivable)', 'money').\
        withColumn('money', F.round('money', 2)).\
        orderBy('money', ascending=False)
    df_sale.show()

    """
    TOP3 销售省份中, 有多少家店铺 日均销售额 1000+
    """
    # Take the TOP 3 provinces by sales and keep only their orders via an inner join
    top3_province_df = df_sale.limit(3).select("storeProvince").\
        withColumnRenamed("storeProvince", "top3_province")
    top3_province_joined_df = df.join(top3_province_df, top3_province_df['top3_province'] == df['storeProvince'],
                                      "inner")


    # Per store and day (dateTS is a millisecond timestamp, so its first 10 digits
    # are the epoch seconds): sum the day's revenue, keep the days above 1000,
    # deduplicate by store, then count the qualifying stores per province.
    top3_province_joined_df.groupBy('storeProvince', 'storeID',
                                    F.from_unixtime(df['dateTS'].substr(0, 10), 'yyyy-MM-dd').alias('day')). \
        sum('receivable'). \
        filter('`sum(receivable)` > 1000'). \
        dropDuplicates(subset=['storeID']). \
        groupBy("storeProvince").count().show()
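    # (Sketch, not in the original post) The chain above counts stores that have at
    # least one day over 1000. If "average daily sales of 1000+" is read strictly as
    # the mean of each store's per-day sales, the count could be computed like this:
    daily_df = top3_province_joined_df.groupBy(
        'storeProvince', 'storeID',
        F.from_unixtime(df['dateTS'].substr(0, 10), 'yyyy-MM-dd').alias('day')). \
        agg(F.sum('receivable').alias('day_money'))
    daily_df.groupBy('storeProvince', 'storeID'). \
        agg(F.avg('day_money').alias('avg_day_money')). \
        filter('avg_day_money > 1000'). \
        groupBy('storeProvince').count().show()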

    # Requirement 3: average order value per province among the TOP 3 provinces
    top3_province_joined_df.groupBy("storeProvince").avg("receivable").show()
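    # (Sketch) The same result with the average rounded to two decimals, mirroring
    # the rounding used for the province totals in requirement 1.
    top3_province_joined_df.groupBy("storeProvince"). \
        agg(F.round(F.avg("receivable"), 2).alias("avg_order_price")).show()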


    # Requirement 4: payment-type share per province among the TOP 3 provinces.
    # A small UDF turns a ratio into a percent string, e.g. 0.35 -> '35%'.
    def to_percent(data):
        return str(round(data * 100)) + '%'

    udf_to_percent = F.udf(to_percent, StringType())
    top3_province_joined_df.select("storeProvince", "payType").createTempView("province_paytype")

    spark.sql("""
        select storeProvince, payType, (count(*)/pnt) as percent from
        (select *, count(*) over(partition by storeProvince) as pnt from province_paytype)
        group by storeProvince, payType, pnt
    """).select("storeProvince", "payType", udf_to_percent("percent")).show()






Reposted from blog.csdn.net/HELLOWORLD2424/article/details/129821328