Spark 之广播变量详解（附案例实现）

1.广播变量的意义
Spark 处理数据时，HDFS 上的数据按块（block）切分存储，每个 task 只处理其中一个分区的数据。如果某些业务逻辑（例如根据 IP 规则库查询 IP 归属地）需要用到完整的数据集，仅靠单个分区的数据就无法正常处理。此时可以先把这份数据集中到 Driver 端，再通过广播变量发送出去，让所有参与计算的 executor 都持有一份完整的只读副本（每个 executor 一份，而不是每个 task 一份）。
2.下图说明了广播的过程：Driver 先将数据 collect 到本地，再把完整的数据分发到各个 executor 上，由它们进行相应的处理。
（此处原为广播过程示意图，图片在转载时已丢失。）
3.广播变量的例子
需求：统计访问日志中每个省份的访问次数（先通过 IP 规则库把每条访问记录的 IP 映射到省份，再按省份计数）。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Broadcast-variable example: counts access-log hits per province.
  *
  * The IP rule table is collected to the driver and broadcast, so every task can
  * resolve client IPs with a local binary search over the full table.
  * Created by zhangjingcun on 2018/9/27 9:29.
  */
object IPLocation {
  /** Default path of the IP rule file (overridable with args(0)). */
  val rulesFilePath = "D:\\data\\ip.txt"
  /** Default path of the access log (overridable with args(1)). */
  val accessFilePath = "D:\\data\\access.log"

  /**
    * Runs the job and writes `(province, hits)` pairs to MySQL via [[MyUtils.data2MySQL]].
    *
    * @param args optional: args(0) = IP rule file path, args(1) = access log path.
    *             Missing entries fall back to [[rulesFilePath]] / [[accessFilePath]].
    */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

    val rulesPath = if (args.length > 0) args(0) else rulesFilePath
    val accessPath = if (args.length > 1) args(1) else accessFilePath

    val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Stop the context even when a stage fails, so resources are always released.
    try {
      // 1-2: Read and parse the IP rule table. Sample line:
      // 117.93.244.0|117.93.255.255|1969091584|1969094655|亚洲|中国|江苏|盐城||电信|320900|China|CN|120.139998|33.377631
      // fields(2)/fields(3) are the numeric range bounds, fields(6) is the province.
      val ipRules: RDD[(Long, Long, String)] = sc.textFile(rulesPath).map(line => {
        val fields = line.split("[|]")
        (fields(2).toLong, fields(3).toLong, fields(6))
      })

      // 3: Collect the rules to the driver. MyUtils.binarySearch requires them ordered by
      // range start, so sort explicitly instead of trusting the file's line order.
      val allIpRulesInDriver: Array[(Long, Long, String)] = ipRules.collect().sortBy(_._1)

      // 4: Broadcast the full rule table; tasks read it through broadcastRef.value instead
      // of each task closure capturing its own copy of the array.
      val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(allIpRulesInDriver)

      // 5-6: Map every access-log line to (province, 1); fields(1) is the client IP. Sample:
      // 20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?...|Mozilla/4.0 (...)|http://show.51.com/main.php|
      val provinceAddOne: RDD[(String, Int)] = sc.textFile(accessPath).map(line => {
        val fields = line.split("[|]")
        val ipNum = MyUtils.ip2Long(fields(1))
        val rules = broadcastRef.value
        val index = MyUtils.binarySearch(rules, ipNum)
        // IPs not covered by any rule are counted under "unknown".
        val province = if (index != -1) rules(index)._3 else "未知"
        (province, 1)
      })

      // 7: Sum hits per province.
      val reduceRDD: RDD[(String, Int)] = provinceAddOne.reduceByKey(_ + _)

      // 8: Write the results. foreachPartition lets data2MySQL open one JDBC connection per
      // partition instead of one per record, as a plain foreach would require.
      reduceRDD.foreachPartition(MyUtils.data2MySQL _)
    } finally {
      // 9: Release resources.
      sc.stop()
    }
  }
}

4.工具类

package day06

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
  * Helpers for the IPLocation example: IPv4 string to Long conversion, binary search over
  * the IP rule table, and writing per-province counts to MySQL.
  * Created by zx on 2017/12/12.
  */
object MyUtils {
  // NOTE(review): connection settings (including the password) are hard-coded; move them to
  // configuration before using this outside a local demo.
  private val JdbcUrl = "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8"
  private val JdbcUser = "root"
  private val JdbcPassword = "926718"

  /**
    * Converts a dotted IPv4 string to its numeric value by shifting in one octet at a time,
    * e.g. "1.2.3.4" -> 16909060.
    *
    * No range or arity validation is performed; a non-numeric part throws
    * NumberFormatException.
    */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  /**
    * Finds the rule whose inclusive range `[_1, _2]` contains `ip`.
    *
    * @param lines rules sorted ascending by range start, with non-overlapping ranges
    * @param ip    numeric IP as produced by [[ip2Long]]
    * @return index of the matching rule, or -1 if no range contains `ip`
    */
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    @scala.annotation.tailrec
    def loop(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        // Unsigned shift avoids Int overflow of (low + high) on very large arrays.
        val middle = (low + high) >>> 1
        val (rangeStart, rangeEnd, _) = lines(middle)
        if (ip < rangeStart) loop(low, middle - 1)
        else if (ip > rangeEnd) loop(middle + 1, high)
        else middle
      }
    loop(0, lines.length - 1)
  }

  /**
    * Inserts every `(province, count)` pair of one partition into the `access_log` table.
    *
    * Intended for `rdd.foreachPartition`: one connection and one prepared statement are
    * used for the whole partition and are always closed, even if an insert fails.
    * Empty partitions do not open a connection at all.
    * NOTE(review): the INSERT has no column list, so it assumes access_log has exactly two
    * columns (province, count) in that order — confirm against the schema.
    */
  def data2MySQL(it: Iterator[(String, Int)]): Unit = {
    if (it.hasNext) {
      val conn: Connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
      try {
        val pstm: PreparedStatement = conn.prepareStatement("insert into access_log values(?,?)")
        try {
          it.foreach { case (province, count) =>
            pstm.setString(1, province)
            pstm.setInt(2, count)
            pstm.executeUpdate()
          }
        } finally {
          pstm.close()
        }
      } finally {
        conn.close()
      }
    }
  }

}

猜你喜欢

转载自blog.csdn.net/qq_41166135/article/details/82864632