// idmapping (user unique identifier) — next-day dictionary generation
object idmapping_nextday {

  /**
   * Incremental ID-mapping job: merges today's device/user identifiers with the
   * previous day's mapping dictionary and assigns a stable GUID per identity group.
   *
   * Pipeline:
   *   1. Parse today's access log (one JSON record per line) and collect all
   *      non-blank identifiers (uid, imei, mac, imsi, androidId, deviceId, uuid).
   *   2. Build a GraphX graph: vertices = identifiers, edges = identifiers that
   *      co-occurred in the same record (kept only when seen more than twice).
   *   3. Union with yesterday's vertices/edges and run connected components.
   *   4. For each component, reuse yesterday's GUID if any member already had one,
   *      so GUIDs stay stable across days; otherwise use the new component id.
   *   5. Write today's dictionary as parquet.
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Today's raw app access log: one JSON object per line.
    val applog: Dataset[String] = spark.read.textFile("F:\\yiee_logs\\2020-01-11\\app\\doit.mall.access.log.8")

    // Extract all identifiers from each record, dropping null/blank values
    // (fastjson getString returns null for missing keys).
    val data: RDD[Array[String]] = applog.rdd.map(line => {
      val jsonObj = JSON.parseObject(line)
      val userObj = jsonObj.getJSONObject("user")
      val uid = userObj.getString("uid")
      val phoneObj = userObj.getJSONObject("phone")
      val imei = phoneObj.getString("imei")
      val mac = phoneObj.getString("mac")
      val imsi = phoneObj.getString("imsi")
      val androidId = phoneObj.getString("androidId")
      val deviceId = phoneObj.getString("deviceId")
      val uuid = phoneObj.getString("uuid")
      Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
    })

    // Vertices: (hash of identifier, identifier).
    // NOTE(review): String.hashCode is only 32 bits widened to Long — two distinct
    // identifiers can collide and wrongly merge identities; consider a 64-bit hash.
    val vertices: RDD[(Long, String)] = data.flatMap(arr => {
      for (biaoshi <- arr) yield (biaoshi.hashCode.toLong, biaoshi)
    })

    // Edges: every unordered pair of identifiers co-occurring in one record.
    // Pairs seen more than twice are kept, filtering out noisy one-off co-occurrences.
    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
        yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      .map(edge => (edge, 1)).reduceByKey(_ + _)
      .filter(tp => tp._2 > 2)
      .map(x => x._1)

    // Yesterday's dictionary: parquet with columns (biaoshi_hashcode, guid).
    // Parquet carries its own schema, so no explicit StructType is needed.
    val preDayIdmp: DataFrame = spark.read.parquet("data/dict/idmapping_dict/2020-02-15")

    // Yesterday's identifier hashes become vertices (vertex data unused, hence "").
    val preDayIdmpVertices = preDayIdmp.rdd.map({
      case Row(idFlag: VertexId, guid: VertexId) =>
        (idFlag, "")
    })

    // Yesterday's (identifier -> guid) pairs become edges so that previously
    // grouped identifiers stay connected in today's graph.
    val preDayEdges = preDayIdmp.rdd.map(row => {
      val idFlag = row.getAs[VertexId]("biaoshi_hashcode")
      val guid = row.getAs[VertexId]("guid")
      Edge(idFlag, guid, "")
    })

    // Merge today's graph with yesterday's and compute connected components.
    // Each vertex is labelled with its component id (the minimum vertex id).
    val graph = Graph(vertices.union(preDayIdmpVertices), edges.union(preDayEdges))
    val res_tuples: VertexRDD[VertexId] = graph.connectedComponents().vertices

    // Broadcast yesterday's identifier->guid map for GUID stabilization below.
    val idMap = preDayIdmp.rdd.map(row => {
      val idFlag = row.getAs[VertexId]("biaoshi_hashcode")
      val guid = row.getAs[VertexId]("guid")
      (idFlag, guid)
    }).collectAsMap()
    val bc = spark.sparkContext.broadcast(idMap)

    // Group today's vertices by component id; if any member of a component already
    // had a GUID yesterday, reuse it (first match wins), else keep the component id.
    val todayIdmpResult: RDD[(VertexId, VertexId)] = res_tuples.map(tp => (tp._2, tp._1))
      .groupByKey()
      .mapPartitions(iter => {
        val idmpMap = bc.value
        iter.map(tp => {
          val ids = tp._2
          // First identifier that already has a yesterday-GUID decides the group's GUID.
          val todayGuid = ids.collectFirst {
            case elem if idmpMap.contains(elem) => idmpMap(elem)
          }.getOrElse(tp._1)
          (todayGuid, ids)
        })
      })
      .flatMap(tp => {
        val ids = tp._2
        val guid = tp._1
        for (elem <- ids) yield (elem, guid)
      })

    // Persist today's dictionary as a single parquet file.
    todayIdmpResult.coalesce(1).toDF("biaoshi_hashcode", "guid").write.parquet("data/dict/idmapping_dict/2020-02-16")

    spark.stop()
  }
}