先闲扯下pyspark环境的问题;
前段时间在帮助算法组的同学使用spark跑一些模型,因为那边的同学没有使用过spark,且不会scala和java,而他们的诉求是使用python跑一些spark的任务;所以我这边就协助配置了一下python on spark的环境,这个环境配置起来还是挺费劲的;python的环境没有使用conda[为啥没用?个人习惯;但需要注意的是,如果使用conda的话python的版本必须低于3.8],我使用的是自己编译的python-3.6,而且每台hadoop每台机器都需要装;我是在一台机器把python需要用到的包都安装了一遍,然后分发到了各个机器,这里需要注意的是每台机器的路径必须一致;问题在于如果需要安装新的包,就得从新分发一遍,好在我们拥有所有机器的权限(⊙o⊙)…;为什么要这么做呢?在使用pyspark的时候,不可避免会使用python第三方的工具包,而每个任务的执行是依赖本地配置的python环境的,所以我们尽可能做到每个yarn的任务节点都配置一样的python环境,把可能用到的工具包都装上「这样的话就可以愉快的使用jieba分布式切词了~~~~」
回到正题:需求是获得每篇文章最相似的20篇文章,这里我贴出部分比较重要的代码段
第一步:
pyspark使用jieba对文章进行分词,这一步特别重要(我们过滤掉各种乱七八糟的数据后,大概有40w+个词),“fc()”这个方法是文章内容切词的入口
import jieba
import re
from pyspark import SparkFiles
from pyspark.sql import functions
#将本地的字典上传到每个worker节点
sc.addFile("/data/sparkml/word.dic")
jieba.load_userdict(SparkFiles.get("word.dic"))
#加载停用词
sc.addFile("/data/sparkml/stopword.dic")
stop = {}
for line in open(SparkFiles.get("stopword.dic")).readlines():
line = line.replace('\n','')
stop[line]=1
#解析xml数据
def parse_xml(content):
content = re.sub(r'</?\w+[^>]*>','',content)
content = content.replace(u'\ufeff', u' ').replace(u'\u3000', u' ')
return content
#过滤url
def filter_url(text):
re_tag = re.compile('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
new_text = re.sub(re_tag, '', text)
return new_text
#过滤标点符号,表情符号
def filter_symbol(word,restr=''):
word = filter_url(word)
# r = '[’!"#$%&\'~()~*+,-./:;<=>?@[\\]^_`{|}~ ̄ゞ。❤…★☎]+'
word = word.strip()
word = parse_xml(word)
co = re.compile("[^\u4e00-\u9fa5^a-z^A-Z^0-9]")
return co.sub(restr, word)
def fc(x):
x = filter_symbol(x)
result_tmp = jieba.cut(x, cut_all=False)
result = []
for k in result_tmp:
k = filter_symbol(k)
if stop.get(k)!=1 and len(k.lstrip())>0:
result.append(k)
return ",".join(result)
第二步:
使用spark word2vec对分词后的结果进行训练,然后转换成文档向量;spark会将文档所有词的的向量加和,然后求平均得到文档向量
from pyspark.ml.feature import Word2Vec
from pyspark.ml.linalg import Vectors
word2Vec = Word2Vec(vectorSize=300, minCount=5, inputCol="tokens", outputCol="result",numPartitions=20)
model = word2Vec.fit(doc)
def _array_to_vector(row):
return row.aid,Vectors.dense(row.result)
train_df = model.transform(doc).select("aid","result")
#获得文档向量
train = train_df.rdd.map(_array_to_vector).toDF(['aid','result'])
第3步:
使用LSH对文档向量集合进行分桶,然后得出文章距离小于0.1(这个值需要根据结果调整)的文章对,然后再取出每篇文章最相似的20篇
#计算文章之间的相似度
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql.functions import col
from pyspark import TaskContext
brp = BucketedRandomProjectionLSH(inputCol="result", outputCol="hashes", bucketLength=10.0,
numHashTables=3)
lsh_model = brp.fit(train)
tsDf = lsh_model.transform(train)
tsDf = tsDf.rdd.toDF(['aid','result','hashes'])
#计算文章距离小于0.1的文章对
brpDf = lsh_model.approxSimilarityJoin(tsDf, tsDf, 0.1, "EuclideanDistance")
如果这样就解决了就太完美了,往往理想美满,现实很骨感;在数据量不是特别大的情况下,这种方式确实跑得通,数据量到一定规模后,基本上跑不出来;而且也会经常出现数据倾斜的问题,而这个数据倾斜发生在模型层面,还没有办法调,LSH可调的参数也特别少;那怎么解决呢?解决的思路有两种:
1.如果文章原本就有分类,例如:java类别文章,python类别文章;那我们就没有必要使用全量的文章进行相似度计算,完全可以按照分类来计算相似度,这样通过减少数据输入量,将一次模型计算分解成多次模型计算,这样是可以计算出结果的
2.如果文章本身就没有很好的分类,或者有分类,但是有的分类的数据量非常多,也没办法跑出来,那么我们可以将数据进行一个聚类,然后再对每一个类簇中的数据进行相似度计算
以上方案都是在使用spark方案解决,其实可以考虑下使用gensim这个nlp框架来解决这个问题,而且相对spark,简单得多,具体大家可以了解下
Doc2Vec(x_train, min_count=1, window=3, vector_size=size, sample=1e-3, negative=5, workers=4)
说了这么多,我来说下我的最终方案:
1.使用spark word2vec模型获得文章向量,在数据处理方面spark还有有一定的优势,并且像word2vec这种模型,各种工具训练的结果都差不多
2.将文本向量导出成“KeyedVectors.load_word2vec_format”可以解析的文件,看到这里感觉有点懵逼,为啥不直接使用doc2vec;稍微解释下,使用doc2vec时,还需要将文档封装成
document = TaggededDocument(word_list, tags=12)
我们文章都是有自己的id,而在doc2vec中的tags只接受数值型id,这意味着我们还需要维护文章id和tags的关系,还挺麻烦的,所以联想自己之前使用gensim训练word2vec模型的一些经验,我们只需要将文档向量转换成gensim word2vec能够识别的模型文件,我们就可以使用word2vec跑文章相似度;模型文件的格式也特别简单
第一行有两列“文档数 向量维度”两列之间用空格分隔,从第二行起,第一列是文档id,之后的就是文档向量,文档向量之间用空格分隔,文档id和文档向量之间也是用空格分隔,就像模型文件“doc.vector”
130091 10
0000l22050025dfk 0.044057309906929736 0.05764063510578126 -0.018084216536954047 -0.004106918163597584 -0.0017703562043607235 -0.06230773141141981 -0.031140189757570626 -0.04945192651357502 -0.050057798926718536 -0.014574580336920918
…………
…………
使用gensim 的word2vec模型跑相似度,完整代码
# -*- coding: UTF-8 -*-
from gensim.models import KeyedVectors
model = KeyedVectors.load_word2vec_format("/data/sparkml/doc.vector")
words = model.wv.vocab.keys()
fo = open("/data/sparkml/doc.vector.sim", "w")
for word in model.wv.vocab.keys():
kvs = model.most_similar(word, topn=10)
for kv in kvs:
fo.write(",".join([word,kv[0],str(kv[1])])+"\n")
fo.close()
结果
0000l22050025dfk,0001zrx05004g73m,0.9999655485153198
0000l22050025dfk,0001vph05004k2yg,0.9999655485153198
0000l22050025dfk,0000oo205002dh40,0.9999655485153198
0000l22050025dfk,0001kqw050013hnq,0.9999655485153198
0000l22050025dfk,0000zea05002nyst,0.9999655485153198
0000l22050025dfk,0000onp05002dh40,0.9999655485153198
0000l22050025dfk,0001kpt050013hnq,0.999965488910675
0000l22050025dfk,0001wn805004fj0w,0.9978729486465454
0000l22050025dfk,00018bq05003a2on,0.9974880218505859
0000l22050025dfk,0000sit05000wror,0.9837393164634705
…………
…………
算文章相似度的工具特别多,这种方式有点另类,如果是由于我gensim用的姿势不对的地方,大家及时指出