python Threading demo - 线程
demo_1
# -*- coding: utf-8 -*-
import threading, time


def run(num):
    """Worker target: announce start, echo *num*, then announce stop."""
    thread_name = threading.current_thread().name
    print("subThread({}) is start...".format(thread_name))
    time.sleep(0.5)
    print(num)
    time.sleep(0.5)
    print("subThread({}) is stop.".format(thread_name))
if __name__ == '__main__':
    print("mainThreading({}) is starting...".format(threading.current_thread().name))
    # Thread(target=callable, name=thread name, args=argument tuple)
    subThreading = threading.Thread(target=run, name='runThreading', args=(1,))
    subThreading.start()
    # Block until the worker finishes; without this join the main thread
    # may print "stop" before the sub-thread has completed.
    subThreading.join()
    print("mainThreading({}) is stop.".format(threading.current_thread().name))
demo_2 多线程
- 定义全局变量,便于资源共享
- 不定义线程锁。输出数字混乱
- 定义线程函数,并声明全局变量的使用范围,使用with对锁操作
# -*- coding: utf-8 -*-
import threading

# Shared counter mutated by several threads WITHOUT a lock: the final
# value is unpredictable because += / -= are not atomic operations.
num = 100


def run(n):
    """Add and then subtract *n* from the shared ``num`` one million times."""
    global num
    for _ in range(1000000):
        num += n
        num -= n
if __name__ == '__main__':
    # Two racing workers; the final value of num is non-deterministic.
    workers = [threading.Thread(target=run, args=(6, )),
               threading.Thread(target=run, args=(9, ))]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print("num = {}".format(num))
    print("mainThreading({}) is stop.".format(threading.current_thread().name))
demo_3 多线程_Lock
- 定义全局变量,便于资源共享
- 定义线程锁,只有当一个线程结束后,锁才会被释放,锁不被线程共享
- 定义线程函数,并声明全局变量的使用范围,使用with对锁操作
# -*- coding: utf-8 -*-
import threading

# Shared counter, this time protected by a module-level lock.
num = 100
lock = threading.Lock()


def run(n):
    """Add then subtract *n* from ``num`` 1,000,000 times under the lock."""
    global num
    for _ in range(1000000):
        # Only one thread may mutate num at a time, so the net effect is 0.
        with lock:
            num += n
            num -= n
if __name__ == '__main__':
    # Same race as demo_2, but the lock makes the result deterministic: 100.
    workers = [threading.Thread(target=run, args=(6, )),
               threading.Thread(target=run, args=(9, ))]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print("num = {}".format(num))
    print("mainThreading({}) is stop.".format(threading.current_thread().name))
demo_4 一起过马路
- 凑够3人才能过马路
bar = threading.Barrier(3)
# -*- coding: utf-8 -*-
import threading, time

# Exactly 3 threads must gather at the barrier before any may "cross the road".
bar = threading.Barrier(3)


def run():
    """Wait at the barrier until two other threads have also arrived."""
    print("{} is starting...".format(threading.current_thread().name))
    time.sleep(1)
    bar.wait()
    print("{} is end.".format(threading.current_thread().name))


if __name__ == '__main__':
    # BUG FIX: the original spawned 5 threads. 5 is not a multiple of the
    # barrier size (3), so the last 2 non-daemon threads blocked in
    # bar.wait() forever and the process never exited. Spawn 6 so every
    # group of 3 is released and the program terminates.
    for i in range(6):
        threading.Thread(target=run).start()
demo_5 threading.Timer
定时线程
# -*- coding: utf-8 -*-
import threading


def run():
    """Timer callback: print a separator line, then a completion message."""
    print('-*' * 10 + "\n" + "runThreading is end.")


if __name__ == '__main__':
    print("mainThreading is start...")
    # Fire run() once, 3 seconds from now, on a separate thread.
    timer = threading.Timer(3, run)
    timer.start()
    timer.join()
    print("mainThreading is end.")
多线程爬取demo
业务流
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# @Software : PyCharm
# @CreateTime: 2019-12-23 09:36
# @Author : spider
# @File : pyThread
import sys
import time
import pprint
import json
import re
from lxml import etree
import requests
import threading
from queue import Queue
"""
2类线程:3个下载,3个解析
内容队列:下载线程往队列中put数据,解析线程从队列中get数据
url队列: 下载线程从url队列get数据
写数据:上锁,保证文件不能同时被修改
"""
g_crawl_list = []
g_parse_list = []
class CrawThread(threading.Thread):
    """Downloader thread: pulls page numbers from page_queue, fetches each
    page with requests, and puts the response body on data_queue.

    Args:
        name: thread name (also used in log output).
        page_queue: Queue of page numbers to download.
        data_queue: Queue receiving raw HTML strings for the parser threads.
    """

    def __init__(self, name, page_queue, data_queue):
        super(CrawThread, self).__init__()
        self.name = name
        self.page_queue = page_queue
        self.data_queue = data_queue
        # URL template; .format() is called with the page number.
        # NOTE(review): placeholder — fill in the real site URL.
        self.url = r"{}"
        # BUG FIX: attribute was misspelled "hreaders".
        self.headers = {}

    def run(self):
        print("{} ---------- crawl_thread start".format(self.name))
        while True:
            # BUG FIX: the original tested `self.page_queue.empty` — the
            # bound method object, which is always truthy — so every crawl
            # thread broke out immediately and downloaded nothing. The
            # method must be called.
            if self.page_queue.empty():
                break
            # Take a page number off the queue.
            page = self.page_queue.get()
            # Build the page URL and fetch it.
            url = self.url.format(page)
            res = requests.get(url, headers=self.headers)
            if res.ok:
                # Hand the page source to the parser threads.
                self.data_queue.put(res.text)
        print("{} ---------- crawl_thread stop".format(self.name))
class ParserThread(threading.Thread):
    """Parser thread: pulls page HTML off data_queue, extracts items, and
    appends them as JSON to the shared file, serialised by the lock.

    Args:
        name: thread name.
        data_queue: Queue of raw HTML strings produced by the crawl threads.
        fp: shared, already-open file object for the JSON output.
        lock: threading.Lock serialising writes to fp.
    """

    def __init__(self, name, data_queue, fp, lock):
        super(ParserThread, self).__init__()
        self.name = name
        self.data_queue = data_queue
        self.lock = lock
        self.fp = fp

    def run(self):
        while True:
            # NOTE(review): if the parsers start before the crawlers have
            # put anything on the queue they exit immediately — confirm the
            # intended start order in main().
            if self.data_queue.empty():
                break
            print("{} ---------- parse_thread start".format(self.name))
            # Take one page of HTML and parse it.
            pageSourceHtml = self.data_queue.get()
            self.parse_content(pageSourceHtml)
            print("{} ---------- parse_thread stop".format(self.name))

    def parse_content(self, pageSourceHtml):
        """Extract items from one page of HTML and append them to the file."""
        tree = etree.HTML(pageSourceHtml)
        li_list = tree.xpath("//li")
        items = []
        for oli in li_list:
            # Placeholder extraction — replace with real XPath lookups.
            title = 'title'
            imgLink = 'imgLink'
            items.append({'title': title, 'imgLink': imgLink})
        # BUG FIX: the original wrote
        #   self.fp.write(json.dumps(items), ensure_acsii=False)
        # — the (misspelled) keyword was passed to fp.write(), raising
        # TypeError on every page. It belongs to json.dumps().
        with self.lock:
            self.fp.write(json.dumps(items, ensure_ascii=False))
def function():
    """Skeleton demo: print a banner, then show a try/except/finally layout."""
    print("in function..." + "\n" + "-=" * 90)
    try:
        pass
    except Exception as e:
        # Unreachable while the try body is empty; on a real error it would
        # report the failing line number and file for quick debugging.
        print('\nLine_{:0>5d} in {} - {}'.format(
            sys._getframe().f_lineno, __file__, e))
    finally:
        pass
def create_queue():
    """Create the shared work queues.

    Returns:
        (page_queue, data_queue): page_queue preloaded with page numbers
        1..10; data_queue empty, to be filled by the crawl threads.
    """
    page_queue, data_queue = Queue(), Queue()
    for page_number in range(1, 11):
        page_queue.put(page_number)
    return page_queue, data_queue
def create_crawl_thread(page_queue, data_queue):
    """Build the three downloader threads and register them in g_crawl_list."""
    for thread_name in ('crawlThread1', 'crawlThread2', 'crawlThread3'):
        g_crawl_list.append(CrawThread(thread_name, page_queue, data_queue))
def create_parser_thread(data_queue, fp, lock):
    """Build the three parser threads and register them in g_parse_list."""
    # (original local was misleadingly named crawl_name)
    for thread_name in ('parseThread1', 'parseThread2', 'parseThread3'):
        g_parse_list.append(ParserThread(thread_name, data_queue, fp, lock))
def main():
    """Wire up queues, threads and the output file, then run the pipeline."""
    # Shared output file and the lock that serialises writes to it.
    fp = open('jian.json', 'a', encoding='utf8')
    lock = threading.Lock()
    # Build the page-number and page-content queues.
    page_queue, data_queue = create_queue()
    # Create downloader threads.
    create_crawl_thread(page_queue, data_queue)
    # BUG FIX: the original called create_parser_thread(data_queue, lock, fp)
    # but the signature is (data_queue, fp, lock) — the parsers received the
    # file as the lock and the Lock as the file, so every write crashed.
    create_parser_thread(data_queue, fp, lock)
    # Start every thread, then wait for all of them to finish.
    for tCrawl in g_crawl_list:
        tCrawl.start()
    for tParse in g_parse_list:
        tParse.start()
    for tCrawl in g_crawl_list:
        tCrawl.join()
    for tParse in g_parse_list:
        tParse.join()
    # BUG FIX: the file handle was never closed; close it once all writers
    # are done so buffered output is flushed to disk.
    fp.close()
if __name__ == '__main__':
    # Script entry point: print a banner, then run the crawl/parse pipeline.
    print("in startMain...")
    print("-=" * 90)
    main()