/**
 * Spark batch job: reads tab-separated user-behaviour records, keeps one
 * user's rows, and writes them as Parquet partitioned by `record_date`.
 *
 * Fields read from each line (0-based, tab-separated):
 * 0 user_id (Int), 1 event time divided by 1000 and stored as Int,
 * 2 behaviour id (Int), 4 behaviour payload (String), 5 record date (String).
 * Field 3 is ignored.
 *
 * Optional arguments: `args(0)` input path, `args(1)` output path. Both
 * default to the original hard-coded test locations.
 */
object DataProcess {
  private val DefaultInputPath = "./testdata/ods"
  private val DefaultOutputPath = "./testdata/dwd2"
  // NOTE(review): single-user filter carried over from the original job;
  // looks like a debugging aid — confirm it is intended in production.
  private val TargetUserId = 4707776

  // An explicit main instead of `extends App`: App defers field initialisation
  // via DelayedInit, which is a known source of surprises in Spark jobs.
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val outputPath = args.drop(1).headOption.getOrElse(DefaultOutputPath)

    val spark = SparkSession
      .builder()
      .appName("UserBehiviorToHHDataPartition")
      .getOrCreate()

    try {
      import spark.implicits._

      val datadf = spark.sparkContext
        .textFile(inputPath)
        // limit -1 keeps trailing empty fields; without it a line whose last
        // column (record_date) is empty yields only 5 elements and r(5) throws.
        .map(_.split("\t", -1))
        .map(r => (r(0).toInt, (r(1).toLong / 1000).toInt, r(2).toInt, r(4), r(5)))
        .toDF("user_id", "event_time", "behivior_id", "behivior_pop", "record_date")
        .filter($"user_id" === TargetUserId)

      datadf.write
        .format("parquet")
        .mode("overwrite")
        .partitionBy("record_date")
        .save(outputPath)
    } finally {
      // Always release the session, even when parsing or writing fails.
      spark.stop()
    }
  }
}
/**
 * Spark UDF: parses `jsonInfo` with `FunnelUtil.gson` as a `Map[String, String]`
 * and returns the value stored under `keyName`.
 *
 * Returns "" when any of the following holds:
 *  - the key is absent;
 *  - its value is JSON null;
 *  - Gson yields no map (e.g. null input);
 *  - parsing fails with a non-fatal error (e.g. malformed JSON, or a value Gson
 *    cannot read as a String).
 * Fatal JVM errors (e.g. OutOfMemoryError) are no longer swallowed.
 */
class JsonInfoGetString extends UDF2[String, String, String] {
  def call(jsonInfo: String, keyName: String): String = {
    import scala.util.control.NonFatal
    try {
      val map: java.util.Map[String, String] =
        FunnelUtil.gson.fromJson(jsonInfo, JsonInfoGetString.StringMapType)
      // Gson may return a null map; a present key may also map to null.
      Option(map)
        .flatMap(m => Option(m.get(keyName)))
        .getOrElse("")
    } catch {
      case NonFatal(_) => ""
    }
  }
}

object JsonInfoGetString {
  // Built once instead of per row: each TypeToken instantiation reflects on an
  // anonymous subclass.
  private val StringMapType: Type =
    new TypeToken[java.util.Map[String, String]]() {}.getType
}
/**
 * Spark UDF: parses `jsonInfo` with `FunnelUtil.gson` as a `Map[String, Object]`
 * and returns the value under `keyName` converted to Long.
 *
 * The value's string form is parsed as a number and truncated toward zero, so
 * "12.9" becomes 12. Out-of-range values saturate at Long.MinValue/MaxValue.
 *
 * Returns 0 when any of the following holds:
 *  - the key is absent;
 *  - its value is JSON null;
 *  - the value is not numeric;
 *  - parsing fails with a non-fatal error.
 * Fatal JVM errors are no longer swallowed.
 */
class JsonInfoGetLong extends UDF2[String, String, Long] {
  def call(jsonInfo: String, keyName: String): Long = {
    import scala.util.control.NonFatal
    try {
      val map: java.util.Map[String, Object] =
        FunnelUtil.gson.fromJson(jsonInfo, JsonInfoGetLong.ObjectMapType)
      Option(map)
        .flatMap(m => Option(m.get(keyName)))
        // Parse via Double, not Float: Float keeps only ~7 significant digits,
        // so ids/timestamps such as 123456789 came back as 123456792.
        // NOTE(review): Gson's default Object mapping presumably yields Double
        // for numbers, so values beyond 2^53 may still be imprecise — confirm
        // FunnelUtil.gson's number policy if exact large ids matter.
        .map(_.toString.toDouble.toLong)
        .getOrElse(0L)
    } catch {
      case NonFatal(_) => 0L
    }
  }
}

object JsonInfoGetLong {
  // Built once instead of per row: each TypeToken instantiation reflects on an
  // anonymous subclass.
  private val ObjectMapType: Type =
    new TypeToken[java.util.Map[String, Object]]() {}.getType
}
/**
 * Spark UDF: parses `jsonInfo` with `FunnelUtil.gson` as a `Map[String, Object]`
 * and returns the value under `keyName` converted to a number.
 *
 * NOTE: despite the class name, the result type is Float
 * (`UDF2[String, String, Float]`). Values are therefore rounded to single
 * precision (~7 significant digits). The type is kept because callers register
 * this UDF with that return type.
 *
 * Returns 0.0f when any of the following holds:
 *  - the key is absent;
 *  - its value is JSON null;
 *  - the value is not numeric;
 *  - parsing fails with a non-fatal error.
 * Fatal JVM errors are no longer swallowed.
 */
class JsonInfoGetDouble extends UDF2[String, String, Float] {
  def call(jsonInfo: String, keyName: String): Float = {
    import scala.util.control.NonFatal
    try {
      val map: java.util.Map[String, Object] =
        FunnelUtil.gson.fromJson(jsonInfo, JsonInfoGetDouble.ObjectMapType)
      Option(map)
        .flatMap(m => Option(m.get(keyName)))
        .map(_.toString.toFloat)
        .getOrElse(0.0f)
    } catch {
      case NonFatal(_) => 0.0f
    }
  }
}

object JsonInfoGetDouble {
  // Built once instead of per row: each TypeToken instantiation reflects on an
  // anonymous subclass.
  private val ObjectMapType: Type =
    new TypeToken[java.util.Map[String, Object]]() {}.getType
}
/**
 * Runs `sql` on the `spark` session in scope and shows up to `numRows` rows
 * without truncating cell values. Then prints the elapsed time in milliseconds,
 * labelled with `name`.
 *
 * Note: `show` only materialises the rows it displays. This therefore times
 * producing the first `numRows` rows, not necessarily a full evaluation of the
 * query.
 *
 * @param name    label printed alongside the timing
 * @param sql     Spark SQL query text
 * @param numRows number of rows to display (default 10, as before)
 */
def benchTest(name: String, sql: String, numRows: Int = 10): Unit = {
  // nanoTime is monotonic; currentTimeMillis can jump if the wall clock is adjusted.
  val startNanos = System.nanoTime()
  spark.sql(sql).show(numRows, false) // false = do not truncate long cell values
  val elapsedMs =
    java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)
  println(s"name $name , cost: $elapsedMs")
}
交流 code 09-18
猜你喜欢
转载自lingzhi007.iteye.com/blog/2393628
今日推荐
周排行