Writing ElasticSearch data to third-party storage (HDFS, MySQL, local files) with PySpark
First of all, today is the annual Programmers' Day, so happy Programmers' Day to every programmer!!
Environment:
Spark: 2.2.0
ElasticSearch: 5.6.1
Scala: 2.11
Reading data from ES (ElasticSearch) with pyspark requires elasticsearch-spark-20_2.11-5.6.1.jar, which can be downloaded from the Maven repository.
You can open the jar with a decompiler to find the classes we need.
First, configure the sqlContext environment:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().set("es.nodes", "ip1:port,ip2:port")
try:
    sc.stop()  # stop any SparkContext left over from a previous run
except NameError:
    pass
sc = SparkContext(conf=conf, master='local[1]', appName='ESTest')
sqlContext = SQLContext(sc)
Below is a sample of the data stored in ES; fetch whatever fields your business logic requires (the data has been modified for confidentiality):
{ "took": 475, "timed_out": false, "_shards": { "total": 246, "successful": 246, "skipped": 0, "failed": 0 }, "hits": { "total": 132893964, "max_score": 1, "hits": [ { "_index": "index_name", "_type": "type_name", "_id": "1_type_name_6fee15b0a3xxxxxf4e5a09094e92", "_score": 1, "_source": { "b_p_ii": false, "p_p_arrg": "", "p_p_brv": "69", "l_p_tp": 123456789, "b_p_im": false, "p_p_ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36", "p_p_tip": "127.0.0.1", "p_p_device": "11000-000O4xxxx707641M", "p_p_arip": "127.0.0.1", "b_p_id": false, "p_location_ip": "127.0.0.1", "b_p_iy": false, "b_p_ip": false, "td_pm": "formal", "b_p_ir": false, "b_p_ip": false, "td_pn": "注册", "b_p_iv": false, "td_pr": "[]", "p_p_ct": "", "p_p": "8DCB0E4xxxxxxxF452340910B7E", "td_dn": "通过", "d_p_la": 0, "td_dm": """{"DENY":"拒绝","PASS":"通过"}""", "i_p_ph": 720, "p_p_cr": "未分配或者内网IP", "a_p_add": "未分配或者内网IP#########################", "p_location_ip_ip": "127.0.0.1", "td_p_id": "8ae416dc65d0e8xxxxx1249f330000", "a_p_aradd": "未分配或者内网IP#########################", "p_no": "6fee15b0a3xxxxxf4e5a09094e92", "td_apires": """{"status":1,"no":"6fee15b0a3xxxxxf4e5a09094e92","decision":"PASS","riskScore":"0.0","cost":100,"strategyList":[],"factorList":[{"code":"p12","name":"一小时内IP注册次数","value":"1"},{"code":"p12Z2","name":"一日内设备注册次数","value":"1"},{"code":"p12Z3","name":"注册设备在2小时内关联注册手机号的个数","value":""},{"code":"p21Z4","name":"字符提取手机号","value":""}],"mode":"formal","suggestMode":"worst","deviceInfo":{"gpsCity":"","appVersion":"","reputation":"88","vendorId":"","ipCountry":"未分配或者内网IP","deviceId":"11000-000O4615514-2707641M","gpsStreetNo":"","browserVersion":"69","isVPN":"","gpsRegion":"","gpsAddress":"","deviceFirstTp":"2018-09-14 09:59:34","deviceType":"PC","app":"","ipIsp":"","isRootJailbreak":"false","ipLatitude":"0.0000","ip":"127.0.0.1","trueIpRegion":"","gps":"","gpsDistrict":"","isVm":"false","proxy":"false","IDFA":"","phone":"","ipRegion":"","trueIp":"127.0.0.1","trueIpCity":"","isBot":"false","screenWidth":"1280","ipCity":"","useragent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/xxxx.xxxx.xx Safari/537.36","trueIpLatitude":"0.0000","imsi":"","mac":"","trueIpCountry":"未分配或者内网IP","isDebug":"false","appSign":"","didMessage":"","osVersion":"","browser":"CHROME","ipLongitude":"0.0000","isSimulator":"false","haveCheatApp":"","trueIpIsp":"","os":"WINDOWS","ipCoordinate":"","screenHeight":"720","proxyType":"","gpsBusiness":"","trueIpCoordinate":"","url":"http://127.0.0.1:8080/thunder-admin/page/system-manager/main?sceneId=8ae416dc65d0e8xxxxx1249f330000","trueIpLongitude":"0.0000","gpsCountry":"","imei":"","location":"{}","isModify":"false","isInit":"false","gpsStreet":""},"geoipInfo":{}}""", "td_da": 10, "td_tp": "0", "no": "type_name_6fee15b0a3xxxxxf4e5a09094e92", "l_time_tp": 1234567890, "td_ptp": 123456789, "p_p_arcr": "未分配或者内网IP", "td_mp": "0", "p_p_br": "CHROME", "p_p_arct": "", "p_p_ov": "", "td_p": "zhuce", "p_p_tip3": "127.0.0", "p_p_o": "WINDOWS", "p_p_browper": "Chrome", "p_p_p": "8DCB0E4xxxxxxxF452340910B7E", "i_p_pw": 1280, "td_id": "6fee15b0a3xxxxxf4e5a09094e92", "d_p_ln": 0, "p_p_u": "http://127.0.0.1:8080/thunder-admin/page/system-manager/main?sceneId=8ae416dc65d0e8xxxxx1249f330000", "td_pum": "worst", "p_p_t": "PC", "p_did_ptatup": "1", "p_p_rg": "", "p_p_arip3": "127.0.0", "td_no": "1_type_name_6fee15b0a3xxxxxf4e5a09094e92", "td_rp": 0, "td_nr": false, "td_c": 8, "td_d": "PASS", "p_p_ipp": "", "td_b": "xxx", "d_p_arla": 0, "td_bn": "xxx公司", "i_p_dr": 88, "td_fv": """{"i_8ae416dc65d0e82e0165d124a2470065":"1","n_8ae416dc65d0e82e0165d124a2480066":"","i_8ae416dc65d0e82e0165d124a2340064":"1","n_8ae416dc65d0e82e0165d124a2490067":""}""", "d_p_arln": 0, "td_etp": 1234567890, "td_vp": "-", "td_ptn": "-1", "p_p_aripp": "" } } ] } }
The fields we mainly need are under _source, plus the pushed-down fields in factorList under td_apires.
Querying data from ES requires a query:
query = """{
  "query": {
    "bool": {
      "must": [{}]
    }
  }
}"""
Add it to the conf settings:
es_read_conf = {
    "es.nodes": "http://ip",
    "es.port": "port",
    "es.resource": "index_name/type_name",
    "es.input.json": "yes",
    "es.query": query,
}
Now get the RDD (the EsInputFormat class used below can be found in elasticsearch-spark-20_2.11-5.6.1.jar):
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass='org.elasticsearch.hadoop.mr.EsInputFormat',
    keyClass='org.apache.hadoop.io.NullWritable',
    valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable',
    conf=es_read_conf
)
The returned type is: MapPartitionsRDD[70] at mapPartitions at SerDeUtil.scala:208
Now that we have the RDD, we can create a DataFrame from it and register it as a temporary table, so we can run whatever queries our logic needs:
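The original screenshot of this step is missing; a minimal sketch might look like the following (the table name temp01 is the one used by the queries further down):

```python
# Turn the (key, value) RDD from ES into a DataFrame; the inferred
# columns are _1 (the document id) and _2 (the map of _source fields).
df = sqlContext.createDataFrame(es_rdd)
df.registerTempTable("temp01")
```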
From the type obtained above we can see that each value comes back as a map,
so simple SQL can be used to query the map's keys and values:
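For example (a sketch in place of the missing screenshot; map entries are addressed with the ['key'] syntax):

```python
# _1 holds the document id, _2 the field map; pull out one key by name.
sqlContext.sql("select _1, _2['td_dn'] as td_dn from temp01 limit 2").show()
```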
Fetching the fields directly under _source is straightforward:
sqlContext.sql(
    "select _2['td_ptp'] as td_ptp,"
    "_2['td_p_id'] as td_p_id,"
    "_2['td_dn'] as td_dn,"
    "_2['p_p_ua'] as p_p_ua,"
    "_2['l_p_tp'] as l_p_tp,"
    "_2['td_p'] as td_p,"
    "_2['p_did_ptatup'] as p_did_ptatup from temp01 limit 2"
)
The tricky part is getting the pushed-down fields under td_apires; let's look at what that field returns:
The data is as follows (returned as a string):
"""{"status":1,"no":"6fee15b0a3xxxxxf4e5a09094e92","decision":"PASS","riskScore":"0.0","cost":100,"strategyList":[],"factorList":[{"code":"p12","name":"一小时内IP注册次数","value":"1"},{"code":"p12Z2","name":"一日内设备注册次数","value":"1"},{"code":"p12Z3","name":"注册设备在2小时内关联注册手机号的个数","value":""},{"code":"p21Z4","name":"字符提取手机号","value":""}],"mode":"formal","suggestMode":"worst","deviceInfo":{"gpsCity":"","appVersion":"","reputation":"88","vendorId":"","ipCountry":"未分配或者内网IP","deviceId":"11000-000O4615514-2707641M","gpsStreetNo":"","browserVersion":"69","isVPN":"","gpsRegion":"","gpsAddress":"","deviceFirstTp":"2018-09-14 09:59:34","deviceType":"PC","app":"","ipIsp":"","isRootJailbreak":"false","ipLatitude":"0.0000","ip":"127.0.0.1","trueIpRegion":"","gps":"","gpsDistrict":"","isVm":"false","proxy":"false","IDFA":"","phone":"","ipRegion":"","trueIp":"127.0.0.1","trueIpCity":"","isBot":"false","screenWidth":"1280","ipCity":"","useragent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/xxxx.xxxx.xx Safari/537.36","trueIpLatitude":"0.0000","imsi":"","mac":"","trueIpCountry":"未分配或者内网IP","isDebug":"false","appSign":"","didMessage":"","osVersion":"","browser":"CHROME","ipLongitude":"0.0000","isSimulator":"false","haveCheatApp":"","trueIpIsp":"","os":"WINDOWS","ipCoordinate":"","screenHeight":"720","proxyType":"","gpsBusiness":"","trueIpCoordinate":"","url":"http://127.0.0.1:8080/thunder-admin/page/system-manager/main?sceneId=8ae416dc65d0e8xxxxx1249f330000","trueIpLongitude":"0.0000","gpsCountry":"","imei":"","location":"{}","isModify":"false","isInit":"false","gpsStreet":""},"geoipInfo":{}}"""
We can use the get_json_object function to extract the first level of content under td_apires:
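The call in the missing screenshot was presumably along these lines (the variable name factor_df is an assumption):

```python
# '$.factorList' selects the factorList entry one level below the root
# of the td_apires JSON string.
factor_df = sqlContext.sql(
    "select get_json_object(_2['td_apires'], '$.factorList') as factorList "
    "from temp01 limit 2"
)
```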
The returned content is as follows:
[{"code":"p12","name":"一小时内IP注册次数","value":"1"},{"code":"p12Z2","name":"一日内设备注册次数","value":"1"},{"code":"p12Z3","name":"注册设备在2小时内关联注册手机号的个数","value":""},{"code":"p21Z4","name":"字符提取手机号","value":""}]
This is still a string, so we need a custom UDF to convert the string into a list:
from pyspark.sql.types import StringType, ArrayType

def str2list(string):
    # strip the surrounding [ ] and split the array into its elements
    string1 = string.replace('[', '').replace(']', '')
    string2 = string1.split('},')
    list_word = []
    for i in string2:
        # splitting on '},' removes the closing brace from every
        # element except the last one, so add it back
        j = i + '}'
        if i == string2[-1]:
            j = i
        list_word.append(j)
    return list_word
Then register the function as a UDF:
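A sketch of the registration, assuming the return type is declared as an array of strings:

```python
# Make str2list callable from SQL; it returns a list of JSON strings.
sqlContext.registerFunction("str2list", str2list, ArrayType(StringType()))
```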
Now the function can be used inside sqlContext queries:
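For example (a sketch; explode is used here so that each factor becomes its own row, and the variable name factors is an assumption):

```python
factors = sqlContext.sql(
    "select explode(str2list(get_json_object(_2['td_apires'], '$.factorList'))) as factor "
    "from temp01"
)
```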
Check the result:
{"code":"p12","name":"一小时内IP注册次数","value":"1"}
From here, get_json_object can be used again to extract any field.
Once the data has been extracted, it can be written to third-party storage.
Write to HDFS:
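A minimal sketch, assuming factors is the DataFrame built above and the HDFS URL and path are placeholders:

```python
# Write the DataFrame to HDFS as JSON files, one part file per partition.
factors.write.mode("overwrite").json("hdfs://namenode:8020/user/output/es_data")
```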
Write to MySQL:
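A sketch using the DataFrame JDBC writer; the connection details are placeholders, and the MySQL JDBC driver jar must be on the Spark classpath:

```python
factors.write.format("jdbc").options(
    url="jdbc:mysql://host:3306/dbname?characterEncoding=utf8",
    driver="com.mysql.jdbc.Driver",
    dbtable="es_data",
    user="user",
    password="password"
).mode("append").save()
```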
Write to SQL Server:
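The SQL Server variant only changes the JDBC URL and driver class (again, connection details are placeholders, and the Microsoft JDBC driver jar is required):

```python
factors.write.format("jdbc").options(
    url="jdbc:sqlserver://host:1433;databaseName=dbname",
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    dbtable="es_data",
    user="user",
    password="password"
).mode("append").save()
```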
Or write the JSON data directly to a local file:
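A small helper for dumping JSON lines to a local file; the rows could come from something like factors.toJSON().collect() (collect only works when the result fits on the driver):

```python
def write_json_lines(rows, path):
    """Write an iterable of JSON strings to a local file, one per line."""
    with open(path, 'w') as f:
        for row in rows:
            f.write(row + '\n')

# e.g. write_json_lines(factors.toJSON().collect(), '/tmp/es_data.json')
```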