package sparkProgram
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Dataset, SparkSession}
/**
 * Spark SQL UDF example: resolve request IPs to provinces by binary-searching
 * a broadcast IP-range dictionary.
 *
 * @author 黄仁议 <[email protected]>
 * @version 1.0
 * @since 1.0
 *        Date: 2019/6/4
 */
object Spark_Sql_UDF_IpSearch {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("Spark_Sql_UDF_IpSearch").master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // Parse the IP dictionary. Each line is '|'-separated; fields 2/3 hold the
    // numeric start/end of an IP range and field 6 the province name
    // (assumed layout of ip.txt — confirm against the data file).
    val lines: Dataset[String] = sparkSession.read.textFile("D:\\data\\ip.txt")
    val dictDataset = lines.map { line =>
      val arr = line.split("[|]")
      (arr(2), arr(3), arr(6)) // (startIp, endIp, province)
    }

    // Collect the dictionary to the driver, then broadcast it so every executor
    // can look up ranges locally without a shuffle/join.
    val array: Array[(String, String, String)] = dictDataset.collect()
    val broadcastVar: Broadcast[Array[(String, String, String)]] =
      sparkSession.sparkContext.broadcast(array)

    // Parse the access log: field 1 is the client IP, converted to a Long
    // so it can be range-compared against the dictionary.
    val logData: Dataset[String] = sparkSession.read.textFile("D:\\data\\http.log")
    val logFrame = logData.map { log =>
      val arr = log.split("[|]")
      ipToLong(arr(1))
    }.toDF("ip")
    logFrame.createOrReplaceTempView("t_log")

    // UDF: numeric IP => province, via binary search over the broadcast dictionary.
    val fun = (ip: Long) => BinarySearch(broadcastVar.value, ip)
    sparkSession.udf.register("ipToProvince", fun)

    val res = sparkSession.sql("SELECT ipToProvince(ip) province,count(*) counts FROM t_log GROUP BY " +
      "province ORDER BY counts DESC")
    res.show()
    sparkSession.stop()
  }

  /**
   * Convert a dotted-quad IP string (e.g. "1.2.3.4") to its numeric Long value
   * by shifting each octet into place.
   *
   * @param ip dotted-quad IPv4 address
   * @return the IP as an unsigned 32-bit value held in a Long
   */
  def ipToLong(ip: String): Long = {
    val fragments: Array[String] = ip.split("[.]")
    var ipLongNum = 0L // uppercase L: lowercase 'l' reads like the digit 1
    for (fragment <- fragments) {
      ipLongNum = fragment.toLong | ipLongNum << 8L
    }
    ipLongNum
  }

  /**
   * Binary-search the dictionary (sorted by range start) for the range
   * containing `ip`.
   *
   * @param arr broadcast dictionary entries: (startIp, endIp, province)
   * @param ip  numeric IP to locate
   * @return the matching province, or the "undifined" sentinel when no range
   *         contains the IP (misspelling kept for output compatibility)
   */
  def BinarySearch(arr: Array[(String, String, String)], ip: Long): String = {
    var low = 0
    // FIX: was `arr.length`, which let `mid` reach arr.length and throw
    // ArrayIndexOutOfBoundsException (e.g. ip beyond the last range, or an
    // empty dictionary). The last valid index is arr.length - 1.
    var high = arr.length - 1
    var result = "undifined"
    var found = false
    while (!found && low <= high) {
      val mid = low + (high - low) / 2 // avoids Int overflow of (low + high) / 2
      if (ip >= arr(mid)._1.toLong && ip <= arr(mid)._2.toLong) {
        result = arr(mid)._3
        found = true
      } else if (ip < arr(mid)._1.toLong) {
        high = mid - 1
      } else {
        low = mid + 1
      }
    }
    result
  }
}
/*
 * Sample output:
 * +--------+------+
 * |province|counts|
 * +--------+------+
 * |    陕西|  1824|
 * |    北京|  1535|
 * |    重庆|   868|
 * |    河北|   383|
 * |    云南|   126|
 * +--------+------+
 *
 * Spark example 3: implemented with a Spark SQL user-defined function.
 * Source: blog.csdn.net/weixin_43562705/article/details/90806938
 */