线程--管理单个进程内的并发操作
使用线程可以让程序在相同的进程空间中并发地运行多个操作。
内容目录
- 线程对象 threading.Thread
- 确定当前线程 getName()
- 守护线程与非守护线程 daemon
- 枚举所有线程 enumerate()
- 子类化线程
- 定时器线程 Timer()
- 线程之间的信号 Event
- 控制对资源的访问 Lock
- 重入锁 RLock
- 锁的上下文管理器with lock
- 同步线程 Condition Barrier
- 限制对资源的并发访问 Semaphore
- 线程特殊数据 local()
1.线程对象 threading.Thread
使用Thread
最简单的方法是用目标函数实例化它,并调用start()来让它开始工作。
import threading
def worker():
"""thread worker function"""
print('Worker')
threads = []
for i in range(5):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
结果:
Worker
Worker
Worker
Worker
Worker
生成一个线程并传递参数来告诉它该做什么工作。任何类型的对象都可以作为参数传递给线程。下面这个例子传递一个数字,然后线程就会打印出来。
import threading
def worker(num):
"""thread worker function"""
print(f'Worker: {num}')
threads = []
for i in range(5):
t = threading.Thread(target=worker,args=(i,))
threads.append(t)
t.start()
结果:
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
2.确定当前线程 getName()
使用参数来识别或命名线程是很麻烦且没有必要的。每一个线程实例都有一个带有默认值的名称,可以随着线程的创建而改变。
import threading
import time
def worker():
print(threading.current_thread().getName(), 'Starting')
time.sleep(0.2)
print(threading.current_thread().getName(), 'Exiting')
def my_service():
print(threading.current_thread().getName(), 'Starting')
time.sleep(0.3)
print(threading.current_thread().getName(), 'Exiting')
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()
结果:"Thread-1"对应没有命名的w2
worker Starting
Thread-1 Starting
my_service Starting
Thread-1 Exiting
worker Exiting
my_service Exiting
大多数程序不使用打印来调试。logging模块支持使用%(threadName)s
在每个日志消息中嵌入线程名称,并且也是线程安全的。
import logging
import threading
import time
def worker():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def my_service():
logging.debug('Starting')
time.sleep(0.3)
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()
结果:
[DEBUG] (worker ) Starting
[DEBUG] (Thread-1 ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker ) Exiting
[DEBUG] (Thread-1 ) Exiting
[DEBUG] (my_service) Exiting
3.守护线程与非守护线程 daemon
守护 ---- 不阻塞主程序
非守护 -- 阻塞主程序
到目前为止,示例程序都是等所有线程都完成了它们的工作才退出,线程默认是非守护的。
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True) # 守护,不阻塞主程序
t = threading.Thread(name='non-daemon', target=non_daemon) # 默认非守护,阻塞主程序
d.start()
t.start()
结果:由于d是守护线程,主程序不必等待d完成就可以退出。
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
要想使主程序等待守护线程完成任务后才能退出,可以使用join():
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join()
t.join()
结果:
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon ) Exiting
另外,d.join(0.1)表示等待d 0.1秒。
4.枚举所有线程 enumerate()
enumerate()返回一个活动线程实例列表
import random
import threading
import time
import logging
def worker():
"""thread worker function"""
pause = random.randint(1, 5) / 10
logging.debug('sleeping %0.2f', pause)
time.sleep(pause)
logging.debug('ending')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
t = threading.Thread(target=worker, daemon=True)
t.start()
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is main_thread:
continue
logging.debug('joining %s', t.getName())
t.join()
结果:
(Thread-1 ) sleeping 0.40
(Thread-2 ) sleeping 0.50
(Thread-3 ) sleeping 0.10
(MainThread) joining Thread-1
(Thread-3 ) ending
(Thread-1 ) ending
(MainThread) joining Thread-2
(Thread-2 ) ending
(MainThread) joining Thread-3
5.子类化线程
在启动时,一个线程执行一些基本的初始化,然后调用它的run()方法,该方法调用传递给构造函数的目标函数。现在要创建线程的子类,覆盖run()来做任何必要的事情。
import threading
import logging
class MyThread(threading.Thread):
def run(self):
logging.debug('running')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThread()
t.start()
结果:
(Thread-1 ) running
(Thread-2 ) running
(Thread-3 ) running
(Thread-4 ) running
(Thread-5 ) running
实例化的时候带参数:
import threading
import logging
class MyThreadWithArgs(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
logging.debug('running with %s and %s',
self.args, self.kwargs)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
t.start()
结果:
(Thread-1 ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2 ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3 ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4 ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5 ) running with (4,) and {'a': 'A', 'b': 'B'}
6.定时器线程 Timer()
threading.Timer()创建延时线程,可取消,取消后不在执行。
import threading
import time
import logging
def delayed():
logging.debug('worker running')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
t1 = threading.Timer(0.3, delayed) # 线程t1要在0.3s后执行
t1.setName('t1')
t2 = threading.Timer(0.3, delayed) # 线程t2要在0.3s后执行
t2.setName('t2')
logging.debug('starting timers')
t1.start()
t2.start()
logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel() # t2取消了,t2不会执行了
logging.debug('done')
结果:可以看到t2没有执行
(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1 ) worker running
7.线程之间的信号 Event
尽管使用多线程的目的是并发地运行单独的操作,但是有时候,能够在两个或多个线程中同步操作也很重要。事件对象是安全地在线程之间进行通信的一种简单方法。
python线程的事件(Event)用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set,
一个事件管理一个内部标志flag,调用者可以用set()和clear()方法来控制它。其他线程可以使用wait()暂停直到flag被设置,有效地阻止进度,直到允许继续为止。
import logging
import threading
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
logging.debug('wait_for_event starting')
event_is_set = e.wait()
logging.debug('event set: %s', event_is_set)
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
while not e.is_set():
logging.debug('wait_for_event_timeout starting')
event_is_set = e.wait(t)
logging.debug('event set: %s', event_is_set)
if event_is_set:
logging.debug('processing event')
else:
logging.debug('doing other work')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
e = threading.Event()
t1 = threading.Thread(
name='block',
target=wait_for_event,
args=(e,),
)
t1.start()
t2 = threading.Thread(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
t2.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')
结果:
(block ) wait_for_event starting
(nonblock ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock ) event set: True
(block ) event set: True
(nonblock ) processing event
8.控制对资源的访问 Lock
除了同步线程的操作之外,还必须能够控制对共享资源的访问,以防止污染或遗漏数据。Python的内置数据结构(列表、字典等)是线程安全的,在Python中实现的其他数据结构,或者像整数和浮点数这样的简单类型,都没有这种保护。使用Lock
防止同时访问一个对象。
import logging
import random
import threading
import time
class Counter:
def __init__(self, start=0):
self.lock = threading.Lock() # 创建锁
self.value = start
def increment(self):
logging.debug('Waiting for lock')
self.lock.acquire() # 上锁
try:
logging.debug('Acquired lock')
self.value = self.value + 1
finally:
self.lock.release() # 释放锁
def worker(c):
for i in range(2):
pause = random.random()
logging.debug('Sleeping %0.02f', pause)
time.sleep(pause)
c.increment()
logging.debug('Done')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
counter = Counter() # 实例化
for i in range(2):
t = threading.Thread(target=worker, args=(counter,))
t.start()
logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is not main_thread:
t.join()
logging.debug('Counter: %d', counter.value)
结果:在这个例子中,worker()函数增加一个Counter实例,它管理一个锁,以防止两个线程同时改变其内部状态。如果没有使用锁,那么就有可能丢失value属性的更改。
(Thread-1 ) Sleeping 0.80
(Thread-2 ) Sleeping 0.70
(MainThread) Waiting for worker threads
(Thread-2 ) Waiting for lock
(Thread-2 ) Acquired lock
(Thread-2 ) Sleeping 0.01
(Thread-2 ) Waiting for lock
(Thread-2 ) Acquired lock
(Thread-2 ) Done
(Thread-1 ) Waiting for lock
(Thread-1 ) Acquired lock
(Thread-1 ) Sleeping 0.63
(Thread-1 ) Waiting for lock
(Thread-1 ) Acquired lock
(Thread-1 ) Done
(MainThread) Counter: 4
由于lock.acquire()
会阻塞其他进程,可以用have_it = lock.acquire(0)
的方式无阻塞的尝试获取锁, lock_holder()在保持和释放锁之间的循环,worker()无阻塞地进行尝试上锁。
import logging
import threading
import time
def lock_holder(lock):
logging.debug('Starting')
while True:
lock.acquire()
try:
logging.debug('Holding')
time.sleep(0.5)
finally:
logging.debug('Not holding')
lock.release()
time.sleep(0.5)
def worker(lock):
logging.debug('Starting')
num_tries = 0
num_acquires = 0
while num_acquires < 3:
time.sleep(0.5)
logging.debug('Trying to acquire')
have_it = lock.acquire(0)
try:
num_tries += 1
if have_it:
logging.debug('Iteration %d: Acquired',
num_tries)
num_acquires += 1
else:
logging.debug('Iteration %d: Not acquired',
num_tries)
finally:
if have_it:
lock.release()
logging.debug('Done after %d iterations', num_tries)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
holder = threading.Thread(
target=lock_holder,
args=(lock,),
name='LockHolder',
daemon=True,
)
holder.start()
worker = threading.Thread(
target=worker,
args=(lock,),
name='Worker',
)
worker.start()
结果:
(LockHolder) Starting
(LockHolder) Holding
(Worker ) Starting
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 1: Acquired
(LockHolder) Holding
(Worker ) Trying to acquire
(Worker ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 3: Acquired
(LockHolder) Holding
(Worker ) Trying to acquire
(Worker ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 5: Acquired
(Worker ) Done after 5 iterations
9.重入锁 RLock
正常的Lock对象不能被多次获取,即使是相同的线程。如果一个锁被同一个调用链中的多个函数访问,那么这会带来不希望的副作用。
import threading
lock = threading.Lock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
结果:lock.acquire(0),0超时防止阻塞
First try : True
Second try: False
在这种情况下,来自同一线程的独立代码需要“重新获得”锁,而是使用RLock。
import threading
lock = threading.RLock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
结果:
First try : True
Second try: True
10.锁的上下文管理器with lock
锁实现上下文管理器API,并且与with语句兼容。使用消除了显式获取和释放锁的需要
import threading
import logging
def worker_with(lock):
with lock:
logging.debug('Lock acquired via with')
def worker_no_with(lock):
lock.acquire()
try:
logging.debug('Lock acquired directly')
finally:
lock.release()
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))
w.start()
nw.start()
结果:这两个函数workerwith()和workernowith()以等价的方式管理锁。
(Thread-1 ) Lock acquired via with
(Thread-2 ) Lock acquired directly
11.同步线程 Condition Barrier
除了使用 Event
之外,另一种同步线程的方法是使用一个Condition
对象。因为Condition
使用Lock
,所以它可以绑定到共享资源,允许多个线程等待资源更新。在这个例子中, consumer()
线程在运行之前等待Condition
设置。producer()
线程负责设置Condition
,并通知其他线程可以继续执行了。
import logging
import threading
import time
def consumer(cond):
"""wait for the condition and use the resource"""
logging.debug('Starting consumer thread')
with cond:
cond.wait()
logging.debug('Resource is available to consumer')
def producer(cond):
"""set up the resource to be used by the consumer"""
logging.debug('Starting producer thread')
with cond:
logging.debug('Making resource available')
cond.notifyAll()
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
args=(condition,))
p = threading.Thread(name='p', target=producer,
args=(condition,))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
结果,这里用了with
,显式地用acquire() 和 release()也可以。
2018-05-01 15:39:49,561 (c1) Starting consumer thread
2018-05-01 15:39:49,761 (c2) Starting consumer thread
2018-05-01 15:39:49,962 (p ) Starting producer thread
2018-05-01 15:39:49,962 (p ) Making resource available
2018-05-01 15:39:49,963 (c1) Resource is available to consumer
2018-05-01 15:39:49,964 (c2) Resource is available to consumer
Barrier是另一个线程同步机制
threading.Barrier(parties, action=None, timeout=None)
构建Barrier对象,parties 指定参与方数目,timeout是wait方法未指定时超时的默认值。
n_waiting 当前在栅栏中等待的线程数
parties 通过栅栏所需的线程数
wait(timeout=None) 等待通过栅栏,返回0到线程数-1的整数(barrier_id),每个线程返回不同。如果wait方法设置了超时,并超时发送,栅栏将处于broken状态。
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'waiting for barrier with {} others'.format(
barrier.n_waiting))
worker_id = barrier.wait()
print(threading.current_thread().name, 'after barrier',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.1)
for t in threads:
t.join()
结果:在这个例子中,Barrier被配置为阻塞,直到三个线程正在等待。当条件满足时,所有的线程都会在同一时间通过控制点。
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
abort() 将Barrie置于broken状态,等待中的线程或者调用等待方法的线程都会抛出threading.BrokenBarrieError异常
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'waiting for barrier with {} others'.format(
barrier.n_waiting))
try:
worker_id = barrier.wait()
except threading.BrokenBarrierError:
print(threading.current_thread().name, 'aborting')
else:
print(threading.current_thread().name, 'after barrier',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS + 1)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.5)
barrier.abort()
for t in threads:
t.join()
结果:期望阻塞的数量比实际线程数多一个,这样就都阻塞了,这时候用abort()就中止了所以线程
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-1 aborting
worker-2 aborting
12.限制对资源的并发访问 Semaphore
有时可能需要允许多个工作线程同时访问一个资源,但要限制总数。例如,连接池支持同时连接,但数目可能是固定的,或者一个网络应用可能支持固定数目的并发下载。这些连接就可以使用Semaphore来管理。
import logging
import random
import threading
import time
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.active = []
self.lock = threading.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
logging.debug('Running: %s', self.active)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
logging.debug('Running: %s', self.active)
def worker(s, pool):
logging.debug('Waiting to join the pool')
# with上下文
with s:
name = threading.current_thread().getName()
pool.makeActive(name)
time.sleep(0.1)
pool.makeInactive(name)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
t = threading.Thread(
target=worker,
name=str(i),
args=(s, pool),
)
t.start()
结果:可以看到每次最多同时有2个线程运行,添加新线程要在其中有线程完成工作之后,否则会等待。
2018-05-01 18:18:46,359 (0 ) Waiting to join the pool
2018-05-01 18:18:46,360 (0 ) Running: ['0']
2018-05-01 18:18:46,360 (1 ) Waiting to join the pool
2018-05-01 18:18:46,361 (1 ) Running: ['0', '1']
2018-05-01 18:18:46,361 (2 ) Waiting to join the pool
2018-05-01 18:18:46,362 (3 ) Waiting to join the pool
2018-05-01 18:18:46,460 (0 ) Running: ['1']
2018-05-01 18:18:46,460 (2 ) Running: ['1', '2']
2018-05-01 18:18:46,461 (1 ) Running: ['2']
2018-05-01 18:18:46,461 (3 ) Running: ['2', '3']
2018-05-01 18:18:46,560 (2 ) Running: ['3']
2018-05-01 18:18:46,561 (3 ) Running: []
13.线程特殊数据 local()
虽然有些资源需要被锁定,以便多个线程可以使用它们,但也需要保护其他资源,以便将它们隐藏在不拥有它们的线程中。local()
类创建了一个能够在单独的线程中隐藏值的对象,其他线程看不到,尽管名字一样。
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
结果:
(MainThread) No value yet
(MainThread) value=1000
(Thread-1 ) No value yet
(Thread-1 ) value=64
(Thread-2 ) No value yet
(Thread-2 ) value=99
为了初始化设置,所有的线程都以相同的值开始,使用一个子类并在init()中设置属性。
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
class MyLocal(threading.local):
def __init__(self, value):
super().__init__()
logging.debug('Initializing %r', self)
self.value = value
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
结果:利用 初始化 给每个调用local_data实例的线程赋初值。
(MainThread) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(MainThread) value=1000
(Thread-1 ) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(Thread-1 ) value=1000
(Thread-1 ) value=52
(Thread-2 ) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(Thread-2 ) value=1000
(Thread-2 ) value=94