这次我们介绍人口密度计算的案例
在案例之前会有spark的scala统计点击流日志代码小案例。
1. 击流日志分析案例
1.1需求
下面的系列数据主要包括用户IP,浏览网址,时间,请求方式等,统计PV,UV,和被访问的TopN,
下面是一条样例数据
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
1.2代码实现
统计PV量,设置相同的K-V(PV,1),通过reduceByKey进行累加
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//todo:需求:利用spark分析点击流日志数据------PV总量
object PV {
def main(args: Array[String]): Unit = {
//1、创建sparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
//2、创建sparkContext对象
val sc = new SparkContext(sparkConf)
//3、读取数据文件
val data: RDD[String] = sc.textFile("D:\\data\\access.log")
//4、获取pv总量
println(data.count())
//5、关闭sparkContext
sc.stop()
}
}
统计UV数量,通过
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object UV {
def main(args: Array[String]): Unit = {
//todo:构建SparkConf和 SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
//todo:读取数据
val file: RDD[String] = sc.textFile("d:\\access.log")
//todo:对每一行分隔,获取IP地址
val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0))
//todo:对ip地址进行去重,最后输出格式 ("UV",1)
val uvAndOne: RDD[String] = ips.distinct()
println(uvAndOne.count())
sc.stop()
}
}
UV统计,在转换为Map之前,对IP进行去重使用distinct
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object UV {
def main(args: Array[String]): Unit = {
//todo:构建SparkConf和 SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
//todo:读取数据
val file: RDD[String] = sc.textFile("d:\\temp\\access.log")
//todo:对每一行分隔,获取IP地址
val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0))
//todo:对ip地址进行去重,最后输出格式 ("UV",1)
val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x=>("UV",1))
//todo:聚合输出
val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_+_)
totalUV.foreach(println)
//todo:数据结果保存
totalUV.saveAsTextFile("d:\\temp\\out")
sc.stop()
}
}
统计访问Top5,截取字段信息的访问地址位置,通过reduceByKey进行累加,按照降序排序,再使用take截取前五位
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 求访问的topN
*/
object TopN {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//读取数据
val file: RDD[String] = sc.textFile("d:\\data\\access.log")
//将一行数据作为输入,输出(来源URL,1)
val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1))
//聚合 排序-->降序
val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false)
//通过take取topN,这里是取前5名
val finalResult: Array[(String, Int)] = result.take(5)
println(finalResult.toBuffer)
sc.stop()
}
}
2. 使用spark实现IP地址查询
2.1 需求
在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。
一般的实现思路是,首先通过运营上获取用户访问的日志信息,一个基站塔会对应一个IP段的区域,通过不同的用户的IP地址,可以对用户的位置进行定位,进而推断出人流密度。
2.2 实现思路
这里我们需要两张表,一张是IP的城市字段信息,一张是用户的访问日志
截取部分如下
//访问日志的信息
20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59 from: http://bsalsa.com/ )|http://show.51.com/main.php|
//城市IP的信息
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
思路
- 加载城市ip段信息,获取ip起始数字和结束数字,经度,维度
- 加载日志数据,获取ip信息,然后转换为数字,和ip段比较
- 比较的时候采用二分法查找,找到对应的经度和维度
- 然后对经度和维度做单词计数
2.3代码实现
mport java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
//todo:需求:利用spark实现ip地址查询
object IpLocation {
//将ip地址转化为Long 192.168.200.100
def ip2Long(ip: String): Long = {
//将IP地址转为Long,这里有固定的算法
val ips: Array[String] = ip.split("\\.")
var ipNum:Long=0L
for(i <- ips){
ipNum=i.toLong | ipNum <<8L
}
ipNum
}
//利用二分法查询,查询到long类型的数字在数组的下标
def binarySearch(ipNum: Long, broadcastValue: Array[(String, String, String, String)]): Int = {
//开始下标
var start=0
//结束下标
var end =broadcastValue.length-1
while(start <=end){
var middle=(start+end)/2
if(ipNum>=broadcastValue(middle)._1.toLong && ipNum<=broadcastValue(middle)._2.toLong){
return middle
}
if(ipNum < broadcastValue(middle)._1.toLong){
end=middle
}
if(ipNum> broadcastValue(middle)._2.toLong){
start=middle
}
}
-1
}
//将数据写入到mysql表中
def data2mysql(iter:Iterator[((String,String), Int)]) = {
//定义数据库连接
var conn:Connection=null
//定义PreparedStatement
var ps:PreparedStatement=null
//编写sql语句,?表示占位符
val sql="insert into iplocation(longitude,latitude,total_count) values(?,?,?)"
//获取数据库连接
conn=DriverManager.getConnection("jdbc:mysql://192.168.200.100:3306/spark","root","123456")
ps=conn.prepareStatement(sql)
//遍历迭代器
iter.foreach(line=>{
//对sql语句中的?占位符赋值
ps.setString(1,line._1._1)
ps.setString(2,line._1._2)
ps.setInt(3,line._2)
//执行sql语句
ps.execute()
})
}
def main(args: Array[String]): Unit = {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("IpLocation").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
//3、读取城市ip信息,获取ip开始数字,ip结束数字,经度,维度
val city_ip_rdd: RDD[(String, String, String, String)] = sc.textFile("D:\\data\\ip.txt").map(_.split("\\|")).map(x => (x(2), x(3), x(13), x(14)))
//将城市ip信息广播到每一个worker节点
val cityIpBroadcast: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(city_ip_rdd.collect())
//4、获取日志数据,获取所有ip地址
val destData: RDD[String] = sc.textFile("D:\\data\\20090121000132.394251.http.format").map(_.split("\\|")(1))
//5、遍历所有IP地址,去广播变量中进行匹配,获取对应的经度和维度
val result: RDD[((String, String), Int)] = destData.mapPartitions(iter => {
//获取广播变量的值
val broadcastValue: Array[(String, String, String, String)] = cityIpBroadcast.value
//遍历迭代器获取每一个ip地址
iter.map(ip => {
//将IP地址转化为Long
val ipNum: Long = ip2Long(ip)
//拿到long类型数字去广播变量中进行匹配,利用二分查询
val index: Int = binarySearch(ipNum, broadcastValue)
//返回结果数据 ((经度,维度),1)
((broadcastValue(index)._3, broadcastValue(index)._4), 1)
})
})
//6、把相同经度和维度出现的次数累加
val finalResult: RDD[((String, String), Int)] = result.reduceByKey(_+_)
//7、打印输出结果
finalResult.collect().foreach(x=>println(x))
//保存结果数据到mysql表中
finalResult.foreachPartition(data2mysql)
//8、关闭sc
sc.stop()
}
}