Python threading demo - Threads

demo_1


# -*- coding: utf-8 -*-

import threading, time


def run(num):
    print("subThread({}) is start...".format(threading.current_thread().name))

    time.sleep(0.5)
    print(num)
    time.sleep(0.5)

    print("subThread({}) is stop.".format(threading.current_thread().name))


if __name__ == '__main__':
    print("mainThreading({}) is starting...".format(threading.current_thread().name))

    # create the sub-thread
    """
    target = the function the thread will run
    name   = the sub-thread's name
    args   = arguments passed to the function (a tuple)
    """
    subThreading = threading.Thread(target=run, name='runThreading', args=(1,))
    subThreading.start()

    # block the main thread until the sub-thread has finished
    subThreading.join()  # without this line, the main thread may finish before the sub-thread

    print("mainThreading({}) is stop.".format(threading.current_thread().name))

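As a small aside (not part of the original demo): if the main thread should not wait, the sub-thread can be marked as a daemon instead of calling join(). Daemon threads do not keep the process alive, so they are abandoned as soon as the main thread exits. A minimal sketch, reusing a simplified run function:

# -*- coding: utf-8 -*-
# Sketch only: daemon variant of demo_1 (not from the original post).
import threading, time


def run(num):
    time.sleep(0.5)
    print(num)


if __name__ == '__main__':
    # daemon=True means this thread is abandoned when the main thread exits
    t = threading.Thread(target=run, name='daemonThreading', args=(1,), daemon=True)
    t.start()

    # no join() here: the main thread finishes immediately, so the daemon
    # thread will usually be killed before it gets a chance to print
    print("mainThreading is stop.")
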
demo_2 Multiple threads

  1. Define a global variable so the two threads share the same resource
  2. No lock is used, so the printed number is unpredictable
  3. Define the thread function and declare the global variable with the global keyword
# -*- coding: utf-8 -*-

import threading

num = 100

def run(n):
    global num

    # each iteration adds n and then subtracts n, so num should stay at 100,
    # but without a lock the two threads interleave and updates get lost
    for i in range(1000000):
        num = num + n
        num = num - n


if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=(6, ))
    t2 = threading.Thread(target=run, args=(9, ))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("num = {}".format(num))
    print("mainThreading({}) is stop.".format(threading.current_thread().name))
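
Why the printed num usually drifts away from 100: the statement num = num + n is not atomic. It compiles to separate load, add, and store bytecode steps, and the two threads can be switched between those steps, losing updates. A quick way to see the individual steps (an illustrative snippet, not part of the original demo):

# -*- coding: utf-8 -*-
# Illustrative only: show that "num = num + n" is several bytecode
# instructions, which is why two threads can interleave mid-update.
import dis

dis.dis(compile("num = num + n", "<demo>", "exec"))
# The output lists separate LOAD / ADD / STORE instructions; another thread
# can run between the load and the store, so one of the updates is lost.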

demo_3 Multiple threads with a Lock

  1. Define a global variable so the two threads share the same resource
  2. Define a threading.Lock; only one thread can hold it at a time, and it is released as soon as that thread leaves the critical section
  3. Define the thread function, declare the global variable, and wrap the update in a with lock block
# -*- coding: utf-8 -*-

import threading

num = 100
lock = threading.Lock()

def run(n):
    global num

    for i in range(1000000):
        # the with statement acquires the lock before the update and releases
        # it afterwards, so the read-modify-write can no longer be interleaved
        with lock:
            num = num + n
            num = num - n


if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=(6, ))
    t2 = threading.Thread(target=run, args=(9, ))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("num = {}".format(num))
    print("mainThreading({}) is stop.".format(threading.current_thread().name))
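
For comparison (not in the original post), the with lock block above is shorthand for calling acquire() and release() explicitly; wrapping the release in finally guarantees the lock is freed even if the update raises an exception. A minimal sketch of the same critical section:

# -*- coding: utf-8 -*-
# Sketch only: the explicit acquire/release form of demo_3's critical section.
import threading

num = 100
lock = threading.Lock()


def run(n):
    global num

    for i in range(1000000):
        lock.acquire()
        try:
            num = num + n
            num = num - n
        finally:
            lock.release()   # always released, even if an exception is raised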

demo_4 Crossing the street together

  1. The street can only be crossed once 3 people have gathered
    bar = threading.Barrier(3)
# -*- coding: utf-8 -*-

import threading, time

# 3 people must gather before anyone can cross the street
bar = threading.Barrier(3)


def run():
    print("{} is starting...".format(threading.current_thread().name))

    time.sleep(1)
    bar.wait()

    print("{} is end.".format(threading.current_thread().name))


if __name__ == '__main__':
    # start a multiple of 3 threads; with 5 threads the last 2 would wait
    # at the barrier forever and the program would never exit
    for i in range(6):
        threading.Thread(target=run).start()
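
A related Barrier feature not used in the original demo: the constructor accepts an action callable, and exactly one of the waiting threads runs it each time the barrier is released. A small sketch built on the same "crossing the street" idea:

# -*- coding: utf-8 -*-
# Sketch only: Barrier with an action callback; one of the three waiting
# threads runs the callback each time the barrier trips.
import threading, time


def crossing():
    print("--- 3 people gathered, crossing now ---")


bar = threading.Barrier(3, action=crossing)


def run():
    time.sleep(1)
    bar.wait()
    print("{} has crossed".format(threading.current_thread().name))


if __name__ == '__main__':
    for i in range(6):
        threading.Thread(target=run).start()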

demo_5 threading.Timer

A timer thread: run a function after a fixed delay.

# -*- coding: utf-8 -*-

import threading

def run():
    print('-*' * 10)
    print("runThreading is end.")


if __name__ == '__main__':
    print("mainThreading is start...")
    # run() is executed in a new thread after a 3 second delay
    timeThreading = threading.Timer(3, run)
    timeThreading.start()
    timeThreading.join()
    print("mainThreading is end.")
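
One more Timer detail worth knowing (not shown in the original demo): a pending Timer can be cancelled before its delay elapses, in which case the function is never called. A minimal sketch:

# -*- coding: utf-8 -*-
# Sketch only: cancelling a Timer while it is still waiting.
import threading


def run():
    print("this line is never printed")


if __name__ == '__main__':
    t = threading.Timer(3, run)
    t.start()
    t.cancel()   # stop the timer before the 3 seconds are up; run() never executes
    print("timer cancelled before it fired")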




Multithreaded crawling demo

Workflow

#!/usr/bin/python3
# -*- coding:utf-8 -*-
# @Software  : PyCharm
# @CreateTime: 2019-12-23 09:36
# @Author    : spider
# @File      : pyThread

import sys
import time
import pprint
import json
import re
from lxml import etree

import requests

import threading
from queue import Queue

"""
2类线程:3个下载,3个解析
内容队列:下载线程往队列中put数据,解析线程从队列中get数据
url队列: 下载线程从url队列get数据
写数据:上锁,保证文件不能同时被修改
"""

g_crawl_list = []
g_parse_list = []

class CrawThread(threading.Thread):
    def __init__(self, name, page_queue, data_queue):
        super(CrawThread, self).__init__()
        self.name = name
        self.page_queue = page_queue
        self.data_queue = data_queue
        self.url = r"{}"   # URL template; the page number is filled in by format()
        self.headers = {}

    def run(self):
        print("{} ---------- crawl_thread start".format(self.name))
        while True:
            # exit once there are no more page numbers to crawl
            if self.page_queue.empty():
                break
            # take a page number from the queue
            page = self.page_queue.get()

            # build the url and send the request
            url = self.url.format(page)
            res = requests.get(url, headers=self.headers)
            if res.ok:
                # hand the response body to the parser threads via data_queue
                self.data_queue.put(res.text)
        print("{} ---------- crawl_thread stop".format(self.name))



class ParserThread(threading.Thread):
    def __init__(self, name, data_queue, fp, lock):
        super(ParserThread, self).__init__()
        self.name = name
        self.data_queue = data_queue
        self.lock = lock
        self.fp = fp

    def run(self):
        print("{} ---------- parse_thread start".format(self.name))
        while True:
            # stop once all downloaded pages have been parsed
            if self.data_queue.empty():
                break
            # take one page of html from data_queue
            pageSourceHtml = self.data_queue.get()
            # parse it and append the result to the json file
            self.parse_content(pageSourceHtml)
        print("{} ---------- parse_thread stop".format(self.name))

    def parse_content(self, pageSourceHtml):
        tree = etree.HTML(pageSourceHtml)
        li_list = tree.xpath("//li")

        items = []
        for oli in li_list:
            title = 'title'
            imgLink = 'imgLink'
            item = {
                'title':title,
                'imgLink':imgLink,
            }
            items.append(item)
        # write to jsonFile; the lock keeps two parser threads from
        # writing to the file at the same time
        self.lock.acquire()
        self.fp.write(json.dumps(items, ensure_ascii=False))
        self.lock.release()

def function():
    print("in function...")
    print("-=" * 90)
    try:
        pass
    except Exception as e:
        print('\nLine_{:0>5d} in {} - {}'.format(
            sys._getframe().f_lineno, __file__, e))
    finally:
        pass

def create_queue():
    # create the page-number queue
    page_queue = Queue()
    for page in range(1, 11):
        page_queue.put(page)

    # create the content (data) queue
    data_queue = Queue()
    return page_queue, data_queue

def create_crawl_thread(page_queue, data_queue):
    crawl_name = ['crawlThread1', 'crawlThread2', 'crawlThread3']
    for name in crawl_name:
        tCrawl = CrawThread(name, page_queue, data_queue)
        g_crawl_list.append(tCrawl)

def create_parser_thread(data_queue, fp, lock):
    parse_name = ['parseThread1', 'parseThread2', 'parseThread3']
    for name in parse_name:
        tParse = ParserThread(name, data_queue, fp, lock)
        g_parse_list.append(tParse)

def main():
    # open the output file
    fp = open('jian.json', 'a', encoding='utf8')
    # create the lock shared by the parser threads
    lock = threading.Lock()
    # create the queues
    page_queue, data_queue = create_queue()

    # create the crawl (download) threads
    create_crawl_thread(page_queue, data_queue)
    # create the parser threads (argument order is data_queue, fp, lock)
    create_parser_thread(data_queue, fp, lock)

    # start all crawl threads
    for tCrawl in g_crawl_list:
        tCrawl.start()
    # start all parser threads
    for tParse in g_parse_list:
        tParse.start()

    # the main thread waits for every worker thread to finish
    for tCrawl in g_crawl_list:
        tCrawl.join()
    for tParse in g_parse_list:
        tParse.join()

    fp.close()


if __name__ == '__main__':
    print("in startMain...")
    print("-=" * 90)
    main()
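
One caveat about the coordination above: the parser threads start at the same moment as the crawl threads, so a parser can find data_queue empty before any page has been downloaded and exit immediately. A more robust pattern is to keep the parsers blocked on get() and shut them down with sentinel values once the downloads are done. The sketch below is only an outline of that idea; download_page and parse_page are hypothetical stand-ins for the crawl and parse logic of the demo above:

# -*- coding: utf-8 -*-
# Sketch only: shutting down consumer threads with sentinel values so a
# parser never exits just because the queue is momentarily empty.
import threading
from queue import Queue, Empty

NUM_PARSERS = 3
SENTINEL = None                   # one per parser, meaning "no more data"


def crawl(page_queue, data_queue):
    while True:
        try:
            page = page_queue.get_nowait()   # non-blocking; Empty means all pages are taken
        except Empty:
            break
        data_queue.put("html of page {}".format(page))   # stand-in for download_page(page)


def parse(data_queue):
    while True:
        html = data_queue.get()   # blocks until data (or a sentinel) arrives
        if html is SENTINEL:
            break
        # stand-in for parse_page(html)


if __name__ == '__main__':
    page_queue, data_queue = Queue(), Queue()
    for page in range(1, 11):
        page_queue.put(page)

    crawlers = [threading.Thread(target=crawl, args=(page_queue, data_queue)) for _ in range(3)]
    parsers = [threading.Thread(target=parse, args=(data_queue,)) for _ in range(NUM_PARSERS)]
    for t in crawlers + parsers:
        t.start()

    for t in crawlers:
        t.join()                       # wait until every page has been downloaded
    for _ in range(NUM_PARSERS):
        data_queue.put(SENTINEL)       # then tell each parser to stop
    for t in parsers:
        t.join()
    print("all pages downloaded and parsed")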

Reposted from blog.csdn.net/qq_22038327/article/details/102550990