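This post is really an upgraded version of the previous one.
Here is the demo code first. Quite a few places in it need to be changed, and if you do not persist the model, please adjust the related code yourself (the demo is simple, so I won't explain what it does):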
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.ml import Pipeline, PipelineModel
import os

if __name__ == "__main__":
    # Environment variables
    # Python 3 command of the version the Linux master and workers need at startup
    os.environ['PYSPARK_PYTHON'] = 'python3'
    # Python command of the version the Linux master and workers need at startup
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'
    # Spark directory on the local Windows machine
    os.environ['SPARK_HOME'] = 'D:/PyWS3/spark-2.2.1'
    # Local IP used when establishing the connection; avoids connection
    # failures when the machine has several network adapters
    os.environ['SPARK_LOCAL_IP'] = '192.168.xxx.xxx'
    os.environ['HADOOP_HOME'] = 'D:/PyWS3/spark-2.2.1/hadoop'

    # Where the pipeline model is saved. HDFS or a Linux file system is
    # recommended; saving on Windows did not work for me.
    # On Windows the generated model metadata contains no part file,
    # so load fails. Still unresolved.
    model_path = 'hdfs://ip:9000/data/word2vecmodel'

    sparkconf = SparkConf()
    sparkconf.setAppName("word2vec").setMaster("spark://ip:7077") \
        .set("spark.submit.deployMode", "client") \
        .set('spark.driver.memory', '2g') \
        .set('spark.executor.memory', '2g') \
        .set('spark.executor.cores', 1) \
        .set('spark.network.timeout', 600) \
        .set('spark.executor.heartbeatInterval', 120) \
        .set('spark.cores.max', 4)
    spark = SparkSession.builder.config(conf=sparkconf).getOrCreate()

    # Each row is one document, already split into a list of words
    documentDF = spark.createDataFrame([
        ("Hi I heard about Spark".split(" "),),
        ("I wish Java could use case classes".split(" "),),
        ("Logistic regression models are neat".split(" "),)
    ], ["text"])

    word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
    pipeline = Pipeline(stages=[word2Vec])
    model = pipeline.fit(documentDF)

    # Persist the fitted pipeline, then load it back to verify the round trip
    model.write().overwrite().save(model_path)
    temp_model = PipelineModel.load(model_path)

    result = temp_model.transform(documentDF)
    for row in result.collect():
        text, vector = row
        print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

    spark.stop()
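Next come the packages to download and the settings to change:
Files to download
1. Get the same Spark package that is installed on the Linux system; the two must be identical. Download: http://spark.apache.org/downloads.html
2. Optional. Get the winutils.exe and hadoop.dll that match your Hadoop version. Download: https://github.com/steveloughran/winutils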
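Configuration
1. The Python interpreter version must match the one in the remote environment!
- Set it in PyCharm
- Set it in the system environment variables, so that running python (or whichever command starts Python) in cmd gives the same version as the Python on the Spark nodes
2. Interpreter-related variables
- Set them in PyCharm; the paths depend on the directory the Spark package was unpacked to
- Set them in the system environment variables; the paths depend on the directory the Spark package was unpacked to, and PYTHONPATH should be created if it does not exist
3. Change the values of the environment variables set in the code; see the comments in the code for how to change them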
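As a rough illustration of items 1 and 2, the sketch below builds the PYTHONPATH entries from an unpacked Spark directory and compares Python versions. It assumes the usual layout of a Spark binary distribution (a python folder plus a py4j-*-src.zip under python/lib); the helper names spark_python_paths and same_minor_version are hypothetical and the remote version tuple is something you would fill in yourself.

```python
import glob
import os
import sys


def spark_python_paths(spark_home):
    """Return the PYTHONPATH entries for an unpacked Spark distribution.

    Assumes the usual layout: <spark_home>/python and
    <spark_home>/python/lib/py4j-<version>-src.zip.
    """
    python_dir = os.path.join(spark_home, "python")
    # The py4j archive name carries a version number, so match it with a glob.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    return [python_dir] + py4j_zips


def same_minor_version(local, remote):
    """Compare only (major, minor), e.g. (3, 6, 5) and (3, 6, 1) match."""
    return tuple(local[:2]) == tuple(remote[:2])


if __name__ == "__main__":
    # Hypothetical default; use the directory you unpacked Spark to.
    spark_home = os.environ.get("SPARK_HOME", "D:/PyWS3/spark-2.2.1")
    for entry in spark_python_paths(spark_home):
        if entry not in sys.path:
            sys.path.insert(0, entry)
    # Placeholder: replace with the Python version installed on the Spark nodes.
    remote_version = (3, 6)
    print("local Python:", sys.version_info[:3])
    print("matches remote:", same_minor_version(sys.version_info, remote_version))
```

Extending sys.path at runtime is only an alternative to the PyCharm or system-variable settings listed above; the paths it adds are the same ones PYTHONPATH would need.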
Extras
If this error appears (it actually has no effect on how the program runs):
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
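Put hadoop.dll into C:\Windows\System32, then create a bin folder under any directory whose path contains only English characters and put winutils.exe in it, for example D:\PyWS3\spark-2.2.1\hadoop\bin\winutils.exe. Finally, add this to the code: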
os.environ['HADOOP_HOME'] = 'D:/PyWS3/spark-2.2.1/hadoop'
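Before starting Spark it can help to confirm that winutils.exe really sits under the bin folder of the directory HADOOP_HOME points to. The following is a minimal sketch of such a check; the function names winutils_path and check_hadoop_home are hypothetical, and the default directory is just the example path used above.

```python
import os


def winutils_path(hadoop_home):
    """Path where Hadoop looks for winutils.exe: <HADOOP_HOME>/bin/winutils.exe."""
    return os.path.join(hadoop_home, "bin", "winutils.exe")


def check_hadoop_home(hadoop_home):
    """Return (ok, message) describing whether winutils.exe was found."""
    exe = winutils_path(hadoop_home)
    if os.path.isfile(exe):
        return True, "found " + exe
    return False, "missing " + exe


if __name__ == "__main__":
    # Example default taken from the post; adjust to your own directory.
    hadoop_home = os.environ.get("HADOOP_HOME", "D:/PyWS3/spark-2.2.1/hadoop")
    ok, message = check_hadoop_home(hadoop_home)
    print(message)
```

If the check reports the file as missing, the "null\bin\winutils.exe" message in the error above is the likely symptom: HADOOP_HOME is unset or points to a directory without bin\winutils.exe.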