package com.qf.gp1707.day06
import java.sql.{Connection, Date, DriverManager, PreparedStatement}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Counts user visits per region (per province).
 *
 * Each access-log line's client IP is resolved to a province through an IP-range rule table,
 * visits are summed per province, printed, and written to MySQL.
 */
object IPSearch {

  /**
   * Job entry point.
   *
   * @param args optional overrides: `args(0)` = IP rule file, `args(1)` = access log.
   *             When absent, the original in-project sample paths are used.
   */
  def main(args: Array[String]): Unit = {
    val ipRulesPath = args.lift(0).getOrElse("./src/main/scala/com/qf/gp1707/day06/ipsearch/ip.txt")
    val accessLogPath = args.lift(1).getOrElse("./src/main/scala/com/qf/gp1707/day06/ipsearch/http.log")

    val conf = new SparkConf()
      .setAppName("IPSearch")
      // Default to local mode only when no master was supplied (e.g. spark-submit --master);
      // a hard setMaster would silently override the cluster setting.
      .setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(conf)
    try {
      // Load the nationwide IP allocation rules as (startIpNum, endIpNum, province).
      // Columns are '|'-separated: index 2/3 hold the numeric range bounds, index 6 the province.
      val ipInfo: Array[(String, String, String)] = sc.textFile(ipRulesPath)
        .map(_.split("\\|"))
        .filter(_.length > 6) // skip blank or truncated lines instead of failing the job
        .map(fields => (fields(2), fields(3), fields(6)))
        .collect()

      // Ship the rules to each executor once instead of with every task. The whole table must
      // fit in memory on the driver and on every executor (risk of OOM for very large tables).
      val broadcast: Broadcast[Array[(String, String, String)]] = sc.broadcast(ipInfo)

      // Split each access-log line, resolve its IP to a province by binary search, emit (province, 1).
      // NOTE(review): assumes the client IP is the second '|'-separated field of http.log.
      val provinceAndOne: RDD[(String, Int)] = sc.textFile(accessLogPath)
        .map(_.split("\\|"))
        .filter(_.length > 1) // a line without an IP field cannot be attributed
        .flatMap { fields =>
          val rules = broadcast.value
          val index = binarySearch(rules, ip2Long(fields(1)))
          // IPs outside every known range are dropped; indexing with -1 would crash the job.
          if (index >= 0) Iterator.single((rules(index)._3, 1)) else Iterator.empty[(String, Int)]
        }

      // Cached because it feeds two actions below (console print and MySQL export).
      val reduced: RDD[(String, Int)] = provinceAndOne.reduceByKey(_ + _).cache()
      println(reduced.collect().toBuffer)
      reduced.foreachPartition(data2Mysql)
    } finally {
      sc.stop()
    }
  }

  /**
   * Converts a dotted IPv4 address to its numeric value, e.g. "1.0.1.0" -> 16777472.
   * Octets are shifted in from the right, 8 bits at a time.
   *
   * @throws NumberFormatException if an octet is not numeric
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8L) | octet.toLong)

  /**
   * Finds the rule whose numeric range contains `ip`.
   *
   * @param arrIpInfo rules as (startIpNum, endIpNum, province) with decimal-string bounds; must be
   *                  sorted ascending by start with non-overlapping ranges
   *                  (NOTE(review): assumed of ip.txt — the sample rows are ordered that way)
   * @param ip        numeric IP as produced by [[ip2Long]]
   * @return index of the matching rule, or -1 when no range contains `ip`
   */
  def binarySearch(arrIpInfo: Array[(String, String, String)], ip: Long): Int = {
    @scala.annotation.tailrec
    def search(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        val middle = low + (high - low) / 2 // overflow-safe midpoint
        val (start, end, _) = arrIpInfo(middle)
        if (ip < start.toLong) search(low, middle - 1)
        else if (ip > end.toLong) search(middle + 1, high)
        else middle
      }
    search(0, arrIpInfo.length - 1)
  }

  /**
   * Writes one partition of (province, count) pairs into MySQL table `location_info`.
   *
   * Meant for `RDD.foreachPartition`: one connection and one prepared statement serve the whole
   * partition, rows are sent as a single batch, and empty partitions open no connection.
   * Failures are printed and swallowed (best effort), so one bad partition does not fail the job.
   */
  val data2Mysql: Iterator[(String, Int)] => Unit = it => {
    if (it.hasNext) {
      val sql = "insert into location_info(location, counts, access_data) values(?,?,?)"
      val jdbcUrl = "jdbc:mysql://192.168.88.131:3306/bigdata?useUnicode=true&characterEncoding=utf8"
      // NOTE(review): hard-coded host and credentials; move to configuration before real use.
      val user = "root"
      val password = "root"
      try {
        val conn = DriverManager.getConnection(jdbcUrl, user, password)
        try {
          val ps = conn.prepareStatement(sql)
          try {
            // One access date for the whole partition.
            val accessDate = new Date(System.currentTimeMillis())
            it.foreach { case (location, counts) =>
              ps.setString(1, location)
              ps.setInt(2, counts)
              ps.setDate(3, accessDate)
              ps.addBatch()
            }
            ps.executeBatch()
          } finally ps.close()
        } finally conn.close()
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}
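IP 规则文件 ip.txt 的格式如下（字段以 | 分隔，依次为：起始IP|结束IP|起始IP数值|结束IP数值|洲|国家|省份|城市|区县|运营商|行政区划代码|国家英文名|国家代码|经度|纬度；程序只用到第 3、4、7 列，即起止 IP 数值与省份）。备注：完整数据请到本人博客主页上传的文件中查找 ipsearch 压缩包。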
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
1.2.4.0|1.2.4.255|16909312|16909567|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989
1.2.5.0|1.2.7.255|16909568|16910335|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.2.8.0|1.2.8.255|16910336|16910591|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989
对应的 MySQL 建表语句如下（`ip` 表的各列与 ip.txt 的字段一一对应；程序的统计结果写入 `location_info(location, counts, access_data)` 表）：
-- Schema for the IPSearch example: the IP rule table and the per-province result table.
create database if not exists `bigdata`;
USE `bigdata`;
-- Temporarily relax session settings (mysqldump style); restored at the end of this script.
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*Table structure for table `ip` */
-- One column per '|'-separated field of ip.txt, in file order.
DROP TABLE IF EXISTS `ip`;
CREATE TABLE `ip` (
  `ip_start` varchar(45) DEFAULT NULL,
  `ip_end` varchar(45) DEFAULT NULL,
  `ip_start_num` bigint(20) DEFAULT NULL,
  `ip_end_num` bigint(20) DEFAULT NULL,
  `continent` varchar(45) DEFAULT NULL,
  `country` varchar(45) DEFAULT NULL,
  `province` varchar(45) DEFAULT NULL,
  `city` varchar(45) DEFAULT NULL,
  `district` varchar(45) DEFAULT NULL,
  `isp` varchar(45) DEFAULT NULL,
  `area_code` varchar(45) DEFAULT NULL,
  `country_english` varchar(45) DEFAULT NULL,
  `country_code` varchar(45) DEFAULT NULL,
  `longitude` varchar(45) DEFAULT NULL,
  `latitude` varchar(45) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Table structure for table `location_info` */
-- Target of the Spark job's insert: location_info(location, counts, access_data).
-- Created only if absent so previously exported results are kept.
CREATE TABLE IF NOT EXISTS `location_info` (
  `location` varchar(45) DEFAULT NULL,
  `counts` int(11) DEFAULT NULL,
  `access_data` date DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Restore the session settings changed at the top of the script.
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;