7. 多线程并发相关问题和解决技巧

一. 如何使用多线程

实际案例

https://intrinio.com/tutorial/web_api

我们通过上述网站提供的API获取了股市信息的CSV数据,
现在要下载大量CSV数据文件, 并将其转化为xml文件

如何使用线程来提高下载并处理的效率?

解决方案

  • 使用标准库 threading.Thread 创建线程,在每一个线程中下载并转换一只股票数据
import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement

# NOTE(review): hard-coded API credentials in source (tutorial sample keys).
# Real code should load these from environment variables or a config file.
USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'
PASSWORD = b'aff978c42479491f9541ace709081b99'

def download_csv(page_number):
    """Download one page of AAPL price data from the intrinio API.

    Returns the CSV text wrapped in a StringIO on success, or None when
    the HTTP request fails (callers retry on a falsy return).
    """
    print('download csv data [page=%s]' % page_number)
    url = "http://api.intrinio.com/prices.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s" % page_number
    # HTTP Basic auth: base64 of "user:password".
    token = base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
    response = requests.get(url, headers={'Authorization': b'Basic ' + token})
    if not response.ok:
        return None
    return StringIO(response.text)

def csv_to_xml(csv_file, xml_path):
    """Convert CSV data (first row = column headers) into an XML file.

    The output has a <Data> root containing one <Row> element per CSV
    record, with one child element per column named after its header.
    The .text/.tail whitespace produces a pretty-printed file.

    csv_file: any iterable of CSV lines (e.g. a StringIO).
    xml_path: destination file path.
    """
    print('Convert csv data to %s' % xml_path)
    reader = csv.reader(csv_file)
    # Fix: next(reader) raised StopIteration on empty input; now an empty
    # input still produces a valid (empty) <Data> document.
    headers = next(reader, [])

    root = Element('Data')
    root.text = '\n\t'
    root.tail = '\n'

    for row in reader:
        book = SubElement(root, 'Row')
        book.text = '\n\t\t'
        book.tail = '\n\t'

        e = None
        for tag, text in zip(headers, row):
            e = SubElement(book, tag)
            e.text = text
            e.tail = '\n\t\t'
        # Fix: an empty row used to leave `e` unbound (NameError).
        if e is not None:
            e.tail = '\n\t'

    ElementTree(root).write(xml_path, encoding='utf8')

def download_and_save(page_number, xml_path):
    """Download one page of CSV data (I/O bound) and save it as XML (CPU bound).

    Retries the download until a non-empty result is returned.
    """
    csv_file = None
    while not csv_file:   # I/O: keep retrying until data arrives
        csv_file = download_csv(page_number)
    # CPU: convert to XML.
    # Fix: honour the xml_path argument; the original ignored it and
    # rebuilt the file name from page_number.
    csv_to_xml(csv_file, xml_path)

from threading import Thread
class MyThread(Thread):
    """Worker thread: downloads one page of stock data and saves it as XML."""

    def __init__(self, page_number, xml_path):
        super().__init__()
        # Each worker is bound to exactly one page / output file.
        self.page_number = page_number
        self.xml_path = xml_path

    def run(self):
        """Thread entry point: perform the download-and-convert job."""
        download_and_save(self.page_number, self.xml_path)

if __name__ == '__main__':
    import time

    started = time.time()

    # One worker thread per page; start them all, then wait for each.
    workers = [MyThread(page, 'data%s.xml' % page) for page in range(1, 6)]
    for worker in workers:
        worker.start()

    # The main thread blocks here until every worker has finished.
    for worker in workers:
        worker.join()

    print(time.time() - started)
    print('main thread end.')

二. 如何在线程间通信

实际案例

在上一节中,  我们从 intrinio.com 下载多只股票的csv数据, 并将其转换为 xml 文件

在python中由于全局解释器锁 (GIL)的存在, 多线程CPU密集型操作并不能提高执行效率, 我们修改程序架构:

1. 使用多个 DownloadThread 线程进行下载(I/O)
2. 使用多个 ConvertThread  线程进行转换(CPU)
3. 下载线程把下载数据安全地传递给转换线程

GIL

  • 在每个进程中, 存在一把GIL, 该进程中的线程间共享GIL
  • 多线程运行时, 只有持有GIL的那个线程才能执行字节码
  • 通过线程间快速 传递GIL, 达到表象上的多线程, 其实同一时间只有一个线程在工作

解决方案

  • 使用标准库中的 queue.Queue, 它是一个线程安全的队列

  1. Download 线程把下载数据放入队列
  2. Convert 线程从队列里提取数据
import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement
from threading import Thread

# NOTE(review): hard-coded API credentials in source (tutorial sample keys).
# Real code should load these from environment variables or a config file.
USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'
PASSWORD = b'aff978c42479491f9541ace709081b99'

# class MyThread(Thread):
#     def __init__(self, page_number, xml_path):
#         super().__init__()
#         self.page_number = page_number
#         self.xml_path = xml_path
#
#     def run(self):
#         download_and_save(self.page_number, self.xml_path)

class DownloadThread(Thread):
    """Producer thread: downloads one CSV page and pushes it onto the shared queue."""

    def __init__(self, page_number, queue):
        super().__init__()
        self.page_number = page_number
        self.queue = queue

    def run(self):
        """Retry the download until it succeeds, then hand the data to a consumer."""
        csv_file = None
        while csv_file is None:
            csv_file = self.download_csv(self.page_number)
        # queue.Queue is thread-safe, so no extra locking is needed here.
        self.queue.put((self.page_number, csv_file))

    def download_csv(self, page_number):
        """Fetch one page; return a StringIO of CSV text, or None on HTTP failure."""
        print('download csv data [page=%s]' % page_number)
        url = "http://api.intrinio.com/prices.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s" % page_number
        token = base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
        response = requests.get(url, headers={'Authorization': b'Basic ' + token})
        if not response.ok:
            return None
        return StringIO(response.text)

class ConvertThread(Thread):
    """Consumer thread: pulls (page_number, csv_file) pairs off the queue and
    converts each one to an XML file.

    Fix: the original looped on queue.get() forever, so the process could
    never exit.  Putting (anything, None) on the queue now acts as a
    shutdown sentinel; existing callers that never send one see unchanged
    behaviour.
    """

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # Blocks until a producer supplies an item.
            page_number, csv_file = self.queue.get()
            if csv_file is None:
                # Shutdown sentinel: allows a clean program exit.
                break
            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

    def csv_to_xml(self, csv_file, xml_path):
        """Convert CSV text (first row = headers) to a pretty-printed XML file."""
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        # Fix: empty input used to raise StopIteration from next(reader).
        headers = next(reader, [])

        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'

        for row in reader:
            book = SubElement(root, 'Row')
            book.text = '\n\t\t'
            book.tail = '\n\t'

            e = None
            for tag, text in zip(headers, row):
                e = SubElement(book, tag)
                e.text = text
                e.tail = '\n\t\t'
            # Fix: an empty row used to leave `e` unbound (NameError).
            if e is not None:
                e.tail = '\n\t'

        ElementTree(root).write(xml_path, encoding='utf8')


from queue import Queue   # collections.deque has no lock and is NOT thread-safe; queue.Queue is

if __name__ == '__main__':
    queue = Queue()  # thread-safe channel between download and convert threads
    thread_list = []
    for i in range(1, 6):
        t = DownloadThread(i, queue)
        t.start()
        thread_list.append(t)

    convert_thread = ConvertThread(queue)
    convert_thread.start()

    for t in thread_list:
        t.join()
    # NOTE(review): convert_thread is never joined or told to stop; it keeps
    # blocking on queue.get(), so the process never exits — confirm intended.
    print('main thread end.')

三. 如何在线程间进行事件通知?

实际案例

在上一节中, 我们从 intrinio.com 下载多只股票的 csv 数据, 并将其转换为 xml 文件。


额外需求:
实现一个线程 TarThread, 将转换出的 xml 文件压缩打包, 比如转换线程每生产出100个 xml 文件, 
就通知打包线程将它们打包成一个 xxx.tgz 文件, 并删除 xml 文件。 
打包完成后, 打包线程反过来通知转换线程, 转换线程继续转换。

解决方案

线程间的事件通知, 可以使用标准库中的 threading.Event

  1. 等待事件一端调用wait, 等待事件
  2. 通知事件一端调用 set, 通知事件

代码

import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement
from threading import Thread
from queue import Queue
import tarfile
import os

# NOTE(review): hard-coded API credentials in source (tutorial sample keys).
# Real code should load these from environment variables or a config file.
USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'
PASSWORD = b'aff978c42479491f9541ace709081b99'

class DownloadThread(Thread):
    """Producer thread: downloads one CSV page and pushes it onto the shared queue."""

    def __init__(self, page_number, queue):
        super().__init__()
        self.page_number = page_number
        self.queue = queue

    def run(self):
        """Retry the download until it succeeds, then enqueue the result."""
        csv_file = None
        while csv_file is None:
            csv_file = self.download_csv(self.page_number)
        self.queue.put((self.page_number, csv_file))

    def download_csv(self, page_number):
        """Fetch one page; return a StringIO of CSV text, or None on HTTP failure."""
        print('download csv data [page=%s]' % page_number)
        url = "http://api.intrinio.com/prices.csv?ticker=AAPL&hide_paging=true&page_size=100&page_number=%s" % page_number
        token = base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
        response = requests.get(url, headers={'Authorization': b'Basic ' + token})
        if not response.ok:
            return None
        return StringIO(response.text)

class ConvertThread(Thread):
    """Consumer thread that converts queued CSV pages to XML and coordinates
    archiving with TarThread via two events.

    Protocol: after every 2 converted files, set c_event (batch ready for
    tarring) and block on t_event until TarThread reports the archive is
    done.  A queue item with page_number == -1 is the shutdown sentinel:
    trigger one final archive pass, wait for it, then exit.
    """

    def __init__(self, queue, c_event, t_event):
        super().__init__()
        self.queue = queue
        self.c_event = c_event   # set by us: an XML batch is ready for tarring
        self.t_event = t_event   # set by TarThread: archiving finished

    def run(self):
        count = 0
        while True:
            page_number, csv_file = self.queue.get()
            if page_number == -1:
                # Sentinel: archive whatever is left, wait, then exit.
                self.c_event.set()
                self.t_event.wait()
                break

            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)
            count += 1
            if count == 2:  # every 2 XML files, hand off to the tar thread
                count = 0
                # Signal "batch ready" ...
                self.c_event.set()

                # ... and block until TarThread has finished archiving.
                self.t_event.wait()
                self.t_event.clear()   # re-arm the event for the next batch

    def csv_to_xml(self, csv_file, xml_path):
        """Convert CSV text (first row = headers) to a pretty-printed XML file."""
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        # Fix: empty input used to raise StopIteration from next(reader).
        headers = next(reader, [])

        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'

        for row in reader:
            book = SubElement(root, 'Row')
            book.text = '\n\t\t'
            book.tail = '\n\t'

            e = None
            for tag, text in zip(headers, row):
                e = SubElement(book, tag)
                e.text = text
                e.tail = '\n\t\t'
            # Fix: an empty row used to leave `e` unbound (NameError).
            if e is not None:
                e.tail = '\n\t'

        ElementTree(root).write(xml_path, encoding='utf8')

class TarThread(Thread):
    """Daemon thread that bundles generated XML files into numbered .tgz archives.

    Waits on c_event (a batch of XML files is ready), tars and deletes them,
    then sets t_event so the converter can resume.  Runs as a daemon so it
    terminates automatically when the main thread exits.
    """

    def __init__(self, c_event, t_event):
        super().__init__(daemon=True)   # daemon: dies with the main thread
        self.count = 0                  # archive sequence number
        self.c_event = c_event          # set by ConvertThread: batch ready
        self.t_event = t_event          # set by us: archiving finished

    def run(self):
        while True:
            # Wait until enough XML files have been produced.
            self.c_event.wait()
            self.c_event.clear()   # re-arm the event for the next batch

            print('DEBUG')
            self.tar_xml()

            # Tell the converter it may continue.
            self.t_event.set()

    def tar_xml(self):
        """Pack every *.xml in the cwd into dataN.tgz and delete the originals.

        Fix: the original always created an archive, then deleted it again
        when it turned out empty, burning a sequence number each time.  Now
        nothing is created when there are no XML files.
        """
        xml_files = [fname for fname in os.listdir('.') if fname.endswith('.xml')]
        if not xml_files:
            return
        self.count += 1
        tfname = 'data%s.tgz' % self.count
        print('tar %s...' % tfname)
        # Context manager guarantees the archive is closed even on error.
        with tarfile.open(tfname, 'w:gz') as tf:
            for fname in xml_files:
                tf.add(fname)
                os.remove(fname)

from threading import Event

if __name__ == '__main__':
    queue = Queue()
    c_event = Event()  # signalled when a batch of XML files is ready
    t_event = Event()  # signalled when the archive has been written

    # Launch one downloader per page.
    downloaders = [DownloadThread(page, queue) for page in range(1, 15)]
    for worker in downloaders:
        worker.start()

    convert_thread = ConvertThread(queue, c_event, t_event)
    convert_thread.start()

    tar_thread = TarThread(c_event, t_event)
    tar_thread.start()

    # Block until every download has finished.
    for worker in downloaders:
        worker.join()

    # Poison pill: tell the converter to flush and exit.
    queue.put((-1, None))

    # Wait for the converter (the tar thread is a daemon and dies with us).
    convert_thread.join()
    print('main thread end.')


四.如何使用线程本地数据?

实际案例

我们实现了一个 web 视频监控服务器, 服务器端采集摄像头数据,客户端使用浏览器通过 http 请求接收数据。
服务器使用推送的方式 (multipart/x-mixed-replace) 一直使用一个 tcp 连接向客户端传递数据。
这种方式将持续占用一个线程, 导致单线程服务器无法处理多客户端请求。

改写程序, 在每个线程中处理一个客户端请求, 支持多客户端访问。

解决方案

  • threading.local 函数可以创建线程本地数据空间, 其下属性对每个线程独立存在

五. 如何使用线程池?

实际案例

上一节实现了一个多线程 web 视频监控服务器,
由于我们服务器资源有限(CPU,内存, 带宽), 
需要对请求连接数(线程数)做限制, 避免因资源耗尽而瘫痪。

可以使用线程池,替代原来的每次请求创建线程。
即 提前创建固定数量的线程, 用的时候去里面取, 避免创建过多的线程。

解决方法

使用标准库中concurrent.futures 下的ThreadPoolExecutor 
对象的 submit和map方法可以用来启动线程池中线程执行任务

使用方法

import threading, time, random

def f(a, b):
    """Demo task: print the worker thread's name and args, sleep 5-10 s, return a*b."""
    print(threading.current_thread().name, ":", a, b)
    # Simulated work: random delay so pool scheduling is observable.
    time.sleep(random.randint(5, 10))
    return a * b

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(3)
future = executor.submit(f, 2, 3)   # submit() returns a Future immediately

# Fix: was `future.reslut()` (typo -> AttributeError).  result() blocks
# until f has finished, then returns its return value (6 here).
future.result()


# Run f once per argument pair on the pool threads using map():

executor.map(f, range(1, 6), range(2, 7))  # pairs (1,2), (2,3), (3,4), ...

# map() returns a lazy generator; wrap it in list() to collect each result.
list(executor.map(f, range(1, 6), range(2, 7)))   # [2, 6, 12, 20, 30]

综合实现web监控服务器

import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select

class JpegStreamer(Thread):
    """Capture JPEG frames from a camera and fan them out to registered pipes.

    Each client handler thread registers a pipe pair; every captured frame
    is written, length-prefixed, to all writable registered pipes.
    """

    def __init__(self, camera):
        super().__init__()
        self.cap = cv2.VideoCapture(camera)
        self.lock = RLock()   # guards self.pipes
        self.pipes = {}       # read fd -> write fd

    def register(self):
        """Create a pipe, register its write end, and return the read end."""
        pr, pw = os.pipe()
        # `with` releases the lock even if an exception is raised.
        with self.lock:
            self.pipes[pr] = pw
        return pr

    def unregister(self, pr):
        """Remove the pipe pair whose read end is *pr* and close both ends."""
        with self.lock:
            pw = self.pipes.pop(pr)
        os.close(pr)
        os.close(pw)

    def capture(self):
        """Yield JPEG-encoded frames for as long as the camera stays open."""
        cap = self.cap
        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                # Fix: ndarray.tostring() was deprecated and removed from
                # NumPy; tobytes() returns the identical bytes.
                yield data.tobytes()

    def send_frame(self, frame):
        """Write a length-prefixed frame to every currently-writable pipe."""
        # NOTE(review): 'l' is platform-sized (4 or 8 bytes) while the reader
        # side assumes 8 — use a fixed-size format ('q') if non-64-bit-Linux
        # platforms must be supported; confirm before changing both sides.
        n = struct.pack('l', len(frame))
        with self.lock:
            if len(self.pipes):
                _, writable, _ = select([], self.pipes.values(), [], 1)
                for pipe in writable:
                    os.write(pipe, n)
                    os.write(pipe, frame)

    def run(self):
        """Thread entry point: pump captured frames to all registered clients."""
        for frame in self.capture():
            self.send_frame(frame)

class JpegRetriever:
    """Per-thread reader side of JpegStreamer's pipe fan-out.

    Used as a context manager by each HTTP handler thread; thread-local
    storage keeps exactly one pipe per thread.
    """

    def __init__(self, streamer):
        self.streamer = streamer
        self.local = threading.local()   # holds this thread's read fd in .pipe

    def retrieve(self):
        """Yield frames read from this thread's pipe, forever."""
        while True:
            ns = os.read(self.local.pipe, 8)
            n = struct.unpack('l', ns)[0]
            data = os.read(self.local.pipe, n)
            yield data

    def __enter__(self):
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()

        # Fix: referenced the global name `streamer` instead of the instance
        # passed to __init__ (NameError whenever the global doesn't exist).
        self.local.pipe = self.streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe
        # NOTE(review): returning True suppresses *all* exceptions (including
        # client disconnects that end the streaming loop) — confirm intended.
        return True

class WebHandler(BaseHTTPRequestHandler):
    """HTTP handler that streams JPEG frames via multipart/x-mixed-replace."""

    retriever = None   # shared JpegRetriever, installed once at startup

    @staticmethod
    def set_retriever(retriever):
        """Install the frame source used by all handler instances."""
        WebHandler.retriever = retriever

    def do_GET(self):
        if self.retriever is None:
            raise RuntimeError('no retriver')

        if self.path != '/':
            # Fix: unknown paths previously got no HTTP response at all;
            # answer with a proper 404 instead.
            self.send_error(404)
            return

        self.send_response(200)
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()

        # Stream frames over the one connection until the client disconnects.
        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        """Write one multipart part: boundary, part headers, then JPEG bytes."""
        sh  = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)

from concurrent.futures import ThreadPoolExecutor
class ThreadingPoolTCPServer(ThreadingTCPServer):
    """ThreadingTCPServer variant that handles requests on a bounded thread pool.

    thread_n caps concurrent request-handling threads so a flood of clients
    cannot exhaust server resources.
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, thread_n=100):
        # Fix: forward the caller's bind_and_activate; it was hard-coded to
        # True, making the parameter silently useless.
        super().__init__(server_address, RequestHandlerClass, bind_and_activate)

        self.executor = ThreadPoolExecutor(thread_n)

    def process_request(self, request, client_address):
        """Dispatch to the pool instead of spawning an unbounded thread per request."""
        self.executor.submit(self.process_request_thread, request, client_address)

if __name__ == '__main__':
    # Start camera capture in its own thread.
    streamer = JpegStreamer(0)
    streamer.start()

    # Give the HTTP handlers access to the frame stream.
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # Serve on a bounded pool of 3 handler threads.
    HOST, PORT = 'localhost', 9000
    print('Start server... (http://%s:%s)' % (HOST, PORT))
    httpd = ThreadingPoolTCPServer((HOST, PORT), WebHandler, thread_n=3)
    httpd.serve_forever()


六. 如何使用多进程

实际案例

由于python中全局解释器锁GIL的存在, 在任意时刻只允许一个线程在解释器中运行。
因此python的多线程不适合处理cpu密集型的任务。


想要处理cpu密集型的任务, 可以使用多进程模型。

解决方案

使用标准库中的'multiprocessing.Process', 它可以启动子进程执行任务。
操作接口, 进程间通信, 进程间同步等都与'threading.Thread'类似。

判断是否为水仙花数

from threading import Thread
from multiprocessing import Process
from queue import Queue as Thread_Queue   # 线程的queue
from multiprocessing import Queue as Process_Queue   # 进程的queue

def is_armstrong(n):
    """Return True if *n* is an Armstrong (narcissistic) number.

    An Armstrong number equals the sum of its digits each raised to the
    power of the digit count, e.g. 153 = 1**3 + 5**3 + 3**3.

    Fix: negative input used to loop forever (floor division `t //= 10`
    never reaches 0 for negative t); it now returns False immediately.
    """
    if n < 0:
        return False
    digits, t = [], n
    while t:
        digits.append(t % 10)
        t //= 10
    k = len(digits)
    return sum(d ** k for d in digits) == n

def find_armstrong(a, b, q=None):
    """Return all Armstrong numbers in range(a, b); also put the list on q if given."""
    found = []
    for n in range(a, b):
        if is_armstrong(n):
            found.append(n)
    if q:
        # Worker mode: hand the result back through the queue.
        q.put(found)
    return found

def find_by_thread(*ranges):
    """Search each (a, b) range on its own thread and merge results via a Queue."""
    results_q = Thread_Queue()
    threads = []
    for a, b in ranges:
        worker = Thread(target=find_armstrong, args=(a, b, results_q))
        worker.start()
        threads.append(worker)

    # One queue item per launched worker; get() blocks until each arrives.
    found = []
    for _ in ranges:
        found.extend(results_q.get())

    return found

def find_by_process(*ranges):
    """Search each (a, b) range in its own process and merge results via a Queue.

    Processes sidestep the GIL, so this parallelises CPU-bound work.
    """
    results_q = Process_Queue()
    procs = []
    for a, b in ranges:
        worker = Process(target=find_armstrong, args=(a, b, results_q))
        worker.start()
        procs.append(worker)

    # One queue item per launched worker; get() blocks until each arrives.
    found = []
    for _ in ranges:
        found.extend(results_q.get())

    return found


    
if __name__ == '__main__':
    import time

    started = time.time()
    # The threaded variant (commented out) is slower: the work is CPU bound
    # and the GIL serialises Python threads; processes run truly in parallel.
    #res = find_by_thread([10000000, 15000000], [15000000, 20000000],
    #                     [20000000, 25000000], [25000000, 30000000])
    res = find_by_process([10000000, 15000000], [15000000, 20000000],
                          [20000000, 25000000], [25000000, 30000000])
    print(res)
    print(time.time() - started)

猜你喜欢

转载自blog.csdn.net/weixin_41207499/article/details/83652565