Python 线程同步

– Start
当多个线程同时读写同一个对象时, 可能引发并发问题. 学过数据库的人都知道, 并发会导致脏读、丢失更新等错误. 本文将介绍 Python 是如何解决并发问题的.

Lock 类

from threading import Thread
from threading import Lock


# Account whose balance updates are serialized by a Lock
class Account:
    """Account with a ``debit`` balance that is updated under a lock.

    Attributes:
        debit: Current balance; starts at the constructor argument and is
            changed by ``increase``.
        lock: ``threading.Lock`` held for the duration of each update.
    """

    def __init__(self, debit):
        """Create the lock and store the opening balance ``debit``."""
        self.lock = Lock()
        self.debit = debit

    def increase(self, amount):
        """Add ``amount`` to ``debit`` while holding the account lock."""
        guard = self.lock
        guard.acquire()
        try:
            updated = self.debit + amount
            self.debit = updated
        finally:
            # Release even if the addition raises, so the lock is never
            # left held by a failed update.
            guard.release()


# Thread target used by the transfer threads
def tansfer(*args):
    """Call ``account.increase(1)`` one million times.

    Args:
        *args: Exactly one positional argument, the account to update.
            Any other number of arguments raises ``ValueError`` from the
            unpacking below.
    """
    (account,) = args
    for _ in range(1_000_000):
        account.increase(1)

account = Account(0)
args = (account,)

# Both threads receive the same Account instance
t1, t2 = (Thread(target=tansfer, args=args) for _ in range(2))
for worker in (t1, t2):
    worker.start()


# Wait for t1 and t2 to finish, then print the result
for worker in (t1, t2):
    worker.join()
print(account.debit)

with 语句

Lock 支持上下文管理协议, 因此上例 Account 类的 increase 方法可以用 with 语句改写: 进入 with 块时自动获取锁, 离开时 (包括发生异常时) 自动释放锁.

    def increase(self, amount):
        """Add ``amount`` to ``self.debit`` while holding ``self.lock``."""
        # The with block acquires the lock on entry and releases it on exit,
        # including when the body raises.
        with self.lock:
            new_balance = self.debit + amount
            self.debit = new_balance

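RLock 类

RLock 是可重入锁: 已经持有锁的线程可以再次获取同一把锁而不会被阻塞. 下例中 increase 在持有锁的情况下调用了同样需要加锁的 decreaseCredit, 如果使用 Lock 就会死锁.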

from threading import Thread
from threading import RLock


# Account whose methods share one re-entrant lock
class Account:
    """Account tracking ``debit`` and ``credit`` under a single ``RLock``.

    ``increase`` calls ``decreaseCredit`` while it already holds the lock,
    so the same thread acquires the lock a second time; the lock is an
    ``RLock`` so that nested acquisition does not block.
    """

    def __init__(self, debit):
        """Create the lock, store the opening ``debit`` and zero ``credit``."""
        self.lock = RLock()
        self.debit = debit
        self.credit = 0

    def increase(self, amount):
        """Subtract ``amount`` from ``credit`` and add it to ``debit``."""
        with self.lock:
            # Nested acquisition of self.lock happens inside this call.
            self.decreaseCredit(amount)
            new_debit = self.debit + amount
            self.debit = new_debit

    def decreaseCredit(self, amount):
        """Subtract ``amount`` from ``credit`` while holding the lock."""
        with self.lock:
            new_credit = self.credit - amount
            self.credit = new_credit

# Thread target used by the transfer threads
def tansfer(*args):
    """Invoke ``increase(1)`` on the given account 1,000,000 times.

    Args:
        *args: A single positional argument, the account object. Passing
            zero or several arguments raises ``ValueError`` when unpacked.
    """
    [account] = args
    for _ in range(1_000_000):
        account.increase(1)

account = Account(0)
args = (account,)

# Both threads operate on the same Account instance
t1, t2 = (Thread(target=tansfer, args=args) for _ in range(2))
for worker in (t1, t2):
    worker.start()


# Wait for t1 and t2 to finish, then print the result
for worker in (t1, t2):
    worker.join()
print(account.debit)

BoundedSemaphore 类

import threading
from threading import Thread
from threading import BoundedSemaphore
from datetime import datetime

# Service that admits a bounded number of callers at a time
class Service:
    """Service whose ``serve`` calls are gated by a ``BoundedSemaphore``.

    Attributes:
        available: Semaphore created with ``value=3``; each ``serve`` call
            holds one of its slots while it runs.
    """

    def __init__(self):
        """Create the semaphore that limits concurrent ``serve`` calls."""
        # Serve at most 3 callers at the same time
        self.available = BoundedSemaphore(value=3)

    def serve(self):
        """Print a timestamped line naming the calling thread.

        Blocks until a semaphore slot is free; the slot is released when the
        ``with`` block exits.
        """
        with self.available:
            # Use the ``name`` attribute: Thread.getName() is deprecated
            # since Python 3.10 and emits a DeprecationWarning.
            print(f'{datetime.now()} -- serving {threading.current_thread().name}')


# Start 10 threads that all call serve() on the same Service
service = Service()
for i in range(10):
    # The bound method is already a zero-argument callable, so no lambda
    # wrapper (whose parameter shadowed the module-level ``service``) is needed.
    Thread(target=service.serve).start()

Event 类

import threading
from threading import Thread
from threading import Event
from datetime import datetime


def my_task(*args):
    """Wait for the event, then print a timestamped line naming this thread.

    Args:
        *args: Exactly one positional argument, the event object to wait on.
            Any other number of arguments raises ``ValueError`` from the
            unpacking below.
    """
    event, = args
    event.wait()  # Block here until the event is set
    # Use the ``name`` attribute: Thread.getName() is deprecated since
    # Python 3.10 and emits a DeprecationWarning.
    print(f'{datetime.now()} -- serving {threading.current_thread().name}')


event = Event()


# Create 10 threads that all wait on the same event, then start them
waiters = [Thread(target=my_task, args=(event,)) for _ in range(10)]
for waiter in waiters:
    waiter.start()

print('Starting all threads')
event.set()  # Lets every waiting thread proceed

Barrier 类

import threading
import time
from threading import Thread
from threading import Barrier
from datetime import datetime


def initDb(*args):
    """Print start/done messages around a 2-second sleep, then wait on the barrier.

    Args:
        *args: Exactly one positional argument, the barrier object whose
            ``wait()`` is called once the messages have been printed.
    """
    (barrier,) = args
    print('init DB starting')
    time.sleep(2)  # Stand-in delay between the two messages
    print('init DB done')
    # Report completion by arriving at the barrier
    barrier.wait()


def initCache(*args):
    """Print start/done messages around a 2-second sleep, then wait on the barrier.

    Args:
        *args: Exactly one positional argument, the barrier object whose
            ``wait()`` is called once the messages have been printed.
    """
    (barrier,) = args
    print('init Cache starting')
    time.sleep(2)  # Stand-in delay between the two messages
    print('init Cache done')
    # Report completion by arriving at the barrier
    barrier.wait()

# The barrier is created for 3 parties: wait() must be called 3 times
barrier = Barrier(3)

# Start one thread per initializer (db first, then cache)
for initializer in (initDb, initCache):
    Thread(target=initializer, args=(barrier,)).start()


print('Service is not ready, waiting for init db and cache')
# The main thread makes the third wait() call
barrier.wait()
print('Service is ready now')

– 更多参见:Python 精萃
– 声 明:转载请注明出处
– Last Updated on 2018-10-06
– Written by ShangBo on 2018-10-06
– End

猜你喜欢

转载自blog.csdn.net/shangboerds/article/details/82955160