sailan
1. 通过启动跟自己机器cpu相同的进程和线程数,验证GIL的存在
from threading import Thread
import time
def task():
time.sleep(20)
if __name__ == '__main__':
for i in range(10):
p = Thread(target=task)
p.start()
from multiprocessing import Process
import time
def task():
time.sleep(200)
if __name__ == '__main__':
for i in range(6):
p = Process(target=task)
p.start()
2.测试计算密集型和io密集型执行时间
计算数据量小的话 线程会稍块
io 密集 线程更快更好
import time
from threading import Thread
from multiprocessing import Process
def task():
n = 0
for i1 in range(100000000):
n += i1
def task1():
time.sleep(2)
计算密集型
#线程
if __name__ == '__main__':
l1 = []
a = time.time()
for i in range(10):
t = Thread(target=task)
l1.append(t)
t.start()
for i in l1:
i.join()
print(time.time()-a) # 114.71118760108948
# 进程
if __name__ == '__main__':
l1 = []
a = time.time()
for i in range(10):
t = Process(target=task)
l1.append(t)
t.start()
for i in l1:
i.join()
print(time.time()-a) # 23.2220938205719
3. 通过递归锁解决课上死锁现象
from threading import Thread, RLock
import time
mutex1 = mutex2 = RLock()
def task1(name1):
if name == 'sailan':
return
mutex1.acquire()
print(f'{name1}获得了1锁')
mutex2.acquire()
print(f'{name1}获得了2锁')
print(f'恭喜{name1}凑集双锁矮一公分')
mutex1.release()
print(f'{name1}释放了1锁')
mutex2.release()
print(f'{name1}释放了2锁')
def task2(name2):
if name == 'sailan':
return
mutex2.acquire()
print(f'{name2}获得了2锁')
time.sleep(2)
mutex1.acquire()
print(f'{name2}获得了1锁')
print(f'恭喜{name2}再次凑集双锁长高一米')
mutex1.release()
print(f'{name2}释放了1锁')
mutex2.release()
print(f'{name2}释放了2锁')
if __name__ == '__main__':
l1 = ['sailan', 'egon', 'lxx']
for name in l1:
t1 = Thread(target=task1, args=(name,))
t2 = Thread(target=task2, args=(name,))
t1.start()
t2.start()
4. 通过event事件,实现两个线程,一个线程读文件前一半,写入另一个文件,另一个线程读后一半写入文件
from threading import Thread, Event
import os
event = Event()
size = os.path.getsize('我就是力量的化身')
n = size // 2
def read_first():
with open('我就是力量的化身', 'r', encoding='utf-8') as f:
a11 = f.read(n//3)
with open('拉斯维尼', 'a', encoding='utf-8') as f:
f.write(a11)
event.set()
def read_last():
event.wait()
with open('我就是力量的化身', 'r', encoding='utf-8') as f1:
f1.seek(n, 0)
a1 = f1.read()
with open('拉斯维尼', 'a', encoding='utf-8') as f:
f.write(a1)
if __name__ == '__main__':
t1 = Thread(target=read_last)
t2 = Thread(target=read_first)
t1.start()
t2.start()
5. 把上面的4的代码,通过线程池实现
from concurrent.futures import ThreadPoolExecutor
import os
import time
pool = ThreadPoolExecutor(2)
size = os.path.getsize('我就是力量的化身')
n = size // 2
def read_first():
with open('我就是力量的化身', 'r', encoding='utf-8') as f:
a1 = f.read(n // 3)
with open('拉斯维尼', 'a', encoding='utf-8') as f:
f.write(a1)
def read_last():
with open('我就是力量的化身', 'r', encoding='utf-8') as f1:
f1.seek(n, 0)
a1 = f1.read()
with open('拉斯维尼', 'a', encoding='utf-8') as f:
f.write(a1)
if __name__ == '__main__':
a = pool.submit(read_first)
time.sleep(0.1)
if a.done():
pool.submit(read_last)