目前打算先学习SQLContent, 因为Hive环境还没搭好, 一步一步来 先把spark的原理弄明白后再去研究hadoop的组件。
这篇文章主要是讲如何使用SQLContext去读取csv文件, 然后根据表头注册表, 进行数据分析
要通过SQLContext去操作csv文件, 那么我们需要用到spark-csv_xxx.jar
sbt地址为:
// https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0"
maven地址:
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
引入jar包后, 我们创建一个scala object, 在里面创建sparkconfig, sparkcontext, 以及SQLContext
val sparkconfig = new SparkConf().setAppName("SQLTest").setMaster("local[2]") val sc = new SparkContext(sparkconfig) val sqlContext = new SQLContext(sc)
在天池里面参加一个项目后拿到了一个一份数据, 这个方法里面主要就是分析这份数据。 数据主要结构为: (2.csv)
re_monitor_id re_index value release_time exceed_flag
主要是一个排污检测数据, 每一个小时检测一次, 有很多个站点, 记录不一样项目的监测值, 如果这个项目超标了, 那么 exceed_flag就变成t否则是f
re_monitor_id是检测点id
re_index 是检测项目
value是检测值
release_time是检测时间
还有一个csv文件 存储所有的检测项目, 我们只用到一个表头 (3.csv)
index
检测项目
首先第一步, 把两个文件中有用的表头注册一下, 注册到临时表:
val companyEmission = sqlContext.csvFile("C:/cache/2.csv").select("re_monitor_id", "re_index", "re_monitor_info_id", "value", "release_time", "exceed_flag").toDF val typeEmission = sqlContext.csvFile("C:/cache/3.csv").select("monitor_id", "index").toDF companyEmission.registerTempTable("cp"); typeEmission.registerTempTable("type")
然后把检测类型取出来作为一个数组 (由于项目不多, 不用担心OOM)
var monitorType = sqlContext.sql("select distinct(index) from type").toDF val typeArray = monitorType.collect
其次再将检测数据拿过来, 存成(re_index, re_monitor_id_release_time) 的(K,V)格式 --- 只取exceed_flag为t的row
val overData = sqlContext.sql("select re_monitor_id, re_index, release_time from cp where exceed_flag =" + '"' + "t" + '"').map(f => (f.get(1).toString, f.get(0).toString + "_" + f.get(2)))
遍历检测类型, 目的是获取每个检测类型根据每个小时来看, 一个月内在同一小时内超标最多的检测点
typeArray.foreach(x => { var typeD = x.get(0).toString val sepData = overData.filter(y => y._1.contains(typeD)).map(z => { // println(z._2.split(" ").apply(1)) val date = sp.parse(z._2.split("_").apply(1).toString().substring(0, 19)) val hour = date.getHours() (hour, z._2.split("_").apply(0).toString()) }).groupByKey var sortedData = sepData.sortBy(_._2.size, false) if (sortedData.count > 0) { println("****************************************************") println("result for " + typeD + "is") println("Hour: " + sortedData.first._1 + "\r\n" + "Times: " + sortedData.first._2.size) println("****************************************************") } })
这里只取出了各种类型的污染, 哪个小时超排最多。 其实我们可以根据数据, 获取到哪些点对哪些污染数据超标最多, 然后还可以知道是不是每次在晚上特定时间断排放的特别多。 这样分析出来的数据 就比较有用了. 数据请看附件