基于线程的并行
定义一个线程
使用线程最简单的方式是,用一个目标函数实例化一个Thread然后调用start()方法启动它。
-
-
Target:当线程启动的时候要执行的函数
-
name:线程的名字,默认会分配一个唯一的名字
-
args: 传递target的参数,要适用tuple类型
-
1 import threading 2 #导入内置的threading模块 3 4 5 def function(i): 6 print("function called by thread %i\n" % i) 7 return 8 9 threads = [] 10 11 for i in range(5): 12 t = threading.Thread(target=function, args=(i,)) 13 #使用目标函数function初始化一个线程对象Thread, 14 #还传入用于打印的一个参数 15 #线程被创建之后并不会马上运行,需要手动调用start(), join()让调用它的线程一直 16 #等待直到执行结束(即阻塞调用它的主线程,t线程执行结束,主线程才会继续执行) 17 18 threads.append(t) 19 t.start() 20 21 for thread in threads: 22 thread.join()
如何确定当前的线程
每一个Thread实例创建的时候都有一个带默认的名字,并且可以修改的。在服务端通常一个服务进程都有多个线程服务,负责不同的操作,这时命名线程是很有用的。
1 import threading 2 import time 3 4 def first_function(): 5 print(threading.currentThread().getName() + str(' is Starting ')) 6 time.sleep(2) 7 print(threading.currentThread().getName() + str(' is Exiting ')) 8 return 9 def second_function(): 10 print(threading.currentThread().getName() + str(' is Starting ')) 11 time.sleep(2) 12 print(threading.currentThread().getName() + str(' is Exiting ')) 13 return 14 def thrid_function(): 15 print(threading.currentThread().getName() + str(' is Starting ')) 16 time.sleep(2) 17 print(threading.currentThread().getName() + str(' is Exiting ')) 18 return 19 20 if __name__ == "__main__": 21 t1 = threading.Thread(name='first_function', target=first_function) 22 t2 = threading.Thread(name='second_function', target=second_function) 23 t3 = threading.Thread(target=thrid_function) 24 # 使用目标函数实例化线程,同时传入name参数,作为线程的名字,如果不传入这个参数,将使用默认的参数。 25 # 默认输出的线程的名字是Thread-1 26 t1.start() 27 t2.start() 28 t3.start() 29 t1.join() 30 t2.join() 31 t3.join()
实现一个线程
# 如何实现一个线程 # 使用threading 模块实现一个新的线程,需要三步: # 定义一个Thread类 # 重写__init__(self, [,args])方法,可以添加额外的参数 # 需要重写run(self, [,args])方法来实现线程要做的事情 #threading 模块是创建和管理线程的首选形式,每一个线程都通过一个继承Thread类, #重写run()方法,来实现逻辑,这个方法就是线程的入口,在主程序中,我们创建来多个myThread #的类型,然后执行start()方法,启动它们。 #调用Thread.__init__构造方法是必须的,通过ta我们可以给线程定义 #一些名字或分组之类的属性,调用start()之后线程变为活跃状态。并且持续直到run()结束。或者中间出现异常。 #所有的线程都执行完成之后,程序结束。 import threading import time import _thread exitFlag = 0 class myThread(threading.Thread): def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threading self.name = name self.counter = counter def run(self): print("Starting " + self.name) self.print_time(self.name, self.counter, 5) print("Exiting " + self.name) @staticmethod def print_time(threadName, delay, counter): while counter: if exitFlag: _thread.exit() time.sleep(delay) print("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) thread1.start() thread2.start() thread1.join() thread2.join() print("Exiting Main Thread")
使用lock进行线程同步
当两个或以上那对共享内存的操作发生在并发线程中,并且至少有一个可以改变数据,又没有同步机制的条件下,就会产生竞争条件,可能会导致执行无效代码,bug,或异常行为。
竞争条件最简单的解决方法是使用锁。锁的操作非常简单,当一个线程需要访问部分共享内存时,它必须先获得锁才能访问。此线程对这部分共享资源使用完成之后,该线程必须释放锁。然后其他线程就可以拿到这个锁并访问这部分资源了。
需要保证,在同一时刻只有一个线程允许访问共享内存。
缺点,当不同的线程要求得到一个锁时,死锁就会发生,这时程序不可能继续执行,因为它们互相拿着对方需要的锁。
两个并发的线程(A, B),需要资源1和资源2,假设线程A需要资源1,线程B需要资源2.在这种情况下,两个线程都使用各自的锁,目前为止没有冲突。现在假设,在对方释放锁之前,线程A需要资源2的锁,线程B需要资源1的锁,没有资源线程不会继续执行。鉴于目前两个资源都是被占用的。而且在对方的锁释放之前都处于等待且不释放锁的状态。这是死锁的典型情况。
因此使用锁解决同步问题是一个存在潜在问题的方案。
import threading # 锁有两种状态: locked和unlocked # 有两个方法来操作锁: acquire() release() # 如果状态是unlocked,可以调用acquire()将状态改为locked # 如果状态是locked,acquire()会被block直到另一个线程调用release()释放锁 # 如果状态是unlocked,调用release()将导致runtimeError异常 # 如果状态是locked,可以调用release()将状态改为unlocked shared_resource_with_lock = 0 shared_resource_with_no_lock = 0 COUNT = 100000 shared_resource_lock = threading.Lock() def increment_with_lock(): global shared_resource_with_lock for i in range(COUNT): shared_resource_lock.acquire() shared_resource_with_lock += 1 shared_resource_lock.release() def decrement_with_lock(): global shared_resource_with_lock for i in range(COUNT): shared_resource_lock.acquire() shared_resource_with_lock -= 1 shared_resource_lock.release() def increment_without_lock(): global shared_resource_with_no_lock for i in range(COUNT): shared_resource_with_no_lock += 1 def decrement_without_lock(): global shared_resource_with_no_lock for i in range(COUNT): shared_resource_with_no_lock -= 1 if __name__ == "__main__": t1 = threading.Thread(target=increment_with_lock) t2 = threading.Thread(target=decrement_with_lock) t3 = threading.Thread(target=increment_without_lock) t4 = threading.Thread(target=decrement_without_lock) t1.start() t2.start() t3.start() t4.start() t1.join() t2.join() t3.join() t4.join() print("the value of shared variable with lock management is %s" % shared_resource_with_lock) print("the value of shared variable with race condition is %s" % shared_resource_with_no_lock)