最近在工作之余参加了一个CTR预估的比赛,CTR预估是一个成熟又经典的问题,工业界目前使用的主流方案仍然是LR+海量特征。趁着这一次比赛的机会,正好抱着学习的心态尝试着学习用spark集群来训练一下LR。
在学校的时候大家训练模型一般都是用python+pandas+numpy+sklearn,这一套工具在单机的环境下非常的简单易学,但是面对海量数据或者高维稀疏矩阵的计算,就显得无能为力。
相比之下,spark作为分布式计算框架,用户操作起来的感觉更多是,虽然笨重,但是算得快啊。
spark提供了两套机器学习的库,mllib和ml。前者主要适用于RDD的处理,而后者主要适用于dataframe的处理。
目前spark的用户中基于spark.dataframe已经成为了主流,mllib这个库也不再维护,转向更新ml这个库。
spark上支持cpp、java、python和scala,其中scala是spark的原生语言,本文就以scala为例,训练了一个非常简单的LR模型。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.Vectors
import scala.collection.mutable.ListBuffer
object newtrainLR {
val spark = SparkSession.builder().appName("LR-Predict").getOrCreate()
//数据读入
val trainPath = "../train_with_hour.csv"
val testPath = "../test_with_hour.csv"
val trainDF = spark.read.format("csv").option("header", "true").load(trainPath)
val testDF = spark.read.format("csv").option("header", "true").load(testPath)
val newTrainDF = trainDF.drop("_c0", "Unnamed: 0", "time", "city", "app_paid").withColumn("flag", lit(1))
val newTestDF = testDF.drop("_c0", "Unnamed: 0", "time", "city").
withColumn("click", lit(3)).
withColumn("flag", lit(2))
//合并train、test,一起做one-hot编码
val allDF = newTrainDF.union(newTestDF)
//获取列名array
val colNameDF = allDF.drop("flag", "click")
// 要进行OneHotEncoder编码的字段
val categoricalColumns = colNameDF.columns
//采用Pileline方式处理机器学习流程
val stagesArray = new ListBuffer[PipelineStage]()
for (cate <- categoricalColumns) {
//使用StringIndexer 建立类别索引
val indexer = new StringIndexer().setInputCol(cate).setOutputCol(s"${cate}Index")
// 使用OneHotEncoder将分类变量转换为二进制稀疏向量
val encoder = new OneHotEncoder().setInputCol(indexer.getOutputCol).setOutputCol(s"${cate}classVec")
stagesArray.append(indexer, encoder)
}
val assemblerInputs = categoricalColumns.map(_ + "classVec")
// 使用VectorAssembler将所有特征转换为一个向量
val assembler = new VectorAssembler().setInputCols(assemblerInputs).setOutputCol("features")
//使用pipeline批处理
val pipeline = new Pipeline()
pipeline.setStages(stagesArray.toArray)
val pipelineModel = pipeline.fit(allDF)
val dataset = pipelineModel.transform(allDF)
val newDF = dataset.select("click", "features", "flag")
//拆分train、test
val processedTrain = newDF.filter(col("flag") === 1).drop("flag")
val processedTest = newDF.filter(col("flag") === 2).drop("click", "flag")
//处理label列
val indexer2Click = new StringIndexer().setInputCol("click").setOutputCol("ctr")
val finalTrainDF = indexer2Click.fit(processedTrain).transform(processedTrain).drop("click")
//随机分割测试集和训练集数据
val Array(trainingDF, testDF) = finalTrainDF.randomSplit(Array(0.7, 0.3), seed = 1)
println(s"trainingDF size=${trainingDF.count()},testDF size=${testDF.count()}")
val lrModel = new LogisticRegression().
setLabelCol("ctr").
setFeaturesCol("features").
setMaxIter(10000).
setThreshold(0.5).
setRegParam(0.15).
fit(trainingDF)
val predictions = lrModel.transform(testDF).select($"ctr".as("label"), "features", “rawPrediction", "probability", "prediction")
//使用BinaryClassificationEvaluator来评价我们的模型
val evaluator = new BinaryClassificationEvaluator()
evaluator.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
val newprediction = lrModel.transform(processedTest).select("probability")
//取出预测为1的probability
val reseult2 = newprediction.map(line => {
val dense = line.get(line.fieldIndex("probability")).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
val y = dense(1).toString
(y)
}).toDF("pro2ture")
reseult2.repartition(1).write.text(“../firstLrResultStr")