人人贷散标爬虫实例进阶-使用异步io

 写在前面:

1.该代码写于2020年4月5日。经查看,人人贷网站前端已发生变化,所以该代码无法直接运行。本文代码是对旧版本(《人人贷散标爬虫实例》,小zhan柯基,CSDN博客)的改进。

2.由于爬取数据量较大,达到几十万条,因此需要考虑健壮性与爬取速度。对于爬取速度,由于requests库采用阻塞式访问,每分钟只能爬取约200条贷款记录;而使用异步IO库后,爬取速度可以提升约6~7倍,每分钟可爬取1200条左右的贷款记录。

3.虽代码无法运行,但使用aiohttp的过程仍有一定借鉴意义。

4.参考资料:异步IO - 廖雪峰的官方网站


#多进程模块
#多进程也可以用进程池Pool来写
from multiprocessing import Process, Queue
import time

#selenium模拟登录更新cookie
from selenium import webdriver
from selenium.webdriver.common.keys import Keys

#爬虫模块
import requests
from bs4 import BeautifulSoup
import re
import json
import csv

#异步io
import asyncio
import aiohttp
try:
    from aiohttp import ClientError
except:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError


# Captures the text that sits between "var info = " and "var detail = " in the
# page's inline script. The parser functions in this file take the first match,
# un-escape it and feed it to json.loads().
# Earlier variant, kept for reference, which ended the capture at a Chinese
# phrase instead of at "var detail = ":
# pattern = re.compile(r'.*var info = (.*?)截至借款前5年内借款人.*')
pattern = re.compile(r'.*var info = (.*?)var detail = .*')

def timestamp_datetime(value):
    """Format a Unix timestamp as a local-time ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        value: Seconds since the epoch (int or float), as accepted by
            ``time.localtime``.

    Returns:
        The formatted local date-time string.
    """
    local_struct = time.localtime(value)
    return time.strftime('%Y-%m-%d %H:%M:%S', local_struct)


def async_get_new_cookie():
    """Log in to renrendai.com with headless Chrome and return its cookies.

    Drives the password-login form through Selenium, waits a fixed 15 seconds
    for the site to set its cookies, then reads them from the browser.
    CAPTCHA challenges are not handled.

    Returns:
        dict: cookie name -> cookie value, suitable for the ``cookies=``
        argument of an aiohttp request.

    Raises:
        Any Selenium exception raised while driving the page (for example when
        an element is missing). The browser is shut down in every case.
    """
    # Local import so this function does not depend on the file-level imports.
    from selenium.webdriver.common.by import By

    print("******************正在登录并更新cookie******************")
    # Run Chrome without a visible window. ``add_argument("--headless")`` is
    # used because ``set_headless()`` no longer exists in Selenium 4.
    opt = webdriver.ChromeOptions()
    opt.add_argument("--headless")
    driver = webdriver.Chrome(options=opt)
    try:
        driver.maximize_window()
        driver.get('https://www.renrendai.com/login')
        # Switch to the password-login tab (no CAPTCHA handling).
        driver.find_element(By.XPATH, '//span[@class="tab-password"]').click()
        print("******************输入手机号中******************")
        # NOTE(review): credentials are hard-coded (masked here); consider
        # loading them from the environment instead.
        driver.find_element(By.XPATH, '//input[@placeholder="手机号/邮箱"]').send_keys('188****9029', Keys.TAB)  # account name
        print("******************输入密码中******************")
        # Mark the "remember me" control as chosen by setting its CSS class.
        span_click = driver.find_element(By.XPATH, '//span[@id="rememberme-login"]')
        driver.execute_script("arguments[0].setAttribute(arguments[1], arguments[2])", span_click, "class", 'is-choose')
        driver.find_element(By.XPATH, '//input[@name="j_password"]').send_keys('zzz*****!!', Keys.ENTER)  # password, then submit

        time.sleep(15)  # give the site time to finish setting cookies
        cookies_get = driver.get_cookies()
    finally:
        # Always release the browser process, even when the login flow failed.
        driver.quit()

    cookies = {item["name"]: item["value"] for item in cookies_get}
    print("******************登录完毕******************")
    return cookies



async def asyncGetSingleHtmlText(url_borrower,cookies,q):
    """Download one loan page and put its HTML text on the queue.

    The request is attempted at most twice. If both attempts fail, the last
    error is printed and nothing is queued for this URL (best effort).

    Args:
        url_borrower: URL of the loan page to fetch.
        cookies: Mapping of cookie name -> value sent with the request.
        q: Queue that receives the response body as a ``str``.
    """
    my_header = {
        # The original value started with a stray "User-Agent:" prefix, which
        # made the header that was actually sent malformed.
        "User-Agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.11 (KHTML, like Gecko)Ubuntu/11.10 Chromium/27.0.1453.93 Chrome/27.0.1453.93 Safari/537.36",
    }
    max_attempts = 2
    # ssl=False disables certificate verification (replaces the deprecated
    # verify_ssl=False); ClientTimeout replaces the deprecated bare number.
    conn = aiohttp.TCPConnector(ssl=False)
    request_timeout = aiohttp.ClientTimeout(total=100)
    async with aiohttp.ClientSession(connector=conn) as session:
        last_error = None
        for _ in range(max_attempts):
            try:
                # Redirects are not followed; the raw response body is queued as-is.
                async with session.get(url_borrower, cookies=cookies, timeout=request_timeout, headers=my_header, allow_redirects=False) as result:
                    result_text = await result.text()
                    q.put(result_text)
                return
            except Exception as e:
                # Deliberately broad: any network, timeout or decoding failure
                # just triggers the next attempt.
                last_error = e
        print("Exception in asyncGetSingleHtmlText()", last_error.args)

async def _fetch_batch(q, urls, cookies):
    """Fetch every URL in *urls* concurrently, queuing each page on *q*."""
    await asyncio.gather(
        *(asyncGetSingleHtmlText(url_borrower, cookies, q) for url_borrower in urls)
    )


def asyncgetHtmlText(q,url_list_borrower):
    """Crawl all URLs in batches, logging in again before every batch.

    Each batch of up to 500 URLs is fetched concurrently on a private event
    loop. Any exception stops the crawl: it is printed and the remaining
    batches are skipped.

    Args:
        q: Queue that receives the HTML text of every fetched page.
        url_list_borrower: Sequence of loan-page URLs to fetch.
    """
    BATCH_TEST_SIZE = 500
    count = len(url_list_borrower)
    # A dedicated loop avoids relying on the implicit "current" loop, and the
    # coroutines are gathered inside a running loop because asyncio.wait() no
    # longer accepts bare coroutines on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        for start in range(0, count, BATCH_TEST_SIZE):
            stop = min(start + BATCH_TEST_SIZE, count)
            # Fresh login cookies for every batch.
            cookies = async_get_new_cookie()
            loop.run_until_complete(_fetch_batch(q, url_list_borrower[start:stop], cookies))
    except Exception as e:
        print('Exception in asyncgetHtmlText', e.args)
    finally:
        loop.close()


# Code run by the reader (consumer) process:
def asyncParseAndSave(q):
    """Consume HTML pages from the queue forever and append rows to Mydata.csv.

    For every page, the inline script that follows the layout ``<script>`` tag
    is located, the JSON assigned to ``var info`` is extracted with the
    module-level ``pattern`` and decoded, and one CSV row is appended. Pages
    whose borrower record has no ``gender`` key are skipped. A page that fails
    to parse is reported and skipped; the loop never returns by itself.

    Args:
        q: Queue yielding one HTML document (``str``) per ``get()``.
    """
    borrower_keys = (
        "gender", "homeTown", "birthDay", "graduation", "office",
        "officeDomain", "officeScale", "position", "salary", "carLoan",
        "hasCar", "marriage", "houseLoan", "hasHouse", "creditLevel",
    )
    while True:
        html_text = q.get(True)
        # Reset per page so the error message can never show the id of a
        # previously parsed page, or fail because nothing was parsed yet.
        loan_id = None
        try:
            bs = BeautifulSoup(html_text, "html.parser")
            info = str(bs.find("script", {"src": "/ps/static/common/page/layout_c0258d7.js"}).next_sibling.string).replace("\n","")
            # Undo the escaping used inside the page's script (\u0022 -> ",
            # \u002D -> -, \u005C -> \) and drop single quotes and semicolons
            # so that what remains can be passed to json.loads().
            infoProcess = pattern.findall(info)[0].replace('\\u0022', '"').replace("\\u002D","-").replace("'","").replace("\\u005C","\\").replace(";","")
            info_dict = json.loads(infoProcess)

            loan = info_dict["loan"]
            loan_id = loan["loanId"]
            borrower = info_dict["borrower"]
            if "gender" not in borrower:
                print("gender not in borrower'key", loan_id)
                continue
            record = info_dict["userLoanRecord"]

            row = [
                loan_id,
                # openTime is divided by 1000 before formatting (presumably
                # it is in milliseconds -- confirm against the site data).
                timestamp_datetime(int(loan["openTime"])/1000),
                loan["months"], loan["leftMonths"], loan["amount"],
                record["notPayInterest"],
                info_dict["productRepayType"], loan["borrowType"], loan["interest"],
            ]
            row.extend(borrower[key] for key in borrower_keys)
            row.extend([
                loan["overDued"], record["totalCount"], record["successCount"],
                record["alreadyPayCount"], record["failedCount"], loan["description"],
            ])

            # newline="" is what the csv module requires of its file objects;
            # without it extra blank rows appear on Windows.
            with open("Mydata.csv","a", newline="") as csvfile:
                csv.writer(csvfile).writerow(row)
            print("id:{} has done".format(loan_id))

        except Exception as e:
            # Best effort: report the page and move on to the next one.
            print("Exception in parser:", loan_id, repr(e))
            continue

# Refresh the login cookies periodically
def get_new_cookie(session):
    """Log in to renrendai.com with headless Chrome and load the cookies into *session*.

    Drives the password-login form through Selenium, waits a fixed 15 seconds
    for the site to set its cookies, then copies them into the session's
    cookie jar. CAPTCHA challenges are not handled.

    Args:
        session: A ``requests.Session`` whose cookie jar is updated in place.

    Raises:
        Any Selenium exception raised while driving the page. The browser is
        shut down in every case.
    """
    # Local import so this function does not depend on the file-level imports.
    from selenium.webdriver.common.by import By

    print("******************正在登录并更新cookie******************")
    # Run Chrome without a visible window. ``add_argument("--headless")`` is
    # used because ``set_headless()`` no longer exists in Selenium 4.
    opt = webdriver.ChromeOptions()
    opt.add_argument("--headless")
    driver = webdriver.Chrome(options=opt)
    try:
        driver.maximize_window()
        driver.get('https://www.renrendai.com/login')
        # Switch to the password-login tab (no CAPTCHA handling).
        driver.find_element(By.XPATH, '//span[@class="tab-password"]').click()
        print("******************输入手机号中******************")
        driver.find_element(By.XPATH, '//input[@placeholder="手机号/邮箱"]').send_keys('yourAccountName', Keys.TAB)  # account name
        print("******************输入密码中******************")
        driver.find_element(By.XPATH, '//input[@name="j_password"]').send_keys('yourPwd', Keys.ENTER)  # password, then submit
        time.sleep(15)  # give the site time to finish setting cookies
        cookies = driver.get_cookies()
    finally:
        # Always release the browser process, even when the login flow failed.
        driver.quit()

    c = requests.cookies.RequestsCookieJar()
    for item in cookies:
        c.set(item["name"], item["value"])
    session.cookies.update(c)  # load the fresh cookies into the session
    print("******************登录完毕******************")

# Code run by the writer (producer) process:
def getHtmlText(q,url_list):
    """Fetch every URL with a logged-in ``requests`` session and queue the pages.

    Pages are accumulated and put on the queue as lists. A list is flushed
    whenever the 1-based position of a successfully fetched URL is a multiple
    of 10, and whatever is left over is flushed once at the end. The session
    logs in again after every 250th URL and after more than 20 failed
    requests.

    Args:
        q: Queue that receives lists of HTML strings.
        url_list: Iterable of page URLs to download.
    """
    htmlTextList = []
    session = requests.Session()
    get_new_cookie(session)
    exception_count = 0
    my_header = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36",
    }
    for index,url in enumerate(url_list):
        try:
            res = session.get(url,timeout=10,headers=my_header)
            res.raise_for_status()
            res.encoding = res.apparent_encoding
            htmlTextList.append(res.text)
            print("request:"+str(index))
            if (index+1)%250 == 0:
                # Dump one page before refreshing the login (presumably a
                # manual check that the session is still valid).
                print(res.text)
                get_new_cookie(session)
            if (index+1)%10 == 0:
                q.put(htmlTextList)
                htmlTextList = []
        except Exception as e:
            print("Exception in request:", index, repr(e))
            exception_count += 1
            # Many failures in a row-count: presumably the login expired or
            # the site is throttling -- pause, then log in again.
            if exception_count > 20:
                print("exception_count>20")
                time.sleep(60)
                get_new_cookie(session)
                exception_count = 0
    # Flush the final partial batch; previously these pages were dropped.
    if htmlTextList:
        q.put(htmlTextList)
# Code run by the reader (consumer) process:
def parseAndSave(q):
    """Consume lists of HTML pages from the queue forever and append rows to all.csv.

    For every page, the inline script that follows the layout ``<script>`` tag
    is located, the JSON assigned to ``var info`` is extracted with the
    module-level ``pattern`` and decoded, and one CSV row is appended. Pages
    whose borrower record has no ``gender`` key are skipped. A page that fails
    to parse is reported and skipped; the loop never returns by itself.

    Args:
        q: Queue yielding a list of HTML documents (``str``) per ``get()``.
    """
    borrower_keys = (
        "gender", "homeTown", "birthDay", "graduation", "office",
        "officeDomain", "officeScale", "position", "salary", "carLoan",
        "hasCar", "marriage", "houseLoan", "hasHouse", "creditLevel",
    )
    while True:
        html_text_list = q.get(True)
        for index,html_text in enumerate(html_text_list):
            # Reset per page so the error message can never show the id of a
            # previously parsed page, or fail because nothing was parsed yet.
            loan_id = None
            try:
                bs = BeautifulSoup(html_text, "html.parser")
                info = str(bs.find("script", {"src": "/ps/static/common/page/layout_c0258d7.js"}).next_sibling.string).replace("\n","")
                # Undo the escaping used inside the page's script (\u0022 -> ",
                # \u002D -> -, \u005C -> \) and drop single quotes and
                # semicolons so that what remains can be passed to json.loads().
                infoProcess = pattern.findall(info)[0].replace('\\u0022', '"').replace("\\u002D","-").replace("'","").replace("\\u005C","\\").replace(";","")
                info_dict = json.loads(infoProcess)

                loan = info_dict["loan"]
                loan_id = loan["loanId"]
                borrower = info_dict["borrower"]
                if "gender" not in borrower:
                    print("gender not in borrower'key,index:",index)
                    continue
                record = info_dict["userLoanRecord"]

                row = [
                    loan_id, loan["months"], loan["leftMonths"], loan["amount"],
                    record["notPayInterest"],
                    info_dict["productRepayType"], loan["borrowType"], loan["interest"],
                ]
                row.extend(borrower[key] for key in borrower_keys)
                row.extend([
                    loan["overDued"], record["totalCount"], record["overdueCount"],
                    loan["description"],
                ])

                # newline="" is what the csv module requires of its file
                # objects; without it extra blank rows appear on Windows.
                with open("all.csv","a", newline="") as csvfile:
                    csv.writer(csvfile).writerow(row)
                print("id:{} has done".format(loan_id))

            except Exception as e:
                # Best effort: report the page and move on to the next one.
                print("Exception in parser:", loan_id, "index:", index, repr(e))
                continue


if __name__=='__main__':
    print("******************begining******************")

    # 1. Preparation: build the loan-page URLs, split into four consecutive
    #    ranges of 25,000 loan ids that start at id n.
    init_url_borrower = "https://www.renrendai.com/loan-{}.html"
    n = 700 * 10000  # earlier runs used 600, 610, ... 660 (x 10000)
    urls_per_writer = 25000
    url_lists = [
        [init_url_borrower.format(n + offset + i) for i in range(urls_per_writer)]
        for offset in (0, 25000, 50000, 75000)
    ]

    # 2. Set up the processes.
    # 2.1 The parent creates the Queue and hands it to every child process.
    q = Queue()
    writers = [Process(target=asyncgetHtmlText, args=(q, urls)) for urls in url_lists]
    pr = Process(target=asyncParseAndSave, args=(q,))

    # 2.2 Start the children. Only the first writer is enabled, as before;
    #     widen the slice (e.g. writers[:4]) to crawl more ranges in parallel.
    active_writers = writers[:1]
    for pw in active_writers:
        pw.start()
    pr.start()

    # 2.3 Wait for the writers that were actually started. Joining a Process
    #     that was never started raises, which is what the old unconditional
    #     pw2/pw3/pw4 joins did.
    for pw in active_writers:
        pw.join()

    # The reader loops forever, so it has to be stopped by hand once it has
    # finished draining the queue.
    print("******************everything is ok,please terminate ******************")

猜你喜欢

转载自blog.csdn.net/zsllsz2022/article/details/121034623