from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.regression import LinearRegressionWithSGD
import numpy as np
import pandas as pd
from sklearn.externals import joblib
import warnings
warnings.filterwarnings("ignore")
# Load the pre-trained GBDT model persisted with joblib.
# NOTE(review): `sklearn.externals.joblib` is deprecated and removed in
# scikit-learn >= 0.23 — switch to `import joblib` directly when upgrading.
clf = joblib.load('/home/ouguangji/PycharmProjects/test1/qgfenxi/model/train_model_result.m')
p_data = pd.read_csv('/home/ouguangji/PycharmProjects/test1/qgfenxi/model/data1.csv')
# Feature matrix: drop the target column 'y' and the unused column 'x1'.
x = p_data.drop(['y', 'x1'], axis=1).values
# Target column, kept as a 2-D (n, 1) array.
y = p_data[['y']].values
# Build MLlib LabeledPoint(label, features) records.
# Bug fix: the original loop body was not indented, which is an
# IndentationError; rewritten as a comprehension. y[i][0] extracts the
# scalar label from the (n, 1) target array (same value the original
# obtained via `*(y[i])` unpacking).
pp_data = [LabeledPoint(y[i][0], list(x[i])) for i in range(len(x))]
# Spark Streaming driver: watch a local directory and score every incoming
# comma-separated record with the pre-trained model `clf`, printing the
# predictions of each micro-batch to stdout.
spark_conf = SparkConf().setAppName('spark-streaming-gbdt').setMaster('local[8]')
sc = SparkContext(conf=spark_conf)
# 1-second micro-batch interval.
ssc = StreamingContext(sc, 1)

# Each file dropped into the watched directory arrives as a stream of text lines.
lines = ssc.textFileStream('file:///home/ouguangji/桌面/1')

predictions = (
    lines
    .flatMap(lambda line: line.split("\n"))
    .map(lambda line: line.split(","))
    # Convert the string fields to floats, shape them as a single-row
    # (1, 19) array, and run the model on it.
    .map(lambda fields: clf.predict(np.asarray(list(map(float, list(fields)))).reshape(1, 19)))
)
predictions.pprint()

ssc.start()
ssc.awaitTermination()
# a = [0.783,0.22,0.017,3,1,2.02,0,0,0,0,0,0,0,0,0,0,2,5,5]
# print(*(clf.predict(np.array(a).reshape(1,19))))
# 把训练好的GBDT模型放到spark streaming上运行,并可以实时的处理流过来的数据
# (Deploy the trained GBDT model on Spark Streaming so incoming data is scored in real time.)
# Source: 转载自blog.csdn.net/qq_41617848/article/details/103248334
# NOTE(review): the lines above were bare article text left over from a web
# scrape (a SyntaxError in a .py file); they are preserved here as comments.