多线程工作原理
多线程示意图
Queue(队列对象)
-
定义
queue是python中的标准库,可以直接from queue import Queue引用;队列是线程间最常用的交换数据的形式
-
python下多线程的思考
对于资源,加锁是个重要的环节。Queue,是线程安全的,因此在满足使用条件下,建议使用队列
-
创建一个“队列”对象
pageQueue = Queue(10)
-
将一个值放入队列中
for page in range(1, 11): pageQueue.put(page)
-
将一个值从队列中取出
pageQueue.get()
Queue队列的常用方法
- put()
- get(block)
- empty()
- full()
- qsize()
Queue用法示例
from queue import Queue
import threading
import time
# Shared producer state for the demo below.
num = 5                # id of the next task to hand out (5..9)
exit_flag = False      # set True once all tasks have been assigned

def assign_task(q: Queue):
    """Put tasks 5..9 on *q*, then set exit_flag so consumers can stop.

    The pasted original had lost all indentation (a SyntaxError); the
    logic is unchanged: produce one task every 2 seconds until num
    reaches 10, then flip the exit flag and return.
    """
    global num
    global exit_flag
    while num != 10:  # the "infinite" producer loop must terminate
        q.put('任务%d' % (num))
        print('+++++++++新的任务%d分配了+++++++++++' % (num))
        num += 1
        time.sleep(2)
    exit_flag = True
if __name__ == '__main__':
    from queue import Empty  # local import: keeps the file-level imports untouched
    q = Queue(100)
    # Pre-load five tasks before the producer thread starts.
    for i in range(5):
        q.put(item='task%d' % (i))
    # Start the producer thread that assigns new tasks.
    t = threading.Thread(target=assign_task, args=(q,))
    t.start()
    while True:
        if exit_flag:
            print('任务完成下班回家!')
            break
        try:
            # Non-blocking get: raises queue.Empty when nothing is available.
            print('-------------------', q.get(block=False))
        except Empty:
            # fixed: was a bare `except:` which also swallowed KeyboardInterrupt.
            # Sleep briefly so the poll loop does not spin at 100% CPU while the
            # producer is sleeping between tasks.
            time.sleep(0.05)
队列锁和线程锁
主要用来给方法、代码块加锁。当某个方法或者代码块使用锁时,在同一时刻至多只有一个线程在执行该段代码。当有多个线程访问同一对象的加锁方法/代码块时,同一时间只有一个线程在执行,其余线程必须等待当前线程执行完之后才能执行该代码段。但是,其余线程可以访问该对象中的非加锁代码块。
import threading
from queue import Queue
import time
# Set True by the main thread after every ticket has been processed.
exit_flag = False

def sail(q: Queue):
    """Ticket-selling worker: take tickets off *q* until exit_flag is set.

    Uses a non-blocking get: the original blocking q.get() could hang a
    worker forever once the queue drained, because exit_flag is only set
    after tickets.join() returns in the main thread.
    """
    from queue import Empty  # local import: keeps the file-level imports untouched
    while not exit_flag:
        try:
            ticket = q.get(block=False)
        except Empty:
            continue  # queue momentarily empty — re-check the exit flag
        print('----------线程%s售出票:%d-----------'
              % (threading.current_thread().name, ticket))
        q.task_done()  # acknowledge one ticket (pairs with q.join() in main)
        time.sleep(1)
if __name__ == '__main__':
    tickets = Queue(1000)
    for i in range(1000):
        tickets.put(i)
    threads = []
    for i in range(100):
        t = threading.Thread(target=sail, args=(tickets,))
        # Daemon threads: the process can still exit even if a worker is
        # blocked in q.get() after the queue drained (fixes a hang where
        # non-daemon workers kept the process alive forever).
        t.daemon = True
        t.start()
        threads.append(t)
    # Queue join: blocks until task_done() has been called for every item.
    tickets.join()
    # All tickets handled — flip the flag so the worker loops can exit.
    exit_flag = True
    print('子线程任务全部结束!当前线程是:', threading.current_thread().name)
多线程爬虫的实现
-
导包
import requests import json from lxml import etree from queue import Queue import threading import time
-
定义变量
# 声明变量 # 爬虫线程存放页码的队列,线程安全 crawlQueue = Queue() # 解析线程存放待解析数据的队列 parseQueue = Queue() # 解析线程是否退出 parseExitFlag = False # url base_url = 'http://www.qiushibaike.com/8hr/page/%d/'
-
创建爬虫线程并启动
# 现在使用多线程爬去前10页数据 url p = 1 p = 2 # 任务 for i in range(1,11): requestQueue.put(i) # 爬虫线程 for i in range(1,4): threadCrawl = ThreadCrawl(requestQueue,i) threadCrawl.start()
-
爬虫线程
class ThreadCrawl(threading.Thread): def __init__(self, queue,id): super().__init__() self.queue = queue self.id = id def run(self): super().run() print('爬虫线程:--------%d--------开始工作!'%(self.id)) self.getHtml() print('爬虫线程:--------%d--------结束工作!'%(self.id)) def getHtml(self): # 该方法要一直执行,直到请求队列中为空 while True: if self.queue.empty(): break try: p = self.queue.get(block = False) # 获得页码,进行网络请求 url = url_base%(p) response = requests.get(url=url,headers = headers,verify = False) html = response.text # 将网络请求获取的数据保存另一个队列中 parseQueue parseQueue.put((html,p)) self.queue.task_done() print('爬虫线程:-------%d-------下载页码:-------%d------的数据'%(self.id,p)) except Exception as e: pass pass
-
创建解析线程并启动
# 解析线程 fp = open('./糗事百科.txt','w',encoding='utf-8') for i in range(1,4): threadParese = ThreadParse(parseQueue,i,fp) threadParese.start()
-
解析线程
class ThreadParse(threading.Thread): def __init__(self, threadId, parseQueue, file): super().__init__() self.threadId = threadId self.parseQueue = parseQueue self.file = file def run(self): super().run() print('--------------------------------------starting parse ', self.threadId) global parseExitFlag while not parseExitFlag: try: item = self.parseQueue.get(False) self.parse(item) parseQueue.task_done() except: pass print('-------------------------------------exiting parse ', self.threadId) def parse(self,item): global parsePage try: xml = etree.HTML(item) result = xml.xpath('//div[contains(@id,"qiushi_tag")]') print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>',parsePage,len(result)) for site in result: img = site.xpath('.//img/@src')[0] user_name = site.xpath('.//h2')[0].text content = site.xpath('.//div[@class = "content"]/span')[0].text.strip() vote = site.xpath('.//i')[0].text comments = site.xpath('.//i')[1].text duanzi = { 'img': img, 'user_name': user_name, 'content': content, 'vote': vote, 'comments': comments} self.file.write(json.dumps(duanzi, ensure_ascii=False) + '\n') print('---------------------------------------parse done Thread = ', self.threadId, 'parsePage = ', parsePage) parsePage += 1 except Exception as e: pass ''' 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。 如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。 如果队列为空且block为False,队列将引发Empty异常。 '''
-
join()阻塞当前线程,直到队列中的任务全部执行完毕(每个任务都调用过task_done()),后续代码才会继续执行
# 队列锁添加 requestQueue.join() parseQueue.join()
-
结束任务
# 解析线程可以退出 exitParseFlag = True # 关闭文件 fp.close()
多线程爬虫代码实现
import requests
from bs4 import BeautifulSoup
from queue import Queue
import threading
from threading import Lock
# Listing-page URL template; %d is replaced by the page number (1-100).
url = 'https://www.dushu.com/book/1175_%d.html'
# Page numbers waiting to be crawled (bounded to 100 entries).
task_queue = Queue(100)
# (html, page_id) tuples waiting to be parsed (bounded to 100 entries).
parse_queue = Queue(100)
# Browser-like request headers copied from a real session.
# NOTE(review): the hard-coded Cookie will expire — confirm it is still needed.
headers = {
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
'Accept-Encoding':'gzip, deflate, br',
'Accept-Language':'zh-CN,zh;q=0.9',
'Cache-Control':'max-age=0',
'Connection':'keep-alive',
'Cookie':'Hm_lvt_8008bbd51b8bc504162e1a61c3741a9d=1572418328; Hm_lpvt_8008bbd51b8bc504162e1a61c3741a9d=1572418390',
'Host':'www.dushu.com',
'Sec-Fetch-Mode':'navigate',
'Sec-Fetch-Site':'none',
'Sec-Fetch-User':'?1',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36',}
# Flag that tells the parse threads to exit; set by the main thread at the end.
exit_flag = False
# The worker instances together act like a small crawl thread pool.
class CrawlThread(threading.Thread):
    """Crawl worker: fetch pages whose ids come from q_task and push the
    resulting (html, page_id) tuples into q_parse."""

    def __init__(self, q_task: Queue, q_parse: Queue) -> None:
        super().__init__()
        self.q_task = q_task    # page ids to download
        self.q_parse = q_parse  # output queue of (html, page_id)

    def run(self) -> None:
        super().run()
        self.spider()

    def spider(self):
        """Keep downloading until the task queue is exhausted."""
        from queue import Empty  # local import: keeps file-level imports untouched
        while True:
            try:
                # Non-blocking get closes the race where empty() returned False
                # but another worker drained the queue before a blocking get(),
                # which would have hung this thread forever.
                taskId = self.q_task.get(block=False)
            except Empty:
                print('+++++++爬虫线程%s执行任务结束+++++++' % (threading.current_thread().name))
                break
            response = requests.get(url % (taskId), headers=headers)
            response.encoding = 'utf-8'
            html = response.text
            self.q_parse.put((html, taskId))
            self.q_task.task_done()
            print('------爬虫线程:%s-----执行任务:%d-------'
                  % (threading.current_thread().name, taskId))
# Producer side: fill the task queue and launch the crawl workers.
def crawl():
    """Enqueue page numbers 1-100 and start 5 CrawlThread workers."""
    for page in range(1, 101):
        task_queue.put(page)
    for _ in range(5):
        worker = CrawlThread(task_queue, parse_queue)
        worker.start()
class ParseThread(threading.Thread):
    """Parse worker: turn crawled HTML into tab-separated book records."""

    def __init__(self, q_parse: Queue, lock: Lock, fp):
        super().__init__()
        self.q_parse = q_parse  # (html, page_id) tuples from the crawl threads
        self.lock = lock        # guards writes to the shared output file
        self.fp = fp            # shared output file handle

    def run(self):
        super().run()
        self.parse()

    def parse(self):
        """Drain the parse queue until the main thread sets exit_flag."""
        while True:
            if exit_flag:
                print('-----------解析线程:%s完成任务退出------------'
                      % (threading.current_thread().name))
                break
            try:
                html, taskId = self.q_parse.get(block=False)
            except Exception:
                # Queue momentarily empty — re-check the exit flag.
                continue
            try:
                soup = BeautifulSoup(html, 'lxml')
                books = soup.select('div[class="bookslist"] > ul > li')
                print('----------------', len(books))
                for book in books:
                    book_url = book.find('img').attrs['src']
                    book_title = book.select('h3 a')[0]['title']
                    book_author = book.select('p')[0].get_text()
                    book_describe = book.select('p')[1].get_text()
                    # Hold the lock only around the shared-file write.
                    with self.lock:
                        # fixed: wrote to the global `fp` instead of self.fp
                        self.fp.write('%s\t%s\t%s\t%s\n' % (book_url, book_title, book_author, book_describe))
                print('**********解析线程:%s完成了第%d页解析任务***********'
                      % (threading.current_thread().name, taskId))
            except Exception:
                # Best-effort: skip pages that fail to parse (original behaviour,
                # which used a bare `except:` around the whole body).
                pass
            finally:
                # Always acknowledge the item, or parse_queue.join() would hang.
                self.q_parse.task_done()
# Consumer side: start the parse workers that share one lock and one file.
def parse(fp):
    """Start 5 ParseThread workers writing to *fp* under a shared lock."""
    lock = Lock()
    for _ in range(5):
        worker = ParseThread(parse_queue, lock, fp)
        worker.start()
if __name__ == '__main__':
    crawl()
    fp = open('./book.txt', 'a', encoding='utf-8')
    try:
        parse(fp)
        # Queue join: returns only after task_done() was called for every item.
        task_queue.join()
        parse_queue.join()
    finally:
        # fixed: flip exit_flag BEFORE closing the file, so no parse thread can
        # attempt a write on an already-closed handle; the try/finally also
        # guarantees cleanup if a join is interrupted.
        exit_flag = True
        fp.close()
    print('代码执行到这里!!!!!!!!!!!!!!')