前年的文章,备份
spark编写了一段统计CSDN中不同邮箱的密码白痴程序,数据来源于最近被公布的csdn明文数据,以下代码重点是spark在编程性上的体验,性能上比较没有意思,这点数据单机都能计算,计算结果仅供娱乐。 感谢CSDN提供这么好的数据样本
目前网上能找到的spark范例很少,这个算是扫盲吧。写这段代码过程顺便也熟悉了scala的各种写法。
代码主要分3部分:
1、白痴密码的top分布
2、白痴密码指数化,并封装成广播变量
3、计算邮箱的密码白痴程度。
主要用到了spark的广播变量,例子中使用P2P的广播方式。
在做collect前做下filter,减少返回的数据量。这里默认过滤掉重复率1000以下的邮箱,同时认为重复率1000以上的为白痴密码,重复率越高,白痴程度越高。PS:本来是想计算出每个邮箱白痴密码占的比例,但是白痴密码的界限很难设定,重复率100W和1000的还是有区别的,所以用权重的方式,就是将其指数化。
主要用到了map,filter,reduceByKey,broadcast, 目前对spark了解程序还不够,这个实现并不一定是最好的解决方案,也欢迎讨论。PS:开始实现了一个更烂的。
直接给代码。(> 就是 >)
import spark.SparkContext import org.slf4j.Logger import org.slf4j.LoggerFactory import SparkContext._ import scala.collection.mutable.HashMap object CSDNPassword { val log = LoggerFactory.getLogger("CSDNPassword"); def main(args: Array[String]) { System.setProperty("spark.broadcast.factory", "spark.broadcast.BitTorrentBroadcastFactory") val spark = new SparkContext(args(0), "CSDNPassword", null, Array(args(1))) val slices = if (args.length > 2) args(2).toInt else 2 log.info("spark.slices :" + slices) val file = spark.textFile(args(3), slices).filter(_.contains("@")) val passwords = file.map { user => val user_s = user.split("#") val pw = user_s(1).trim() if (pw.length() > 5) (pw, 1) else ("Nil", 0) } val passwordLimit = if(args.length > 4) args(4).toInt else 1000 val mailLimit = if(args.length > 5) args(5).toInt else 1000 val passwordModel = passwords.reduceByKey(_ + _) val filePassoword = passwordModel.filter(_._2 > passwordLimit)//收集出现超过passwordLimit的密码 val passwordCollect = filePassoword.collect(); /** * 白痴密码指数化,(count*1000)/max */ val zhishu = 1000 val passwordMap = new HashMap[String, Int]() val sortPassword = passwordCollect.sortWith((A, B) => A._2 > B._2) val max = sortPassword(0)._2 passwordMap += (sortPassword(0)._1 -> zhishu) for (i <- 1 to sortPassword.length - 1) { val weight = (sortPassword(i)._2 * zhishu) / max passwordMap += (sortPassword(i)._1 -> weight) } //包装成广播变量, 提供给所有work使用 val passwordMapBro = spark.broadcast(passwordMap) log.info("---------------------create broadcast-------------------------") ///////////////////////////////////////////////////////////////// val users2 = file.map{ user => val user_s = user.split("#") val pw = user_s(1).trim() val weight = passwordMapBro.value.getOrElse(pw, 0)//得到密码白痴程序 val user2_s = user.split("@") if (pw.length() > 5 && user2_s.length > 1) (user2_s(1).toLowerCase(), (1, weight)) else ("Nil",(0, 0)) } val user2ReduceByKey = users2.reduceByKey{(a,b)=> (a._1 + b._1, a._2 + b._2) } val yes = user2ReduceByKey.filter(_._2._1 > mailLimit).map{user => //过滤出超过mailLimit的邮箱 (user._1, user._2._2/user._2._1) //除以基数 } //收集,排序 val arrays = yes.collect() val sortMail = arrays.sortWith((A, B) => A._2 > B._2) val len = if(sortMail.length > 100) 100 else sortMail.length for (i <- 0 to len -1) { log.info(i + " mail:" + sortMail(i)._1 + ", 密码白痴程度:" + sortMail(i)._2) } spark.stop() } }
计算结果如图:
统计结果仅供娱乐。