package Test
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * Uses Spark SQL to join the IPs found in the access log against the IP-range rules.
 * Precondition: the complete IP-rule dataset must be available up front so it can
 * be turned into a DataFrame.
 */
object SQLIIpLocation1 {

  /**
   * Converts a dotted-decimal IPv4 address (e.g. "1.2.3.4") to its
   * numeric value as a Long (the unsigned 32-bit representation).
   *
   * @param ip dotted-decimal IPv4 address string
   * @return the address as a decimal Long
   */
  def ip2Long(ip: String): Long =
    // Shift the accumulator left one octet, then OR in the next fragment.
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8L) | octet.toLong)

  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 2, "usage: SQLIIpLocation1 <ip-rules-path> <access-log-path>")

    val spark = SparkSession.builder()
      .appName("SQLIIpLocation1")
      .master("local[*]")
      .getOrCreate()

    // Read the raw IP-rule lines (pipe-delimited; fields 2/3 are the numeric
    // range bounds, field 6 is the province — per the parsing below).
    val ipRulesLines: Dataset[String] = spark.read.textFile(args(0))

    // Needed for Dataset -> DataFrame conversions (map over tuples, toDF).
    import spark.implicits._

    // Parse each rule line into (range start, range end, province).
    val tpDs: Dataset[(Long, Long, String)] = ipRulesLines.map { line =>
      val fields: Array[String] = line.split("[|]")
      (fields(2).toLong, fields(3).toLong, fields(6))
    }

    // Register the IP rules as a temp view so they can be queried with SQL.
    val ipRulesDF: DataFrame = tpDs.toDF("start_num", "end_num", "province")
    ipRulesDF.createOrReplaceTempView("v_ip_rules")

    // Read the access log and convert each record's IP (field 1) to its numeric form.
    val accessLog: Dataset[String] = spark.read.textFile(args(1))
    val ipLogs: DataFrame = accessLog.map { line =>
      val fields: Array[String] = line.split("[|]")
      ip2Long(fields(1))
    }.toDF("ip")
    ipLogs.createOrReplaceTempView("v_ip_logs")

    // Range join: match each log IP against the rule interval containing it,
    // then count hits per province. NOTE(review): a non-equi join like this
    // cannot use a hash/sort-merge strategy, so it is expensive at scale.
    val result = spark.sql("SELECT province,Count(1) counts From v_ip_rules a,v_ip_logs b WHERE b.ip >=a.start_num and b.ip <= a.end_num GROUP BY province ORDER BY counts desc")
    result.show()

    // Release resources.
    spark.stop()
  }
}
// Original article: "SparkSql实现access中的ip与ip规则的关联(方法一)"
// Source: blog.csdn.net/LJ2415/article/details/85116378