// Scala program: daily app BI report job
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object BiAppDailyReportScala {

  /**
   * Daily BI report job for the app.
   *
   * Reads registered users, app access logs (last 31 days) and household info
   * from Hive (std schema), computes:
   *   - an overall registration/activity trend row  -> MySQL DailyReportAppTrend
   *   - per-city registration/activity/new-user counts -> MySQL DailyReportAppByCity
   *
   * @param args optional single argument: the report partition day in yyyyMMdd
   *             format; defaults to yesterday when absent.
   */
  def main(args: Array[String]): Unit = {
    // --- Spark initialization ---
    val conf = new SparkConf()
    // NOTE(review): a hard-coded local master overrides any --master passed to
    // spark-submit; consider dropping this line for cluster deployment.
    conf.setMaster("local[*]")
      // Fixed: was "StdVillageInfoScala", a copy-paste from another job.
      .setAppName("BiAppDailyReportScala")
    val spark = SparkSession
      .builder()
      .config(conf)
      .config("spark.sql.shuffle.partitions", 50)
      .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    // --- Partition / date-range handling ---
    // Report day: from args(0) if supplied, otherwise yesterday (yyyyMMdd).
    val par_day = if (args.length == 1) {
      args(0)
    } else {
      MyUtilScala.getDiffDayFromToday("yyyyMMdd", -1)
    }
    val date = MyUtilScala.str2Date("yyyyMMdd", par_day)
    // 31 days before the report day, yyyyMMdd.
    val dayAgo31 = MyUtilScala.getDiffDayFromToday("yyyyMMdd", -31, date)
    // Start of the report day as epoch milliseconds.
    val yestodayStartMillions = MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", par_day + " 00:00:00").getTime
    // Window boundaries formatted as 'yyyy-MM-dd HH:mm:ss' strings.
    val dayAgo31StartTime = MyUtilScala.formatDate("yyyy-MM-dd HH:mm:ss", MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", dayAgo31 + " 00:00:00"))
    val dayAgo1EndTime = MyUtilScala.formatDate("yyyy-MM-dd HH:mm:ss", MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", par_day + " 23:59:59"))
    val dayAgo1StartTime = MyUtilScala.formatDate("yyyy-MM-dd HH:mm:ss", MyUtilScala.str2Date("yyyyMMdd HH:mm:ss", par_day + " 00:00:00"))

    // --- Load source tables into temp views ---
    val load_app_user_sql =
      s"""
         |select user_id,phone,system_time
         |from std.std_app_user_info
         |where par_day = '$par_day'
      """.stripMargin
    val load_app_log_sql =
      s"""
         |select user_id,system_time
         |from std.std_app_log
         |where par_day >= '$dayAgo31' and par_day <= '$par_day'
      """.stripMargin
    val load_household_info_sql =
      s"""
         |select phone,city_id
         |from std.std_household_info
         |where par_day = '$par_day'
      """.stripMargin
    // createOrReplaceTempView is idempotent; plain createTempView throws if the
    // view already exists (e.g. on a re-run within a shared session).
    spark.sql(load_app_user_sql).createOrReplaceTempView("app_user_tmp")
    spark.sql(load_app_log_sql).createOrReplaceTempView("app_log_tmp")
    spark.sql(load_household_info_sql).createOrReplaceTempView("household_info_tmp")

    // --- Overall trend: total registrations and 31-day-active users ---
    val process_app_trend_sql =
      s"""
         |select
         |  '$par_day' as bydate
         |  ,count(app_user.user_id) as regcnt
         |  ,count(case when app_log.user_id is not null then app_user.user_id end) as actcnt
         |from
         |  (select user_id
         |   from app_user_tmp
         |  ) as app_user
         |left join
         |  (select user_id
         |   from app_log_tmp
         |   group by user_id
         |  ) as app_log
         |  on app_user.user_id = app_log.user_id
      """.stripMargin
    val app_trend_df = spark.sql(process_app_trend_sql)

    // --- Per-city breakdown; cities outside ('1','73','236') are bucketed as '000' ---
    // NOTE(review): system_time is compared against epoch millis here
    // ('$yestodayStartMillions') but against 'yyyy-MM-dd HH:mm:ss' strings
    // below ('$dayAgo31StartTime' etc.) — verify the column's actual format;
    // one of these comparisons is likely wrong.
    val process_app_bycity_sql =
      s"""
         |select
         |  '$par_day' as bydate
         |  ,case when household_info.city_id in ('1','73','236') then household_info.city_id else '000' end as cityid
         |  ,count(app_user.user_id) as regcnt
         |  ,count(case when app_log.user_id is not null then app_user.user_id end) as act30cnt
         |  ,count(
         |    case when app_log.user_id is not null and app_log.system_time >= '$yestodayStartMillions'
         |    then app_user.user_id end) as actcnt
         |  ,count(
         |    case when app_log.system_time >= '$dayAgo31StartTime' and app_log.system_time <= '$dayAgo1EndTime'
         |    then app_user.user_id end) as new30cnt
         |  ,count(
         |    case when app_log.system_time >= '$dayAgo1StartTime' and app_log.system_time <= '$dayAgo1EndTime'
         |    then app_user.user_id end) as newcnt
         |from
         |  (select user_id,phone,system_time
         |   from app_user_tmp
         |  ) as app_user
         |left join
         |  (select phone,city_id
         |   from household_info_tmp
         |  ) as household_info
         |  on app_user.phone = household_info.phone
         |left join
         |  (select user_id,system_time
         |   from app_log_tmp
         |  ) as app_log
         |  on app_user.user_id = app_log.user_id
         |group by
         |  case when household_info.city_id in ('1','73','236') then household_info.city_id else '000' end
      """.stripMargin
    val app_bycity_df = spark.sql(process_app_bycity_sql)

    // --- Persist both result sets to MySQL via JDBC (append mode) ---
    val prop = new Properties()
    prop.setProperty("user", MyUtilScala.readConf("mysql.conf", "user"))
    prop.setProperty("password", MyUtilScala.readConf("mysql.conf", "password"))
    val url = MyUtilScala.readConf("mysql.conf", "url")
    val app_trend_table = "DailyReportAppTrend"
    val app_bycitytable = "DailyReportAppByCity"
    app_trend_df
      .write
      .mode(SaveMode.Append)
      .jdbc(url, app_trend_table, prop)
    app_bycity_df
      .write
      .mode(SaveMode.Append)
      .jdbc(url, app_bycitytable, prop)
    // spark.sql(process_app_trend_sql).show()

    // --- Shut down Spark ---
    spark.stop()
  }
}
// Source attribution: reposted from www.cnblogs.com/lazybones/p/10551320.html
// (surrounding webpage navigation text removed — it was not part of the code)