Background: a while back I was given a task to refactor our company's prediction-model code with xgboost4j. After getting it working, here is a review of the problems I ran into along the way.
Conclusions:
1. xgboost4j behaves very differently across environments (Linux, macOS and Windows). Because the native library has to be compiled from source, getting it to build can be a real struggle.
The simplest approach is to pull xgboost4j in as a Maven dependency, but that only works smoothly in a Linux development environment; on Windows you will definitely run into problems (see the dependency sketch after the version notes below).
2. xgboost4j is tightly coupled to the Spark version.
The latest xgboost release at the time of writing is 0.81, with 0.8 and 0.72 also in common use, but these commonly used versions generally require at least Spark 2.3; this alone caused all kinds of errors on our test cluster.
xgboost4j 0.72 / 0.8 / 0.81 → Spark 2.3 and above
xgboost4j 0.72 → Spark 2.0 and above
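For reference, the dependency import can be declared roughly as follows. This is a minimal sbt sketch rather than a pom.xml; the artifact versions and the Spark version shown are only illustrative and should be matched to your cluster.
// build.sbt -- a sketch; pick the xgboost4j version that matches your Spark cluster
libraryDependencies ++= Seq(
  "ml.dmlc" % "xgboost4j"       % "0.72",                      // single-machine API used in code 1
  "ml.dmlc" % "xgboost4j-spark" % "0.72",                      // Spark integration used in code 2 and code 3
  "org.apache.spark" %% "spark-mllib" % "2.0.2" % "provided"   // illustrative Spark version
)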
3. Below are a few examples that run without problems locally.
Code 1:
import java.io.File

import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost}

/**
  * Scala (single-machine) version.
  * A runnable example.
  * Uses the agaricus.txt.train and agaricus.txt.test data sets (LibSVM / LabeledPoint format).
  */
object myPredictTest {
  def main(args: Array[String]): Unit = {
    val input_path = args(0)
    val path = input_path + "/data/agaricus.txt.train"
    val testpath = input_path + "/data/agaricus.txt.test"
    val dpath = new File(path)
    val dpathTest = new File(testpath)
    // Load the LibSVM files directly into DMatrix
    val trainDatas = new DMatrix(dpath.getPath)
    val testDatas = new DMatrix(dpathTest.getPath)
    val paramMap = List(
      "eta" -> 0.6,
      "max_depth" -> 3,
      "objective" -> "binary:logistic"
    ).toMap
    val round = 2
    val model = XGBoost.train(trainDatas, paramMap, round)
    val predictTrain = model.predict(testDatas)
    // For each test instance: 0.0 if the thresholded prediction matches the label, non-zero otherwise
    val res = predictTrain.zip(testDatas.getLabel).map { key =>
      if (key._1(0) >= 0.5) key._2.toDouble - 1.0 // predicted class 1
      else key._2.toDouble - 0.0                  // predicted class 0
    }
    val flsize = res.count(key => key.toInt == 0) // number of correct predictions
    println(flsize)
    // Note: this is the accuracy on the test set (the original code mislabelled it as AUC)
    val acc = flsize / res.length.toDouble
    println("accuracy " + acc)
  }
}
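Since the background here is a prediction service, it is worth noting that the booster trained in code 1 can be saved, reloaded and used to score new rows. A minimal sketch continuing from inside main above; the model path, the feature count and the feature values are placeholders for illustration:
// Sketch: persist the booster trained in code 1 and score a new instance with it
model.saveModel(input_path + "/model/agaricus.model")
val booster = XGBoost.loadModel(input_path + "/model/agaricus.model")

// Build a dense 1-row DMatrix from a plain feature array;
// nFeatures must equal the number of columns in the training data
val nFeatures = 126                          // placeholder: use the real feature count of your data
val features = new Array[Float](nFeatures)   // fill in real feature values here
val single = new DMatrix(features, 1, nFeatures)
val score = booster.predict(single)(0)(0)    // probability of the positive class under binary:logistic
println(if (score >= 0.5) 1 else 0)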
Code 2:
import org.apache.log4j.{Level, Logger}
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.regression
import org.apache.spark.rdd.RDD

object myTest {
  /**
    * Runs xgboost through the RDD interface.
    * Uses the agaricus.txt.train and agaricus.txt.test data sets (LibSVM / LabeledPoint format).
    */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("example")
      .config("spark.sql.shuffle.partitions", "20").getOrCreate()
    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val input_path = "/Users/01376233/IdeaProjects/myxgboost/src/main/data"
    val trainString = input_path + "/agaricus.txt.train"
    val testString = input_path + "/agaricus.txt.test"
    // Load the LibSVM files as RDDs of mllib LabeledPoint
    val train: RDD[regression.LabeledPoint] = MLUtils.loadLibSVMFile(spark.sparkContext, trainString)
    val test = MLUtils.loadLibSVMFile(spark.sparkContext, testString)
    println("*****************" + train.getClass.getSimpleName)
    println("*****************" + trainString.getClass.getSimpleName)
    // Convert to the ml LabeledPoint / Vector types expected by xgboost4j-spark
    val traindata = train.map { x =>
      val f = x.features.toArray
      val v = x.label
      LabeledPoint(v, Vectors.dense(f))
    }
    val testdata = test.map { x =>
      val f = x.features.toArray
      Vectors.dense(f)
    }
    print(traindata.take(10))
    val numRound = 15
    //"objective" -> "reg:linear",  // objective for a regression task
    //"eval_metric" -> "rmse",      // evaluation metric for regression
    val paramMap = List(
      "eta" -> 1f,
      "max_depth" -> 5,                 // maximum depth of a tree; default 6, valid range [1, ∞]
      "silent" -> 1,                    // 0 prints runtime messages, 1 runs silently; default 0
      "objective" -> "binary:logistic", // learning task and objective
      "lambda" -> 2.5,
      "nthread" -> 1                    // number of threads XGBoost uses; default is the maximum available on the machine
    ).toMap
    println(paramMap)
    val model = XGBoost.trainWithRDD(traindata, paramMap, numRound, 1, null, null, useExternalMemory = false, Float.NaN)
    print("success")
    val result = model.predict(testdata)
    print(result.count())
    println("*****************" + result.getClass.getSimpleName)
    result.take(10).foreach(println(_))
    spark.stop()
  }
}
Code 3:
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{Row, SparkSession}

object myTest2 {
  /**
    * Runs xgboost through the DataFrame interface.
    * Uses the agaricus.txt.train and agaricus.txt.test data sets (LibSVM / LabeledPoint format).
    * @param args
    */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val spark = SparkSession.builder.master("local").appName("example").
      config("spark.sql.shuffle.partitions", "20").getOrCreate()
    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val input_path = "/Users/01376233/IdeaProjects/myxgboost/src/main/data"
    val trainString = input_path + "/agaricus.txt.train"
    val testString = input_path + "/agaricus.txt.test"
    // Load the LibSVM files as DataFrames; rename the default columns to "label" / "feature"
    val train = spark.read.format("libsvm").load(trainString).toDF("label", "feature")
    val test = spark.read.format("libsvm").load(testString).toDF("label", "feature")
    val numRound = 15
    //"objective" -> "reg:linear",  // objective for a regression task
    //"eval_metric" -> "rmse",      // evaluation metric for regression
    val paramMap = List(
      "eta" -> 1f,
      "max_depth" -> 5,                 // maximum depth of a tree; default 6, valid range [1, ∞]
      "silent" -> 1,                    // 0 prints runtime messages, 1 runs silently; default 0
      "objective" -> "binary:logistic", // learning task and objective
      "lambda" -> 2.5,
      "nthread" -> 1                    // number of threads XGBoost uses; default is the maximum available on the machine
    ).toMap
    val model = XGBoost.trainWithDataFrame(train, paramMap, numRound, 1, obj = null, eval = null, useExternalMemory = false, Float.NaN, "feature", "label")
    val predict = model.transform(test)
    // Collect (prediction, label) pairs and compute the AUC
    val scoreAndLabels = predict.select(model.getPredictionCol, model.getLabelCol)
      .rdd.map { case Row(score: Double, label: Double) => (score, label) }
    val metric = new BinaryClassificationMetrics(scoreAndLabels)
    val auc = metric.areaUnderROC()
    println("auc:" + auc)
  }
}
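As a side note, the commented-out "reg:linear" / "rmse" parameters in code 2 and code 3 are what the regression variant would use; the training calls stay the same and only the parameter map changes. A rough sketch:
// Sketch: parameter map for a regression task (used with the same trainWithRDD / trainWithDataFrame calls)
val regressionParamMap = List(
  "eta" -> 0.3,
  "max_depth" -> 5,
  "objective" -> "reg:linear",  // squared-error regression objective in these xgboost versions
  "eval_metric" -> "rmse"       // root mean squared error as the evaluation metric
).toMap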