// idmapping (user unique-identifier) dictionary — first-day generation from real data
object idmapping_tabay {

  /**
   * Builds the day-one ID-mapping dictionary from a raw app access log.
   *
   * Each log line is a JSON record carrying several identifiers for the same
   * person (uid plus device identifiers). Identifiers co-occurring in one
   * record are linked as graph edges; connected components then assign one
   * stable guid per person. The result is written as a Parquet dictionary
   * mapping identifier-hash -> guid.
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Raw access log: one JSON object per line.
    val applog: Dataset[String] = spark.read.textFile("F:\\yiee_logs\\2020-01-11\\app\\doit.mall.access.log.8")

    // Per record: collect all non-blank identifiers (uid + phone identifiers).
    // NOTE(review): assumes every line parses and has "user"/"phone" objects —
    // a malformed line would throw here; confirm the log is pre-validated.
    val data: RDD[Array[String]] = applog.rdd.map(line => {
      val jsonObj = JSON.parseObject(line)
      val userObj = jsonObj.getJSONObject("user")
      val uid = userObj.getString("uid")
      val phoneObj = userObj.getJSONObject("phone")
      val imei = phoneObj.getString("imei")
      val mac = phoneObj.getString("mac")
      val imsi = phoneObj.getString("imsi")
      val androidId = phoneObj.getString("androidId")
      val deviceId = phoneObj.getString("deviceId")
      val uuid = phoneObj.getString("uuid")
      Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
    })

    // Vertices: (hash, identifier). NOTE(review): String.hashCode is only
    // 32-bit and can collide across distinct identifiers, which would wrongly
    // merge two people — consider a 64-bit hash (e.g. Guava murmur) later.
    val vertices: RDD[(Long, String)] = data.flatMap(arr => {
      for (biaoshi <- arr) yield (biaoshi.hashCode.toLong, biaoshi)
    })

    // Edges: every unordered pair of identifiers seen together in one record.
    // Pairs must co-occur more than twice to survive — filters out one-off
    // noise (e.g. a shared device briefly used by someone else).
    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1)
        yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      .map(edge => (edge, 1))
      .reduceByKey(_ + _)
      .filter(tp => tp._2 > 2)
      .map(tp => tp._1)
    // FIX: removed a stray bare `edges` statement that evaluated the RDD
    // reference and discarded the value — dead code with no effect.

    // Connected components: every identifier in a component receives the same
    // guid (GraphX uses the component's minimum vertex id).
    val graph = Graph(vertices, edges)
    val res: VertexRDD[VertexId] = graph.connectedComponents().vertices

    // Persist the day-one dictionary: identifier hash -> person guid.
    res.toDF("biaoshi_hashcode", "guid").write.parquet("data/dict/idmapping_dict/2020-02-15")

    spark.stop()
  }
}