文章目录
前言
spark集群提交的时候,各个worker需要将时序模型ARIMA保存,worker是不能保存到本地的,所以只能保存到集群。
但是这个模型包不是mllib的,本身并没有类似于save直接保存到hdfs的方法。
所以需要通过Python和hdfs交互的方式来保存。
思路
既然是Python和hdfs连接,肯定需要依赖包,或许有人会说集群上直接pip下载不就好了吗,但是我这的集群是人家的,不能下载,破坏人家的环境,所以只能通过submit的方式将源码包提交到集群。
首先需要去官网下载源码包,比如hdfs
,pydoop
,pyhdfs
都可以实现Python和hdfs的交互,这里以hdfs
包为例,其它两种方法套路一样。
官网如下:
https://pypi.org/project/hdfs/#files
官网上下载的.tar.gz解压后,我们需要把hdfs单独再zip一下,只用这个zip即可
大家可以直接下载我压缩过的,其中.tar.gz是官网下载的,.zip是要submit提交集群的
链接: https://pan.baidu.com/s/1gY4PeXEOg6UCZf2pmPyIEg 提取码: yitr 复制这段内容后打开百度网盘手机App,操作更方便哦
模型上传到hdfs
import pickle
from hdfs import Client
# model是你要保存的模型
# Client第一个参数是hdfs地址,第二个参数是你的hdfs路径
# write的地方是你模型要保存的hdfs位置
hdfsConn = Client('http://111.111:111', root='/user/111', timeout=1000, session=False)
with hdfsConn.write('/user/111/arima.model') as f:
pickle.dump(model, f)
读取hdfs上保存的模型
hdfsConn = Client('http://111.111:111', root='/user/111', timeout=1000, session=False)
with hdfsConn.read('/user/111/arima.model') as f:
model = pickle.loads(f.read())
# 然后就可以预测了
这里hdfsConn是一个http请求流,无论是pickle.load
还是joblib.load()
,都是先传一个地址,而我们读取hdfs这里要把整个文件都传过来,所以需要loads
提交spark集群
注:集群上如果可以直接pip依赖的包,直接pip即可。我这里因为集群是人家的所以不能破坏环境,只能通过submit的方式将源码包提交到集群。
PYSPARK_PYTHON=/data/anaconda3/bin/python3 \
/opt/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 4g \
--executor-memory 4g \
--driver-cores 4 \
--executor-cores 4 \
--py-files /data/suanfa/ecpm_predict/resources/hdfs.zip,/data/suanfa/ecpm_predict/resources/eCPM_predict_to.zip\
predict_data.py >>./log/predict_log 2>>./log/predict_err
如上,需要将刚刚的zip提交到集群。