scala spark-core 综合练习

package day01

import scala.util.matching.Regex
/**
  * 提供一些操作Apache Log的工具类供SparkCore使用
  */
case class ApacheAccessLog(
                             ipAddress: String, // IP地址
                             clientId: String, // 客户端唯一标识符
                             userId: String, // 用户唯一标识符
                             serverTime: String, // 服务器时间
                             method: String, // 请求类型/方式
                             endpoint: String, // 请求的资源
                             protocol: String, // 请求的协议名称
                             responseCode: Int,// 请求返回值:比如:200、401
                             contentSize: Long // 返回的结果数据大小

                          )

object ApacheAccessLog {
  // Apache日志的正则
  val PARTTERN: Regex =
    """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
    * 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false
    *
    * @param line
    * @return
    */
  def isValidateLogLine(line: String): Boolean = {
    val options = PARTTERN.findFirstMatchIn(line)

    if (options.isEmpty) {
      false
    } else {
      true
    }
  }

  /**
    * 解析输入的日志数据
    *
    * @param line
    * @return
    */
  def parseLogLine(line: String): ApacheAccessLog = {
    if (!isValidateLogLine(line)) {
      throw new IllegalArgumentException("参数格式异常")
    }

    // 从line中获取匹配的数据
    val options = PARTTERN.findFirstMatchIn(line)

    // 获取matcher
    val matcher = options.get

    // 构建返回值
    ApacheAccessLog(
      matcher.group(1), // 获取匹配字符串中第一个小括号中的值
      matcher.group(2),
      matcher.group(3),
      matcher.group(4),
      matcher.group(5),
      matcher.group(6),
      matcher.group(7),
      matcher.group(8).toInt,
      matcher.group(9).toLong
    )
  }
}
package day01

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 1:求返回结果数据的平均值、最大值、最小值
  * 2:统计各个返回值出现的数据次数
  * 3:获取访问次数超过N次的IP地址
  * 增加难度:对IP地址进行限制,黑名单的IP地址不做统计
  * 4:获取访问次数最多的前N个资源=》TopN操作
  */
object Access_log {
//110.75.173.48 - - [30/May/2013:23:59:58 +0800] "GET /thread-36410-1-9.html HTTP/1.1" 200 68629
def main(args: Array[String]): Unit = {
  val topN= 2000
  val topK =3
  val conf = new SparkConf().setAppName("Access_log").setMaster("local[*]")
  val sc =new SparkContext(conf)

  //读取数据
  val lines: RDD[String] = sc.textFile(args(0))
  //对数据进行匹配
  val logsRDD: RDD[ApacheAccessLog] = lines.filter(line =>ApacheAccessLog.isValidateLogLine(line)).map(line=>{
    ApacheAccessLog.parseLogLine(line)
  })
    //因为要对数据进行多次处理,所有对数据进行优化
  logsRDD.cache()//将数据保存到内存中
  /**
    * 1:求返回结果数据的平均值、最大值、最小值
    */
  val contentSizeRdd: RDD[Long] = logsRDD.map(line=>(line.contentSize))
  contentSizeRdd.cache()
  val contentSizeMax: Long = contentSizeRdd.max()
  val contentSizeMin: Long = contentSizeRdd.min()
  val contentSizeSum: Double = contentSizeRdd.sum()
  val contentSizeCount: Long = contentSizeRdd.count()
  val age: Double =1.0 *contentSizeSum /contentSizeCount

  println(s"平均值: $age, 最大值:$contentSizeMax, 最小值:$contentSizeMin")
  //释放内存中的资源
  contentSizeRdd.unpersist(true)

  /**
    * 2:统计各个返回值出现的数据次数
    */
  val responseCodeRdd: RDD[(Int, Int)] = logsRDD.map(line=>(line.responseCode,1)).reduceByKey(_+_)
  println("每个状态返回值得次数:"+"\t\n"+s"${responseCodeRdd.collect().mkString("\t\n")}")

  /**
    * 3:获取访问次数超过N次的IP地址
    *  增加难度:对IP地址进行限制,黑名单的IP地址不做统计
    */

    val blackIPs=Array("110.75.173.48", "220.181.89.186")
    //定义广播变量,这里方便点,就写死了
    val broadcasted: Broadcast[Array[String]] = sc.broadcast(blackIPs)
  //对ip地址进行黑名单过滤
  val ipAddressRdd: RDD[(String, Int)] = logsRDD.filter(log => (!broadcasted.value.contains(log.ipAddress))).map(line => (line.ipAddress, 1))
    .reduceByKey(_ + _).filter(tp => tp._2 > topN)
    println(s"ipAddress :${ipAddressRdd.collect().mkString(", ")}")

  /**
    * 获取访问次数最多的前N个资源=》TopN操作
    */
  val endpointRDD: Array[(String, Int)] = logsRDD.map(line=>(line.endpoint,1)).reduceByKey(_+_).top(topK)(TupleOrdering)
  println(s"endpoint : ${endpointRDD.mkString(",")}")

  //释放内存
  logsRDD.unpersist(true)
  //关闭资源
  sc.stop()
}
}

//自定义排序规则
object TupleOrdering extends Ordering[(String,Int)]{
  override def compare(x: (String, Int), y: (String, Int)): Int = {
    x._2.compare(y._2)
  }
}

猜你喜欢

转载自blog.csdn.net/LJ2415/article/details/85009080