本文主要是用SparkMLlib中的一些算法来对PM进行预测,其中涉及了dataframe中对列使用自定义函数的操作,很方便!!
任务:读取pm.csv,将含有缺失值的行扔掉(或用均值填充)将数据集分为两部分,0.8比例作为训练集,0.2比例作为测试集
(1)使用month,day,hour,DEWP,TEMP,PRES,cbwd,Iws,Is,Ir作为特征列(除去No,year,pm),pm作为label列,使用训练集、随机森林算法进行回归建模,使用回归模型对测试集进行预测,并评估。
1) 将数据读取到dataframe中然后将标签列和特征列选出,同时去除数值为空的数据
val conf = new SparkConf().setAppName("input").setMaster("local[*]")
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
val url = "C:\\Users\\shuangmm\\Desktop\\pm.csv"
val dataDF =sqlContext.read.format("csv")
.option("header","true")
.option("inferSchema",true.toString)
//这是自动推断属性列的数据类型。
.load(url).na.drop()
2) 选取特征列,使用了VectorAssembler函数,将数据转换为向量,方便输进模型,但是有一列是字符格式的,需要进行处理
使用StringIndexer的方法将字符串改为索引的形式。
val index = new StringIndexer()
.setInputCol("cbwd")
.setOutputCol("cbwd_in")
val feat = new VectorAssembler()
.setInputCols(Array("month","day","hour","DEWP","TEMP","PRES","cbwd_in","Iws","Is","Ir"))
.setOutputCol("features")
3) 创建随机森林回归模型
4) 创建pineline
5) 划分数据集,8:2进行划分
6) 进行预测及模型的评估
val rf = new RandomForestClassifier()
.setLabelCol("levelnum")
.setFeaturesCol("features")
//.setNumTrees(10)
.setMaxDepth(20)//20棵树 0.3189
// Chain indexer and forest in a Pipeline
val pipeline = new Pipeline()
.setStages(Array(index,feat,rf))
val Array(trainingData, testData) = features.randomSplit(Array(0.8, 0.2))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
val result = predictions.select("levelnum","prediction", "features","levelstr")//.show(100)
result.show(100)
val pre = result.withColumn("predtionStr",levelStr(result("prediction")))
pre.show(100)
// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("levelnum")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))
运行结果
(2)按照下面标准处理pm列,数字结果放进(levelNum)列,字符串结果放进(levelStr)列优(0) 50 良(1)50~100 轻度污染(2) 100~150 中度污染(3) 150~200 重度污染(4) 200~300 严重污染(5) 大于300及以上
划重点了
本题的需求是根据pm列的值来重新生成两个列,开始想着用rdd来处理这个列,就需要用dataframe和rdd之间来回转换,想起之前处理篮球信息的时候有一个z-socre的处理,当时有一个方法是selectexpr的方法比如直接列名加1,就会自动生成一列列名加一的数据。通过查资料看到dataframe中可以对列使用自定义函数。下面开始变魔法
- 首先自定义两个函数,转换成相应的数值和对应的字符串
Pm列的类型是double类型
//将值和等级进行转换
def testNum(x : Double) : Double = {
var leveNum = 0
if(x < 50)
leveNum = 0
if(x >= 50 & x <= 100)
leveNum = 1
if(x > 100 & x < 150)
leveNum = 2
if(x >= 150 & x <= 200)
leveNum = 3
if(x > 200 & x < 300)
leveNum = 4
if(x >= 300)
leveNum = 5
leveNum
}
//自定义函数,将字符串进行对应
def testStr(x : Double) : String = {
var leveStr = x match {
case 0 => "优"
case 1 => "良"
case 2 => "轻度污染"
case 3 => "中度污染"
case 4 => "重度污染"
case 5 => "严重污染"
}
leveStr
}
!!!!注册自定义函数,这部分非常关键,给自定义函数起了一个新的名字,
方便之后的调用。必不可少的一步!!
import org.apache.spark.sql.functions.{udf}
val levelNum = udf(testNum _)
val levelStr = udf(testStr _)
- 读取数据存到dataframe中的过程和上面相同。
使用自定义函数,withColumn的方法第一个参数是函数执行结束之后的新列名。
val level = dataDF.withColumn("levelnum",levelNum(dataDF("pm")))//.show(10)
val features = level.withColumn("levelstr",levelStr(level("levelnum")))//.show(10)
可以看到执行后的结果,将值和标准全部转换过来了
(3)使用month,day,hour,DEWP,TEMP,PRES,cbwd,Iws,Is,Ir作为特征列(除去No,year,pm),levelNum作为label列,使用训练集、随机森林算法进行分类建模。使用分类模型对测试集进行预测对预测结果df进行处理,基于prediction列生成predictionStr(0-5转换优-严重污染),对结果进行评估
思路:和随机森林的回归模型算法类似,不同的是
需要对预测出来的结果进行列的转化,用到之前的自定义函数。
数据处理的过程都相同,进行模型的建立,之前没有设置参数,跑出来的结果不太好,然后设置了树的深度,在设置之后程序运行时间大大增加,不过准确率也增加了,之前准确率只有50%左右,现在能到70%多
Pineline创建
模型预测
数据处理
val rf = new RandomForestClassifier()
.setLabelCol("levelnum")
.setFeaturesCol("features")
//.setNumTrees(10)
.setMaxDepth(20)//20棵树 0.3189
// Chain indexer and forest in a Pipeline
val pipeline = new Pipeline()
.setStages(Array(index,feat,rf))
val Array(trainingData, testData) = features.randomSplit(Array(0.8, 0.2))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
val result = predictions.select("levelnum","prediction", "features","levelstr")//.show(100)
result.show(100)
val pre = result.withColumn("predtionStr",levelStr(result("prediction")))
pre.show(100)
结果
主要是自定义函数的使用,,注册函数的那部分一定要好好看,能省非常多的事。
源码看这里
如果有需要数据集当做练习的可以私信我,发给你数据集。