Active User Analysis
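
The Spark job below computes daily app registration and activity metrics from three Hive staging tables (std.std_app_user_info, std.std_app_log, std.std_household_info) and appends the results to two MySQL report tables.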

// Scala program
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object BiAppDailyReportScala {
  def main(args: Array[String]): Unit = {
    /**
      * Initialize the Spark session
      */
    val conf = new SparkConf()
    conf.setMaster("local[*]") // local mode for testing; override via spark-submit --master in production
      .setAppName("BiAppDailyReportScala")

    val spark = SparkSession
      .builder()
      .config(conf)
      .config("spark.sql.shuffle.partitions", 50)
      .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    /**
      * Resolve the target partition day: use the argument if given, otherwise default to yesterday
      */
    val par_day = if (args.length == 1) {
      args(0)
    } else {
      MyUtilScala.getDiffDayFromToday("yyyyMMdd", -1)
    }

    val date = MyUtilScala.str2Date("yyyyMMdd", par_day)

    // 31 days before the target day: the start of the trailing activity window
    val dayAgo31 = MyUtilScala.getDiffDayFromToday("yyyyMMdd", -31, date)
    // Start of the target day as epoch milliseconds (std_app_log.system_time is assumed to be epoch millis)
    val yesterdayStartMillis = MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", par_day + " 00:00:00").getTime
    // Window boundaries as "yyyy-MM-dd HH:mm:ss" strings (registration timestamps are assumed to use this format)
    val dayAgo31StartTime = MyUtilScala.formatDate("yyyy-MM-dd HH:mm:ss", MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", dayAgo31 + " 00:00:00"))
    val dayAgo1EndTime = MyUtilScala.formatDate("yyyy-MM-dd HH:mm:ss", MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", par_day + " 23:59:59"))
    val dayAgo1StartTime = MyUtilScala.formatDate("yyyy-MM-dd HH:mm:ss", MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", par_day + " 00:00:00"))
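    // For example, with par_day = "20190320" (a hypothetical value):
    //   dayAgo31             = "20190217"
    //   dayAgo31StartTime    = "2019-02-17 00:00:00"
    //   dayAgo1StartTime     = "2019-03-20 00:00:00"
    //   dayAgo1EndTime       = "2019-03-20 23:59:59"
    //   yesterdayStartMillis = epoch millis of 2019-03-20 00:00:00 in the JVM's default time zone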

    // Load the target partitions from the three Hive staging tables
    val load_app_user_sql =
      s"""
         |select user_id,phone,system_time
         |from std.std_app_user_info
         |where par_day = '$par_day'
      """.stripMargin
    val load_app_log_sql =
      s"""
         |select user_id,system_time
         |from std.std_app_log
         |where par_day >= '$dayAgo31' and par_day <= '$par_day'
      """.stripMargin
    val load_household_info_sql =
      s"""
         |select phone,city_id
         |from std.std_household_info
         |where par_day = '$par_day'
      """.stripMargin
    // Register the three sources as temp views for the report queries below
    spark.sql(load_app_user_sql).createOrReplaceTempView("app_user_tmp")
    spark.sql(load_app_log_sql).createOrReplaceTempView("app_log_tmp")
    spark.sql(load_household_info_sql).createOrReplaceTempView("household_info_tmp")

    // Overall daily trend: total registered users (regcnt) and users with at
    // least one log entry in the loaded log window (actcnt)
    val process_app_trend_sql =
      s"""
         |select
         |   '$par_day' as bydate
         |   ,count(app_user.user_id) as regcnt
         |   ,count(case when app_log.user_id is not null then app_user.user_id end) as actcnt
         |from
         |   (select user_id
         |   from app_user_tmp
         |   ) as app_user
         |left join
         |   (select user_id
         |   from app_log_tmp
         |   group by user_id
         |   ) as app_log
         |   on app_user.user_id = app_log.user_id
      """.stripMargin
    val app_trend_df = spark.sql(process_app_trend_sql)
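    // app_trend_df is a single row; e.g. (hypothetical values):
    // +--------+------+------+
    // |  bydate|regcnt|actcnt|
    // +--------+------+------+
    // |20190320| 12345|  6789|
    // +--------+------+------+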

    // Per-city breakdown. The app_log join can yield many rows per user, so all
    // counts use DISTINCT. Assumptions: std_app_log.system_time is epoch millis,
    // while std_app_user_info.system_time (registration time) is a
    // "yyyy-MM-dd HH:mm:ss" string, so the new-user counts compare against
    // app_user.system_time rather than app_log.system_time.
    val process_app_bycity_sql =
      s"""
         |select
         |   '$par_day' as bydate
         |   ,case when household_info.city_id in ('1','73','236') then household_info.city_id else '000' end as cityid
         |   ,count(distinct app_user.user_id) as regcnt
         |   ,count(distinct case when app_log.user_id is not null then app_user.user_id end) as act30cnt
         |   ,count(distinct
         |      case when app_log.user_id is not null and app_log.system_time >= '$yesterdayStartMillis'
         |      then app_user.user_id end) as actcnt
         |   ,count(distinct
         |      case when app_user.system_time >= '$dayAgo31StartTime' and app_user.system_time <= '$dayAgo1EndTime'
         |      then app_user.user_id end) as new30cnt
         |   ,count(distinct
         |      case when app_user.system_time >= '$dayAgo1StartTime' and app_user.system_time <= '$dayAgo1EndTime'
         |      then app_user.user_id end) as newcnt
         |from
         |   (select user_id,phone,system_time
         |   from app_user_tmp
         |   ) as app_user
         |left join
         |   (select phone,city_id
         |   from household_info_tmp
         |   ) as household_info
         |   on app_user.phone = household_info.phone
         |left join
         |   (select user_id,system_time
         |   from app_log_tmp
         |   ) as app_log
         |   on app_user.user_id = app_log.user_id
         |group by
         |   case when household_info.city_id in ('1','73','236') then household_info.city_id else '000' end
      """.stripMargin
    val app_bycity_df = spark.sql(process_app_bycity_sql)
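    // app_bycity_df has one row per city bucket: '1', '73', '236', or '000' (all other cities)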


    // MySQL connection settings, read from mysql.conf
    val prop = new Properties()
    prop.setProperty("user", MyUtilScala.readConf("mysql.conf", "user"))
    prop.setProperty("password", MyUtilScala.readConf("mysql.conf", "password"))
    val url = MyUtilScala.readConf("mysql.conf", "url")
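    // A hypothetical mysql.conf, for illustration (keys assumed from the readConf calls above):
    //   url=jdbc:mysql://db-host:3306/bi_report?useUnicode=true&characterEncoding=utf8
    //   user=report_user
    //   password=...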
    val app_trend_table = "DailyReportAppTrend"
    val app_bycity_table = "DailyReportAppByCity"

    app_trend_df
      .write
      .mode(SaveMode.Append)
      .jdbc(url, app_trend_table, prop)

    app_bycity_df
      .write
      .mode(SaveMode.Append)
      .jdbc(url, app_bycity_table, prop)
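    // SaveMode.Append adds rows on every run; re-running the job for the same
    // par_day will duplicate that day's report rows unless they are deleted first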

    //    spark.sql(process_app_trend_sql).show()

    /**
      * Stop the Spark session
      */
    spark.stop()
  }
}
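
MyUtilScala is not shown in the post. Below is a minimal sketch of the helper, assuming the signatures implied by the calls above (date arithmetic via java.util.Calendar, config lookup from a properties file on the classpath); the original implementation may differ.

// Hypothetical sketch of the MyUtilScala helper
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Properties}

object MyUtilScala {
  // Parse a string into a java.util.Date using the given pattern
  def str2Date(pattern: String, s: String): Date =
    new SimpleDateFormat(pattern).parse(s)

  // Format a Date with the given pattern
  def formatDate(pattern: String, d: Date): String =
    new SimpleDateFormat(pattern).format(d)

  // The day `diff` days away from `base` (default: now), formatted with `pattern`
  def getDiffDayFromToday(pattern: String, diff: Int, base: Date = new Date()): String = {
    val cal = Calendar.getInstance()
    cal.setTime(base)
    cal.add(Calendar.DAY_OF_MONTH, diff)
    formatDate(pattern, cal.getTime)
  }

  // Read one key from a properties file on the classpath
  def readConf(file: String, key: String): String = {
    val props = new Properties()
    val in = getClass.getResourceAsStream("/" + file)
    try props.load(in) finally in.close()
    props.getProperty(key)
  }
}

To run the job for a specific partition day, pass it as the only argument, e.g. spark-submit --class BiAppDailyReportScala bi-app-daily-report.jar 20190320 (jar name hypothetical); with no argument the job defaults to yesterday's partition.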
 

Reposted from www.cnblogs.com/lazybones/p/10551320.html