– Start
当多个线程对同一个对象进行读写操作时将引发并发问题, 学过数据库的人都知道并发将导致脏读和丢失更新等错误, 本文将介绍 Python 是如何解决并发问题的.
Lock 类
from threading import Thread
from threading import Lock
# 定义账户类
class Account:
def __init__(self, debit):
self.debit = debit
self.lock = Lock()
def increase(self, amount):
self.lock.acquire()
try:
self.debit = self.debit + amount
finally:
self.lock.release()
# 启动两个线程同时转账
def tansfer(*args):
account, = args
for i in range(1000000):
account.increase(1)
account = Account(0)
args = (account, )
t1 = Thread(target=tansfer, args=args)
t2 = Thread(target=tansfer, args=args)
t1.start()
t2.start()
# 等待 t1 和 t2 结束,打印结果
t1.join()
t2.join()
print(account.debit)
with 语句
def increase(self, amount):
with self.lock:
self.debit = self.debit + amount
RLock 类
from threading import Thread
from threading import RLock
# 定义账户类
class Account:
def __init__(self, debit):
self.debit = debit
self.lock = RLock()
self.credit = 0
def increase(self, amount):
with self.lock:
self.decreaseCredit(amount)
self.debit = self.debit + amount
def decreaseCredit(self, amount):
with self.lock:
self.credit = self.credit - amount
# 启动两个线程同时转账
def tansfer(*args):
account, = args
for i in range(1000000):
account.increase(1)
account = Account(0)
args = (account, )
t1 = Thread(target=tansfer, args=args)
t2 = Thread(target=tansfer, args=args)
t1.start()
t2.start()
# 等待 t1 和 t2 结束,打印结果
t1.join()
t2.join()
print(account.debit)
BoundedSemaphore 类
import threading
from threading import Thread
from threading import BoundedSemaphore
from datetime import datetime
# 定义服务类
class Service:
def __init__(self):
# 同时提供 3 个服务
self.available = BoundedSemaphore(value=3)
def serve(self):
with self.available:
print(f'{datetime.now()} -- serving {threading.current_thread().getName()}')
# 启动 10 个线程
service = Service()
for i in range(10):
Thread(target=lambda service: service.serve(), args=(service, )).start()
Event 类
import threading
from threading import Thread
from threading import Event
from datetime import datetime
def my_task(*args):
event, = args
event.wait() # 等待
print(f'{datetime.now()} -- serving {threading.current_thread().getName()}')
event = Event()
# 启动 10 个线程
for i in range(10):
Thread(target=my_task, args=(event, )).start()
print('Starting all threads')
event.set() # 所有等待线程开始执行
Barrier 类
import threading
import time
from threading import Thread
from threading import Barrier
from datetime import datetime
def initDb(*args):
barrier, = args
print('init DB starting')
time.sleep(2)
print('init DB done')
barrier.wait()
def initCache(*args):
barrier, = args
print('init Cache starting')
time.sleep(2)
print('init Cache done')
barrier.wait()
# 当调用 3 次 wait
barrier = Barrier(3)
# 启动两个线程分别初始化 db 和 cache
Thread(target=initDb, args=(barrier, )).start()
Thread(target=initCache, args=(barrier, )).start()
print('Service is not ready, waiting for init db and cache')
barrier.wait()
print('Service is ready now')
– 更多参见:Python 精萃
– 声 明:转载请注明出处
– Last Updated on 2018-10-06
– Written by ShangBo on 2018-10-06
– End