Pitfalls hit when cleaning data with Spark (differences between Spark and Python when saving CSV), and the strategy of pulling data from the pg database vs. calling the API

Copyright notice: This is an original post by the author; do not repost without permission. https://blog.csdn.net/sinat_26566137/article/details/82149369

(1) Problems when parsing the tables

import sys

from pyspark.sql.types import StructType, StringType, StructField

reload(sys)
sys.setdefaultencoding('utf8')
# Path for spark source folder
import os

# os.environ['SPARK_HOME'] = "/opt/spark-2.0.1-bin-hadoop2.7/opt/spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/")
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/lib/")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext
    from pyspark.sql import DataFrame
    from pyspark.sql import Row
    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Can not import Spark Modules", e)
    sys.exit(1)
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Row
import os
sparkpath = "spark://192.168.31.10:7077"

# Path for spark source folder
# os.environ['SPARK_HOME'] = "/opt/spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/")
# sys.path.append("/opt/spark-2.0.1-bin-hadoop2.7/python/lib/")

hdfspath = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/courtnotice.csv'
hdfspath_1 = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_litigant.txt'
hdfspath_2 = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_detail_tmp.csv'
hdfspath_3 ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/feature_extract_shixin.csv'
hdfspath_4 ='hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/saic/share_as_fr.csv'
hdfs_temp = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/update_trade_cnt_feature_data.csv'
hdfs_temp_02 = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/new_update_lgbm_model_data.csv'

hdfs_temp_03 = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/test_revoke_features_to_results.csv'
trademark_raw = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/trademark.csv'
trademark_mid = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/trademark_mid.csv'
trademark_feature ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/feature_extract_trademark.csv'
feature_extract_judgedoc ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/feature_extract_judgedoc.csv'
judgedoc_litigant_mid = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/judgedoc_litigant.csv'
judgedoc_litigant_raw = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_litigant.txt'

# Number of enforcement (zhixing) records against companies the target company invests in
network_c_inv_c_zhixing_cnt_temp ='hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/feature/network_c_inv_c_zhixing_cnt.csv'

# Shareholders that are companies
raw_path_c_inv_c_share_from_saic = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/list_company_a/c_inv_c_share.csv"
# This company has no corporate shareholders

# company ---invests---> company  /hdfs/riskModelNotDaily/raw/saic/company_inv_company.csv
company_inv_company = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/raw/saic/company_inv_company.csv"
# Companies invested in by the target company's shareholders
rst_c_inv_c_inv_c = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/network/c_inv_c_inv_c.csv"

# Companies in which the target company's shareholders are also shareholders
share_as_share_output = "hdfs://192.168.31.10:9000/hdfs/riskModelNotDaily/mid/network/share_as_share.csv"
cq_bank_1000 = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank.csv'

shixin = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/shixin.csv'
punish = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish.csv'
punish_litigants = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish_litigants.csv'
patent = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/company_patent.csv'
# trademark = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/trademark_mid.csv'
courtnotice = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/mid/courtnotice_mid.csv'
courtannouncement = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/part-r-00000-83ceb54d-a0ac-4c67-b8c9-948bb2d11aa4.csv'
courtsszc_company = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/court_sszc_crawl_company.csv'
courtsszc_detail = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/court_sszc_crawl.csv'

judgedoc_company = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc_litigant.csv'
judgedoc_detail = 'hdfs://192.168.31.10:9000/hdfs/riskModelAuto/2018-07-24/raw/judgedoc.csv'

shixin_company = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/court_shixin_company.csv'
trademark = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/trademark.csv'
# punish = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish.csv'
# punish_litigants = 'hdfs://192.168.31.10:9000/hdfs/tmp/statistic/punish_litigants.csv'

# df1 = pd.read_csv('/home/sc/Downloads/staticdata/3.01_data/update_trade_cnt_feature_data.csv')
# print(df1.groupby(df1['_c1']).size())
def test_courtsszc(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        courtsszc_company,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk3 = sqlContext.read.csv(
        courtsszc_detail, header=True)
    dfkk3.createOrReplaceTempView('y3')
    # dfkk2.show()
    dfhh1 = sqlContext.sql(
        """select y2.company_name,y3.* from y2 left join y3 on y2.sc_data_id = y3.sc_data_id
         """)
    dfhh1.show()
    dfhh1.createOrReplaceTempView('hh1')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh2 = sqlContext.sql(
        """select y4.company_name as company_name_a,hh1.* from y4 left join hh1 on y4.company_name = hh1.company_name
         """)
    dfhh2.show()
    dfhh2.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_courtsszc.csv",
                                   mode='overwrite', header=True,quoteAll = True)
    spark.stop()

def test_judgedoc(spark,sc):
    'litigant_name, doc_id,litigant_type,case_type,case_reason,publish_date,trail_date,case_result'
    judgedoc_sample_field = ['litigant_name','doc_id','litigant_type','case_type',
                             'case_reason','publish_date','trail_date','case_result']
    judgedoc_sample_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in judgedoc_sample_field])
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        judgedoc_company,header = False,sep = '\t')
    dfkk2.createOrReplaceTempView('y2')
    # print(dfkk2.columns)
    # spark.stop()
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh2 = sqlContext.sql(
        """select y4.company_name,y2._c0 as doc_id from y4 left join y2 on y4.company_name = y2._c1
         """)
    # dfhh2.show()
    dfhh2.createOrReplaceTempView('hh2')
    dfkk3 = sqlContext.read.csv(
        judgedoc_detail,quote="",schema=judgedoc_sample_schema,header=False,nanValue='nan',
    nullValue='nan',inferSchema=True)
    print(dfkk3.columns)
    print(type(dfkk3))
    # print(dfkk3[1:3])
    spark.stop()  # NOTE: debugging stop; the judgedoc_detail read above is still problematic, so the join and write below never run
    # dfkk3 = dfkk3 \
    #     .filter(lambda line: len((line["value"] or "").split(',')) == 8) \
    #     .map(lambda line: Row(**dict(zip(judgedoc_sample_field, line["value"].split(','))))) \
    #     .toDF(judgedoc_sample_schema)
    #     #  \

    dfkk3.createOrReplaceTempView('y3')
    # dfkk2.show()
    dfhh1 = sqlContext.sql(
        """select hh2.company_name,y3.* from hh2 left join y3 on hh2.doc_id = y3.doc_id
         """)
    # dfhh1.show()
    dfhh1.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_judgedoc.csv",
                                   mode='overwrite', header=True,quoteAll = True,sep = '\t')
    spark.stop()

def test_shixin(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        shixin_company,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh2 = sqlContext.sql(
        """select y4.company_name as company_name_a,y2.* from y4 left join y2 on y4.company_name = y2.company_name
         """)
    dfhh2.show()
    dfhh2.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_shixin.csv",
                                   mode='overwrite', header=True, quoteAll = True )
    spark.stop()

def test_punish(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        punish_litigants,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh3 = sqlContext.sql(
        """select company_name,y2.sc_data_id from y4 left join y2 on y4.company_name = y2.name
         """)
    dfhh3.createOrReplaceTempView('y3')
    dfkk6 = sqlContext.read.csv(
        punish, header=True)
    dfkk6.createOrReplaceTempView('y6')
    dfhh5 = sqlContext.sql(
        """select company_name,y6.* from y3 left join y6 on y3.sc_data_id = y6.sc_data_id
         """)

    dfhh5.show()
    dfhh5.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_punish.csv",
                                   mode='overwrite', header=True, quoteAll = True )
    spark.stop()

def test_trademark(spark,sc):
    sqlContext = SQLContext(sparkContext=sc)
    dfkk2 = sqlContext.read.csv(
        trademark,header = True)
    dfkk2.createOrReplaceTempView('y2')
    dfkk4 = sqlContext.read.csv(
        cq_bank_1000, header=True)
    dfkk4.createOrReplaceTempView('y4')
    dfhh3 = sqlContext.sql(
        """select y4.company_name as company_name_a,y2.* from y4 left join y2 on y4.company_name = y2.company_name
         """)
    dfhh3.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_trademark.csv",
                                   mode='overwrite', header=True, quoteAll = True )
    spark.stop()

#
if __name__ == '__main__':

    spark = SparkSession.builder.master(sparkpath).appName("SC_ETL_ccs_spark") \
        .getOrCreate()
    # spark.conf.set("spark.driver.maxResultSize", "4g")
    # spark.conf.set("spark.sql.broadcastTimeout", 1200)
    # spark.conf.set("spark.sql.crossJoin.enabled", "true")
    # spark.conf.set("spark.cores.max", "10")
    sc = spark.sparkContext
    # test_courtsszc(spark, sc)
    # test_judgedoc(spark, sc)
    # test_shixin(spark, sc)
    # test_punish(spark, sc)
    test_trademark(spark, sc)

The saved result tables came out garbled when opened as CSV; this can be fixed by setting quoteAll = True when writing.
Detailed explanation of the relevant parameters:

For the judgment documents (judgedoc) table:

dfkk3 = sqlContext.read.csv(
    judgedoc_detail, quote="", schema=judgedoc_sample_schema, header=False, nanValue='nan',
    nullValue='nan', inferSchema=True)

Reading this table is problematic, and it still fails even with these parameters set.
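A possible workaround (only a sketch, not verified against this dataset): if the detail file was written with every field quoted, reading it back with an explicit quote/escape character and multiLine=True (available since Spark 2.2) usually copes with commas and newlines embedded in the text fields:

# Sketch: re-read a fully quoted CSV whose text fields may contain commas or newlines.
# Assumes the file was written with quoteAll=True and Spark >= 2.2 (for multiLine).
dfkk3 = sqlContext.read.csv(
    judgedoc_detail,
    schema=judgedoc_sample_schema,
    header=False,
    quote='"',       # fields are wrapped in double quotes
    escape='"',      # embedded quotes are doubled ("" -> ")
    multiLine=True,  # allow newlines inside quoted fields
    nullValue='nan')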

Reference: https://blog.csdn.net/u010801439/article/details/80033341
The pandas to_csv function:

DataFrame.to_csv(path_or_buf=None, sep=',', na_rep='', float_format=None, columns=None,
header=True, index=True, index_label=None, mode='w', encoding=None, compression=None,
quoting=None, quotechar='"', line_terminator='\n', chunksize=None, tupleize_cols=None,
date_format=None, doublequote=True, escapechar=None, decimal='.')

quoting : optional constant from the csv module
Defaults to csv.QUOTE_MINIMAL. If a float_format is given, floats are converted to strings, so csv.QUOTE_NONNUMERIC treats them as non-numeric.

quotechar : string (length 1), default '"'
The character used to quote fields.
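A quick sketch of what the quoting constants do in pandas (the file names here are purely illustrative):

import csv
import pandas as pd

df = pd.DataFrame({'name': ['a,b co.', 'c ltd'], 'score': [1.5, 2.0]})
df.to_csv('minimal.csv', quoting=csv.QUOTE_MINIMAL)    # quote only fields that need it (default)
df.to_csv('all.csv', quoting=csv.QUOTE_ALL)            # quote every field, like quoteAll=True in Spark
df.to_csv('nonnum.csv', quoting=csv.QUOTE_NONNUMERIC)  # quote every non-numeric field

Note that quoting=1, which appears in the code further down, is just the numeric value of csv.QUOTE_ALL.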

(2) For places in the program that may throw an exception, wrap them in a try block, for example:

try:
    ...
except Exception, e:
    logging.error('error message: %s', e.message)

   def test_get_model_risk_result__no_mock(self):
        # -- given
        biz_svc = BizRevokeController()
        df1 = pd.read_csv('/home/sc/Desktop/cq_bank_10000/com_10000.csv')
        # df1=df1[2100:2500]
        # df1=df1[1:2]
        # print(df1[1500:1501])
        Finadata = []
        for idx, row in df1.iterrows():
            # company_name =u'云南驰宏锌锗股份有限公司'
            company_name = row['company_name']
            print(company_name)
            if unicode(company_name) != u'云南驰宏锌锗股份有限公司' and unicode(company_name) !=u'重庆光大(集团)有限公司':
                # -- when
                try:
                    risk_res = biz_svc.get_model_risk_result(company_name, True)
                except Exception, e:
                    logging.error('error message: %s', e.message)
                    continue
                if risk_res.evaluate_dto:
                    revoke_prob = risk_res.evaluate_dto.risk_prob
                    revoke_rank = risk_res.evaluate_dto.risk_rank
                else:
                    revoke_prob = None
                    revoke_rank = None
                status_message = risk_res.status_message

                Finadata.append(
                        {'company_name': company_name,'revoke_prob':revoke_prob,'revoke_rank':revoke_rank,
                         'status_message':status_message})

        fieldnames = ['company_name', 'revoke_prob','revoke_rank','status_message']
        # 'risk_score','rank', 'original_industry', 'big_hy_code', , 'sub_rank']
        # save_path, along with the csv and logging imports, is assumed to come from elsewhere in the test module
        with open(save_path, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            # writer.writeheader()
            for data in Finadata:
                writer.writerow(data)

(3) When writing the csv file:
Append mode works, but every append would write the header again, so the header can simply be omitted, e.g. by commenting out writer.writeheader():

with open(save_path, 'w') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    # writer.writeheader()
    for data in Finadata:
        writer.writerow(data)
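If the file really is opened in append mode, one sketch (save_path, fieldnames and rows are hypothetical arguments) is to write the header only when the file is new, so repeated runs do not duplicate it:

import csv
import os

def append_rows(save_path, fieldnames, rows):
    # Write the header only if the file does not exist yet or is still empty.
    write_header = not os.path.exists(save_path) or os.path.getsize(save_path) == 0
    with open(save_path, 'a') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        for row in rows:
            writer.writerow(row)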

Summary:
(1) When saving CSV, the quote settings in both Spark and Python are critical.
The table-parsing problem above comes from the delimiter being ambiguous: a long text field may itself contain commas, so it gets split apart and the columns end up misaligned and look garbled. The better approach is to wrap every field in double quotes, which is exactly what quoteAll (Spark) and quote/quoting/quotechar control.

dfhh5.repartition(1).write.csv("hdfs://192.168.31.10:9000/hdfs/tmp/statistic/cq_bank_punish.csv",
                               mode='overwrite', header=True, quoteAll=True)

The above is how to write CSV in Spark; below is how to write CSV in Python:

df.to_csv('/home/sc/Downloads/staticdata/cq_bank/new/court_notice.csv',
          sep=',', quotechar='"', index=False, columns=columns_1, quoting=1)

With these settings, opening the CSV on Ubuntu no longer shows columns spilling into each other.
Key point: call the API to fetch the fields and save them step by step.
(2) Data-cleaning strategy and processing tips:
If detailed fields are needed, consider reading the database directly or calling the API directly; since there are only a few companies here, the API is the first choice, and exporting whole tables from the database is slow. Also, when joining, put the small table on the left and the large table on the right to speed up the computation.
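Besides putting the small table on the left, Spark can also be told explicitly that the small side fits in memory through a broadcast hint; a minimal sketch (the join column here is illustrative):

from pyspark.sql.functions import broadcast

# Broadcasting the small company list turns the join with the large detail table
# into a broadcast hash join, so the big side is not shuffled.
small_df = spark.read.csv(cq_bank_1000, header=True)
big_df = spark.read.csv(courtsszc_detail, header=True)
joined = big_df.join(broadcast(small_df), on='company_name', how='inner')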

Open question: why do these tables look fine in the database but break once exported? For example, the judgment-document CSV, once exported, can no longer be parsed into the intended fields. The only workaround so far has been to save the file as txt, re-parse the txt, read it line by line and process each line; but applying the same line-by-line approach to the csv fails with odd errors, still to be resolved. Related topics: the difference between reading txt and csv, and between SparkSession's read.text,
read.csv and saveAsTextFile.
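For reference, a sketch of that line-by-line txt approach, in the spirit of the commented-out block inside test_judgedoc (judgedoc_txt_path is a hypothetical path; the comma separator and the field list follow that block):

# Sketch: read the txt export line by line and keep only rows that split into the expected fields.
judgedoc_txt_path = 'hdfs://.../judgedoc_detail.txt'  # hypothetical path
lines_df = spark.read.text(judgedoc_txt_path)         # a single column named "value"
rows = (lines_df.rdd
        .map(lambda r: (r.value or "").split(','))
        .filter(lambda parts: len(parts) == len(judgedoc_sample_field))
        .map(lambda parts: Row(**dict(zip(judgedoc_sample_field, parts)))))
parsed_df = spark.createDataFrame(rows, schema=judgedoc_sample_schema)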

# Excerpt of the API-based extraction (court_notice and df1 are defined in the commented-out full listing below)
f_e_api = FeatureExtractApiSvc()
columns_1 = ['companyName', 'scDataId', 'litigant', 'publishYear', 'docType',
             'caseReason','court', 'publishTime','role']
columns_2 = ['scDataId', 'litigant', 'publishYear', 'docType',
             'caseReason', 'court', 'publishTime','role']
for idx,row in df1.iterrows():
    data = court_notice.court_notice_company(row['company_name'])
    # print(data)
    if data:
        try:
            if data['data']:
                for index, name in enumerate(data['data']['info']):
                    a.append(data['data']['companyName'])
                    for j in columns_2:
                        a.append(name[j])
                    A.append(a)
                    a=[]
        except Exception as e:
            continue
    a = []

df = pd.DataFrame(A, columns=columns_1)
# df.t
df.to_csv('/home/sc/Downloads/staticdata/cq_bank/new/court_notice.csv',
          sep=',', quotechar='"', index=False, columns=columns_1,quoting=1)

Full code:

from __future__ import absolute_import
import sys

from api.feature_extract_api_svc import FeatureExtractApiSvc

reload(sys)
sys.setdefaultencoding('utf8')
from api.api_utils.api_helper import ApiHelper
from conf.config import GET_HY_MAP_INFO
from scpy.logger import get_logger
import pandas as pd

logging = get_logger(__file__)


class CourtAnnouncementMapApiSvc(object):
    """
    Auxiliary fields for industry mapping
    """

    @staticmethod
    def court_announcement_company(company_name):
        """
        Fetch the industry-mapping auxiliary fields for a company by its name.
        see wiki http://192.168.31.157:8200/doku.php?id=huangyu:网络图:中金全库网络图
        """
        url_court_announcement_company_format = 'http://192.168.31.121:9488/api/courtAnnouncement/info' + u'?companyName={}'
        url = url_court_announcement_company_format.format(company_name)
        return ApiHelper.get_url_dict(url)
C_a = CourtAnnouncementMapApiSvc()
cq_bank_1000 = '/home/sc/Desktop/cq_bank.csv'
df1 = pd.read_csv(cq_bank_1000)
# df1=df1[1:20]
A=[]
a=[]
columns_1 = ['companyName', 'caseReason', 'court', 'judgeTime', 'judgeYear', 'judgedocId',
             'litigant', 'litigantType', 'litigantTypeAlias', 'scDataId']
columns_2 = ['caseReason', 'court', 'judgeTime', 'judgeYear', 'judgedocId',
             'litigant', 'litigantType', 'litigantTypeAlias', 'scDataId']
for idx,row in df1.iterrows():
    data = C_a.court_announcement_company(row['company_name'])
    # print(data)
    if data:
        try:
            if data['data']:
                for index, name in enumerate(data['data']['info']):
                    a.append(str(data['data']['companyName']))
                    for j in columns_2:
                        a.append(name[j])
                    A.append(a)
                    a=[]
        except Exception as e:
            continue
    a = []

df = pd.DataFrame(A,columns=columns_1,dtype=str)
# df.t
df.to_csv('/home/sc/Downloads/staticdata/cq_bank/new/court_announcement_new.csv',
          sep=',',quotechar='"',index=False,columns=columns_1,na_rep='nan',quoting=1,
          )
print(df)


## Judicial auctions
# 'http://192.168.31.121:9488/api/courtNotice/info?companyName=%E4%B8%AD%E4%BF%A1%E9%93%B6%E8%A1%8C%E8%82%A1%E4%BB%BD%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8%E6%B7%B1%E5%9C%B3%E5%88%86%E8%A1%8C'
# http://192.168.31.121:9488/api/sszcNew/search?scDataId=29a18368-976e-11e7-8dd7-9c5c8e925e38




'''
class CourtNoticeApiSvc(object):
    """
    Auxiliary fields for industry mapping
    """

    @staticmethod
    def court_notice_company(company_name):
        """
        Fetch the industry-mapping auxiliary fields for a company by its name.
        see wiki http://192.168.31.157:8200/doku.php?id=huangyu:网络图:中金全库网络图
        """
        url_court_notice_company_format = 'http://192.168.31.121:9488/api/courtNotice/info' + u'?companyName={}'
        url = url_court_notice_company_format.format(company_name)
        return ApiHelper.get_url_dict(url)

court_notice = CourtNoticeApiSvc()
cq_bank_1000 = '/home/sc/Desktop/cq_bank.csv'
df1 = pd.read_csv(cq_bank_1000)
# A=[[u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'', u'\u4e50\u5c71\u5e02\u4e94\u901a\u6865\u533a\u4eba\u6c11\u6cd5\u9662', u'2016-06-16 15:50:00', 2016, u'', [{u'litigantName': u'\u4ee3\u5efa\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7212', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4e2d\u56fd\u5e73\u5b89\u8d22\u4ea7\u4fdd\u9669\u80a1\u4efd\u6709\u9650\u516c\u53f8\u5609\u5dde\u652f\u516c\u53f8', u'industryCode': u'68', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u675c\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4ee3\u8d85', u'industryCode': u'0000', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}], u'', u'', u'd58e534e72940b9ab03a64d0b5d535f2'], [u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'', u'\u4e50\u5c71\u5e02\u4e94\u901a\u6865\u533a\u4eba\u6c11\u6cd5\u9662', u'2016-06-16 15:50:00', 2016, u'', [{u'litigantName': u'\u4ee3\u5efa\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7212', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4e2d\u56fd\u5e73\u5b89\u8d22\u4ea7\u4fdd\u9669\u80a1\u4efd\u6709\u9650\u516c\u53f8\u5609\u5dde\u652f\u516c\u53f8', u'industryCode': u'68', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u675c\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4ee3\u8d85', u'industryCode': u'0000', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}], u'', u'', u'cf1d311ce9722fc44de44c94799b5f9e'], [u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'', u'\u4e50\u5c71\u5e02\u4e94\u901a\u6865\u533a\u4eba\u6c11\u6cd5\u9662', u'2016-06-16 14:30:00', 2016, u'', [{u'litigantName': u'\u4ee3\u5efa\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}, {u'litigantName': u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7212', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4e2d\u56fd\u5e73\u5b89\u8d22\u4ea7\u4fdd\u9669\u80a1\u4efd\u6709\u9650\u516c\u53f8\u5609\u5dde\u652f\u516c\u53f8', u'industryCode': u'68', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u675c\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}], u'', u'', u'e3eb6264454d2df361da2c40a265e1b3'], [u'\u91cd\u5e86\u534e\u5b87\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'\u5efa\u8bbe\u5de5\u7a0b\u65bd\u5de5\u5408\u540c\u7ea0\u7eb7', u'\u91cd\u5e86\u5e02\u6e1d\u5317\u533a\u4eba\u6c11\u6cd5\u9662\xa0', u'2018-03-29 00:00:00', 2018, u'', [{u'litigantName': u'\u91cd\u5e86\u534e\u59ff\u5efa\u7b51\u5b89\u88c5\u5de5\u7a0b\u6709\u9650\u516c\u53f8', u'industryCode': u'49', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, 
{u'litigantName': u'\u91cd\u5e86\u5929\u6d77\u5b89\u88c5\u5de5\u7a0b\u6709\u9650\u516c\u53f8', u'industryCode': u'4910', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}, {u'litigantName': u'\u91cd\u5e86\u534e\u5b87\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7010', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}], u'', u'', u'cfbf1ea1502fd77cd0d537d816bd988c']]

A=[]
a=[]
# df1 =df1[1:20]
f_e_api = FeatureExtractApiSvc()
columns_1 = ['companyName', 'scDataId', 'litigant', 'publishYear', 'docType',
             'caseReason','court', 'publishTime','role']
columns_2 = ['scDataId', 'litigant', 'publishYear', 'docType',
             'caseReason', 'court', 'publishTime','role']
for idx,row in df1.iterrows():
    data = court_notice.court_notice_company(row['company_name'])
    # print(data)
    if data:
        try:
            if data['data']:
                for index, name in enumerate(data['data']['info']):
                    a.append(data['data']['companyName'])
                    for j in columns_2:
                        a.append(name[j])
                    A.append(a)
                    a=[]
        except Exception as e:
            continue
    a = []

df = pd.DataFrame(A, columns=columns_1)
# df.t
df.to_csv('/home/sc/Downloads/staticdata/cq_bank/new/court_notice.csv',
          sep=',', quotechar='"', index=False, columns=columns_1,quoting=1)

'''

### Judgment documents (judgedoc)

'''
class JudgedocApiSvc(object):
    """
    Auxiliary fields for industry mapping
    """

    @staticmethod
    def judgedoc_company(company_name):
        """
        Fetch the industry-mapping auxiliary fields for a company by its name.
        see wiki http://192.168.31.157:8200/doku.php?id=huangyu:网络图:中金全库网络图
        """
        url_judgedoc_company_format = 'http://192.168.31.121:9488/api/judgedoc/info' + u'?companyName={}'
        url = url_judgedoc_company_format.format(company_name)
        return ApiHelper.get_url_dict(url)

judgedoc = JudgedocApiSvc()
cq_bank_1000 = '/home/sc/Desktop/cq_bank.csv'
df1 = pd.read_csv(cq_bank_1000)
# A=[[u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'', u'\u4e50\u5c71\u5e02\u4e94\u901a\u6865\u533a\u4eba\u6c11\u6cd5\u9662', u'2016-06-16 15:50:00', 2016, u'', [{u'litigantName': u'\u4ee3\u5efa\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7212', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4e2d\u56fd\u5e73\u5b89\u8d22\u4ea7\u4fdd\u9669\u80a1\u4efd\u6709\u9650\u516c\u53f8\u5609\u5dde\u652f\u516c\u53f8', u'industryCode': u'68', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u675c\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4ee3\u8d85', u'industryCode': u'0000', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}], u'', u'', u'd58e534e72940b9ab03a64d0b5d535f2'], [u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'', u'\u4e50\u5c71\u5e02\u4e94\u901a\u6865\u533a\u4eba\u6c11\u6cd5\u9662', u'2016-06-16 15:50:00', 2016, u'', [{u'litigantName': u'\u4ee3\u5efa\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7212', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4e2d\u56fd\u5e73\u5b89\u8d22\u4ea7\u4fdd\u9669\u80a1\u4efd\u6709\u9650\u516c\u53f8\u5609\u5dde\u652f\u516c\u53f8', u'industryCode': u'68', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u675c\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4ee3\u8d85', u'industryCode': u'0000', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}], u'', u'', u'cf1d311ce9722fc44de44c94799b5f9e'], [u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'', u'\u4e50\u5c71\u5e02\u4e94\u901a\u6865\u533a\u4eba\u6c11\u6cd5\u9662', u'2016-06-16 14:30:00', 2016, u'', [{u'litigantName': u'\u4ee3\u5efa\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}, {u'litigantName': u'\u56db\u5ddd\u548c\u90a6\u6295\u8d44\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7212', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u4e2d\u56fd\u5e73\u5b89\u8d22\u4ea7\u4fdd\u9669\u80a1\u4efd\u6709\u9650\u516c\u53f8\u5609\u5dde\u652f\u516c\u53f8', u'industryCode': u'68', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, {u'litigantName': u'\u675c\u6ce2', u'industryCode': u'0000', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}], u'', u'', u'e3eb6264454d2df361da2c40a265e1b3'], [u'\u91cd\u5e86\u534e\u5b87\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'\u5efa\u8bbe\u5de5\u7a0b\u65bd\u5de5\u5408\u540c\u7ea0\u7eb7', u'\u91cd\u5e86\u5e02\u6e1d\u5317\u533a\u4eba\u6c11\u6cd5\u9662\xa0', u'2018-03-29 00:00:00', 2018, u'', [{u'litigantName': u'\u91cd\u5e86\u534e\u59ff\u5efa\u7b51\u5b89\u88c5\u5de5\u7a0b\u6709\u9650\u516c\u53f8', u'industryCode': u'49', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}, 
{u'litigantName': u'\u91cd\u5e86\u5929\u6d77\u5b89\u88c5\u5de5\u7a0b\u6709\u9650\u516c\u53f8', u'industryCode': u'4910', u'litigantType': u'\u539f\u544a', u'litigantTypeAlias': u'\u539f\u544a'}, {u'litigantName': u'\u91cd\u5e86\u534e\u5b87\u96c6\u56e2\u6709\u9650\u516c\u53f8', u'industryCode': u'7010', u'litigantType': u'\u88ab\u544a', u'litigantTypeAlias': u'\u88ab\u544a'}], u'', u'', u'cfbf1ea1502fd77cd0d537d816bd988c']]

A=[]
a=[]
# df1 =df1[1:2]
f_e_api = FeatureExtractApiSvc()
columns_1 = ['companyName', 'docId', 'caseType', 'caseReason', 'court',
             'judgeProcess','legalSource', 'title','caseCode','role','url',
             'litigant','publishDate','trailDate','caseResult','updateAt']
columns_2 = ['docId', 'caseType', 'caseReason', 'court',
             'judgeProcess','legalSource', 'title','caseCode','role','url',
             'litigant','publishDate','trailDate','caseResult','updateAt']
for idx,row in df1.iterrows():
    data = judgedoc.judgedoc_company(row['company_name'])
    # print(data)
    if data:
        try:
            if data['data']:
                for index, name in enumerate(data['data']['data']):
                    a.append(data['data']['companyName'])
                    for j in columns_2:
                        a.append(name[0][j])
                    A.append(a)
                    a=[]
        except Exception as e:
            continue
    a = []

df = pd.DataFrame(A, columns=columns_1)
# df.t
df.to_csv('/home/sc/Downloads/staticdata/cq_bank/new/judgedoc.csv',
          sep=',', quotechar='"', index=False, columns=columns_1,quoting=1)

'''
