本文旨在对基本的Spark RDD api进行简单练习
原日志格式:
从hdfs上读取日志文件:
val rdd = sc.textFile("hdfs://master:9000/spark/localhost_access_log.2015-04-24.txt")
过滤不正确的数据并且将日期合并:
rdd.map(_.split(" ")).filter(_.length == 10).map(x => x(0)+" "+x(3)+x(4)+" "+x(5)+" "+x(6)+" "+x(7)+" "+x(8)).saveAsTextFile("hdfs://Master:9000/spark/output")
此时数据格式变为:
读取初步处理后的数据:
val rdd1= sc.textFile("hdfs://master:9000/spark/output/part-00000")
统计状态码404错误的日志:
rdd1.map(_.split(" ")).filter(_(5).toInt == 404).count
设置日期解析器格式:
val format = new java.text.SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ssZ]")
解析日志时间:
val rdd3 = rdd1.map(_.split(" ")).map(r => (format.parse(r(1)),r(0),r(2),r(3),r(4),r(5))).collect
Spark shell上执行collect的结果:
日志最后格式如下:
时间戳变为: