package day01
import scala.util.matching.Regex
/**
* 提供一些操作Apache Log的工具类供SparkCore使用
*/
case class ApacheAccessLog(
ipAddress: String, // IP地址
clientId: String, // 客户端唯一标识符
userId: String, // 用户唯一标识符
serverTime: String, // 服务器时间
method: String, // 请求类型/方式
endpoint: String, // 请求的资源
protocol: String, // 请求的协议名称
responseCode: Int,// 请求返回值:比如:200、401
contentSize: Long // 返回的结果数据大小
)
object ApacheAccessLog {
// Apache日志的正则
val PARTTERN: Regex =
"""^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
/**
* 验证一下输入的数据是否符合给定的日志正则,如果符合返回true;否则返回false
*
* @param line
* @return
*/
def isValidateLogLine(line: String): Boolean = {
val options = PARTTERN.findFirstMatchIn(line)
if (options.isEmpty) {
false
} else {
true
}
}
/**
* 解析输入的日志数据
*
* @param line
* @return
*/
def parseLogLine(line: String): ApacheAccessLog = {
if (!isValidateLogLine(line)) {
throw new IllegalArgumentException("参数格式异常")
}
// 从line中获取匹配的数据
val options = PARTTERN.findFirstMatchIn(line)
// 获取matcher
val matcher = options.get
// 构建返回值
ApacheAccessLog(
matcher.group(1), // 获取匹配字符串中第一个小括号中的值
matcher.group(2),
matcher.group(3),
matcher.group(4),
matcher.group(5),
matcher.group(6),
matcher.group(7),
matcher.group(8).toInt,
matcher.group(9).toLong
)
}
}
package day01
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 1:求返回结果数据的平均值、最大值、最小值
* 2:统计各个返回值出现的数据次数
* 3:获取访问次数超过N次的IP地址
* 增加难度:对IP地址进行限制,黑名单的IP地址不做统计
* 4:获取访问次数最多的前N个资源=》TopN操作
*/
object Access_log {
//110.75.173.48 - - [30/May/2013:23:59:58 +0800] "GET /thread-36410-1-9.html HTTP/1.1" 200 68629
def main(args: Array[String]): Unit = {
val topN= 2000
val topK =3
val conf = new SparkConf().setAppName("Access_log").setMaster("local[*]")
val sc =new SparkContext(conf)
//读取数据
val lines: RDD[String] = sc.textFile(args(0))
//对数据进行匹配
val logsRDD: RDD[ApacheAccessLog] = lines.filter(line =>ApacheAccessLog.isValidateLogLine(line)).map(line=>{
ApacheAccessLog.parseLogLine(line)
})
//因为要对数据进行多次处理,所有对数据进行优化
logsRDD.cache()//将数据保存到内存中
/**
* 1:求返回结果数据的平均值、最大值、最小值
*/
val contentSizeRdd: RDD[Long] = logsRDD.map(line=>(line.contentSize))
contentSizeRdd.cache()
val contentSizeMax: Long = contentSizeRdd.max()
val contentSizeMin: Long = contentSizeRdd.min()
val contentSizeSum: Double = contentSizeRdd.sum()
val contentSizeCount: Long = contentSizeRdd.count()
val age: Double =1.0 *contentSizeSum /contentSizeCount
println(s"平均值: $age, 最大值:$contentSizeMax, 最小值:$contentSizeMin")
//释放内存中的资源
contentSizeRdd.unpersist(true)
/**
* 2:统计各个返回值出现的数据次数
*/
val responseCodeRdd: RDD[(Int, Int)] = logsRDD.map(line=>(line.responseCode,1)).reduceByKey(_+_)
println("每个状态返回值得次数:"+"\t\n"+s"${responseCodeRdd.collect().mkString("\t\n")}")
/**
* 3:获取访问次数超过N次的IP地址
* 增加难度:对IP地址进行限制,黑名单的IP地址不做统计
*/
val blackIPs=Array("110.75.173.48", "220.181.89.186")
//定义广播变量,这里方便点,就写死了
val broadcasted: Broadcast[Array[String]] = sc.broadcast(blackIPs)
//对ip地址进行黑名单过滤
val ipAddressRdd: RDD[(String, Int)] = logsRDD.filter(log => (!broadcasted.value.contains(log.ipAddress))).map(line => (line.ipAddress, 1))
.reduceByKey(_ + _).filter(tp => tp._2 > topN)
println(s"ipAddress :${ipAddressRdd.collect().mkString(", ")}")
/**
* 获取访问次数最多的前N个资源=》TopN操作
*/
val endpointRDD: Array[(String, Int)] = logsRDD.map(line=>(line.endpoint,1)).reduceByKey(_+_).top(topK)(TupleOrdering)
println(s"endpoint : ${endpointRDD.mkString(",")}")
//释放内存
logsRDD.unpersist(true)
//关闭资源
sc.stop()
}
}
//自定义排序规则
object TupleOrdering extends Ordering[(String,Int)]{
override def compare(x: (String, Int), y: (String, Int)): Int = {
x._2.compare(y._2)
}
}