本地小说:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from multiprocessing import Pool
class XS(object):
    """Dump a locally saved novel to a text file using headless Firefox.

    The constructor opens the saved index page and clicks the link found at a
    hard-coded XPath (presumably the first chapter -- confirm against the
    saved pages).  get_list() then follows the "next" link page after page,
    appending every title and body to ``wanmeishijie.txt``.
    """

    # Absolute XPaths into the saved chapter pages.
    _TITLE_XPATH = '/html/body/div[6]/h1'
    _CONTENT_XPATH = "/html/body/div[6]/div[2]"
    _NEXT_LINK_XPATH = "/html/body/div[6]/div[2]/a[1]"

    def __init__(self):
        """Start headless Firefox, load the index page and open the first link."""
        self.options = webdriver.FirefoxOptions()
        self.options.headless = True
        self.driver = webdriver.Firefox(firefox_options=self.options)
        self.driver.get("file:///C:/Users/Administrator/Desktop/wanmeishijie/wanmeishijiexiaoshuo/index.htm")
        self.driver.find_element_by_xpath('/html/body/div[8]/div/ul/li[1]/span/a').click()

    def get_list(self):
        """Save the current page and every page reachable through "next".

        For each page the title, the body text and the next link's ``tppabs``
        attribute are printed, and the title and body are appended to
        ``wanmeishijie.txt`` (UTF-8) followed by blank lines.  The walk ends
        when the page has no element at the next-link XPath.

        This is a loop rather than a self-call per page: recursing once per
        chapter never returned, so a long book would exceed Python's recursion
        limit, and the final page ended in an exception instead of a clean
        stop.
        """
        while True:
            # Read each value once; every ``.text`` access asks the browser again.
            title_text = self.driver.find_element_by_xpath(self._TITLE_XPATH).text
            content_text = self.driver.find_element_by_xpath(self._CONTENT_XPATH).text
            # The plural finder returns an empty list instead of raising when
            # the link is absent, which is how the end of the book is detected.
            next_links = self.driver.find_elements_by_xpath(self._NEXT_LINK_XPATH)
            # NOTE(review): "tppabs" is presumably the original URL recorded by
            # the tool that mirrored the site -- confirm.
            next_href = next_links[0].get_attribute("tppabs") if next_links else None

            print(title_text)
            print(content_text)
            print(next_href)

            # Re-open per chapter so finished chapters are on disk even if a
            # later page fails; ``with`` closes the file on errors too.
            with open('wanmeishijie.txt', 'a', encoding='utf-8') as f:
                f.write(title_text)
                f.write('\n')
                f.write(content_text)
                f.write('\n\n\n')

            if not next_links:
                break
            next_links[0].click()
if __name__ == '__main__':
    # Dump the book, then always shut the headless browser down; without the
    # quit() the Firefox process stays alive after the script ends or fails.
    a = XS()
    try:
        a.get_list()
    finally:
        a.driver.quit()
淘宝(笔记本电脑):
import time
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from lxml.html import etree
# When tasks are added with apply_async() / map() and the task function lives
# in a class, make sure it is not an instance method.
class TaoBao(object):
    """Search Taobao for laptops and fetch product detail pages via a pool.

    The list page is loaded and scrolled in the calling process; every product
    link found on it is submitted to ``pool`` as a get_detail_page task whose
    result is passed to parse_detail_page.
    """

    # The browser is created while the class body executes, i.e. as soon as
    # this module is imported.
    # NOTE(review): depending on the multiprocessing start method, worker
    # processes either re-create or inherit this driver -- confirm that
    # ``cls.driver`` inside a worker is usable on the target platform.
    options = webdriver.FirefoxOptions()
    options.headless = True
    driver = webdriver.Firefox(firefox_options=options)

    def __init__(self, pool):
        """Remember the pool that tasks are submitted to and the start URL."""
        self.pool = pool
        self.start_url = 'https://www.taobao.com/'

    def start(self):
        """Run the search, scroll through the result page and parse it."""
        self.driver.get(self.start_url)
        search_input = WebDriverWait(self.driver, 15).until(
            lambda driver: driver.find_element_by_id('q'))
        search_input.send_keys('笔记本电脑')
        self.driver.find_element_by_class_name('btn-search').click()
        # Once on the list page, move the scroll bar down the page with
        # selenium.  x takes 1, 3, 5, 7, 9, so the page is scrolled to 10%,
        # 30%, ... 90% of its full height with a short pause at each step.
        for x in range(1, 11, 2):
            height = float(x) / 10
            # document.documentElement.scrollTop: offset of the page relative
            # to the top of the window.
            # document.documentElement.scrollHeight: height of the whole page,
            # including the scrollable part.
            js = "document.documentElement.scrollTop = document.documentElement.scrollHeight * %f" % height
            self.driver.execute_script(js)
            time.sleep(0.2)
        html = self.driver.page_source
        self.parse_list_page(html)

    @classmethod
    def get_list_page(cls, html):
        """Placeholder for fetching the source of the next list page.

        Despite its name, ``html`` receives the next page's URL (see the
        apply_async call in parse_list_page).  Nothing is fetched yet: the
        method only prints a message and returns None.
        """
        print('获取下一页源码的方法')

    def parse_list_page(self, html):
        """Queue the next list page and a detail task for each product link.

        Args:
            html: Page source of a list page.  None or an empty string is
                ignored.
        """
        # This method is also the callback of the get_list_page task, which
        # currently returns None; parsing None would raise inside the callback.
        if not html:
            return
        list_html = etree.HTML(html, parser=etree.HTMLParser(encoding='utf-8'))
        if list_html is None:
            return
        # Find the link to the next page in list_html and add the request for
        # it to the pool as well.
        # TODO: the value below is a placeholder, not a real URL.
        next_url = 'list_html中提取'
        self.pool.apply_async(self.get_list_page, args=(next_url,),
                              callback=self.parse_list_page,
                              error_callback=self._report_error)
        for div in list_html.cssselect('.info-cont'):
            # Skip entries without a usable product link instead of letting an
            # IndexError/KeyError abort the remaining items.
            links = div.cssselect('.product-title')
            href = links[0].attrib.get('href') if links else None
            if not href:
                continue
            detail_url = 'https:' + href
            self.pool.apply_async(self.get_detail_page, args=(detail_url,),
                                  callback=self.parse_detail_page,
                                  error_callback=self._report_error)

    @classmethod
    def get_detail_page(cls, detail_url):
        """Load a product page and return its title and price.

        Returns:
            dict with the keys 'title' and 'content' (the price text).

        Raises:
            IndexError: if the title or price element is not in the page.
        """
        cls.driver.get(detail_url)
        # Fixed pause before reading the page source.
        time.sleep(1)
        detail_html = etree.HTML(cls.driver.page_source, parser=etree.HTMLParser(encoding='utf-8'))
        title = detail_html.cssselect('.panel-head > .spu-title')[0].text
        price = detail_html.cssselect('.price > strong')[0].text
        return {
            'title': title,
            'content': price
        }

    @classmethod
    def parse_detail_page(cls, data):
        """Callback for get_detail_page: print the scraped record."""
        print(data)

    @classmethod
    def _report_error(cls, exc):
        """Error callback: print an exception raised by a pool task.

        Without an error callback a failing task is dropped silently.
        """
        print('task failed:', repr(exc))
if __name__ == '__main__':
    pool = Pool()
    try:
        taobao = TaoBao(pool)
        taobao.start()
    finally:
        # Clean up even when start() fails: stop accepting tasks, wait for the
        # ones already queued, then close the browser that was opened when the
        # TaoBao class was defined.
        try:
            pool.close()
            pool.join()
        finally:
            TaoBao.driver.quit()
进程池:
class MyTest(object):
    """Minimal demo of submitting class methods to a pool.

    two() is the task body and one() is passed as its callback; three()
    submits nine such tasks to the pool given to the constructor.
    """

    def __init__(self, pool):
        # Pool-like object that three() submits its tasks to.
        self.pool = pool

    @classmethod
    def one(cls, result):
        """Callback: print the value produced by a finished task."""
        print('--', result)

    @classmethod
    def two(cls, x):
        """Task body: sleep three seconds, print x and return x + 100."""
        time.sleep(3)
        print('====', x)
        total = x + 100
        return total

    def three(self):
        """Submit two() for each of 1..9 with one() as the callback."""
        # Add the asynchronous tasks to the pool.
        submit = self.pool.apply_async
        for number in range(1, 10):
            submit(self.two, args=(number,), callback=self.one)
if __name__ == '__main__':
    # Run the demo on a pool of four worker processes.
    workers = Pool(processes=4)
    demo = MyTest(workers)
    demo.three()
    # Close the pool: no further tasks are added to it.
    workers.close()
    # Wait for the child processes to finish before the main process goes on.
    workers.join()
    print('程序执行结束了')