版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_33689414/article/details/85125302
Spark UDF实践之json解析
我们一般使用spark处理json字段时,通常使用schema来约束json的字段,但是json数据中也会有一些需要特殊处理的字段需要获取,那么我们就需要通过UDF来进行处理了。
下面解析一个json的数据做一个示例:
json数据源:
{"final_score":16, "risk_items":[{"item_id":3403925, "item_name":"7天内申请人在多个平台申请借款", "risk_level":"high", "group":"多平台借贷申请检测", "item_detail":{"discredit_times":null, "overdue_details":null, "platform_count":2, "court_details":null, "fraud_type":null, "platform_detail":["一般消费分期平台:1", "P2P网贷:1"], "high_risk_areas":null, "hit_list_datas":null, "frequency_detail_list":null}},{"item_id":3403927,"item_name":"1个月内申请人在多个平台申请借款","risk_level":"medium","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403929,"item_name":"3个月内申请人在多个平台申请借款","risk_level":"medium","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403931,"item_name":"6个月内申请人在多个平台申请借款","risk_level":"medium","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403935,"item_name":"18个月内申请人在多个平台申请借款","risk_level":"low","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403937,"item_name":"24个月内申请人在多个平台申请借款","risk_level":"low","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}},{"item_id":3403939,"item_name":"60个月以上申请人在多个平台申请借款","risk_level":"low","group":"多平台借贷申请检测","item_detail":{"discredit_times":null,"overdue_details":null,"platform_count":2,"court_details":null,"fraud_type":null,"platform_detail":["一般消费分期平台:1","P2P网贷:1"],"high_risk_areas":null,"hit_list_datas":null,"frequency_detail_list":null}}],"final_decision":"Accept","report_time":1495377281000,"success":true,"report_id":"ER2017052122344113605405","apply_time":1495377281000}
我们需要解析出,item_name分别为:7天内申请人在多个平台申请借款,1个月内申请人在多个平台申请借款,3个月内申请人在多个平台申请借款,6个月内申请人在多个平台申请借款,对应的platform_count的值。
下面就直接上代码了:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
if __name__ == '__main__':
# 定义UDF函数,字段的获取规则
def parse_1(risk_items, item_name):
for i in risk_items:
print(i)
print(type(i))
try:
if i.item_name == item_name:
return i.item_detail.platform_count
except:
return ""
# 解决一个python环境的bug,本地默认是python3,这里用的是python2.7版本
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python'
spark = SparkSession \
.builder \
.appName("application") \
.master("local") \
.getOrCreate()
# 注册UDF函数,sparksql函数名为td_parse1,定义的func名parse_1
spark.udf.register("td_parse1", parse_1)
# 读取json数据
df = spark.read.json("1.json")
# 创建临时表
df.createOrReplaceTempView("tmp")
# 定义sparksql
resDf = spark.sql(
"""
select
final_score as td_final_score,
td_parse1(risk_items,'7天内申请人在多个平台申请借款') as td_platform_count_7d,
td_parse1(risk_items,'1个月内申请人在多个平台申请借款') as td_platform_count_1m,
td_parse1(risk_items,'3个月内申请人在多个平台申请借款') as td_platform_count_3m,
td_parse1(risk_items,'6个月内申请人在多个平台申请借款') as td_platform_count_6m
from tmp""")
# 展示数据
resDf.show()
spark.stop()
解析出来的结果如图: