1、进程间通信(通过队列使进程间通信)
队列简单使用, 见1-queue代码
进程间使用队列通信
q.empty() 是否为空,用pycharm进行测试
#队列
from multiprocessing import Queue
# 创建一个队列 队列长度可以指定
q = Queue(5) #队列5个长度
#01向队列中添加元素(一个put函数只能插入一个元素)
q.put('李白')
q.put('杜甫')
q.put('白居易')
q.put('杜牧')
q.put('李商隐')# 此时队列是满的,正好是五个元素
'''
# 默认数阻塞的,程序卡在这里,等待队列里面有位置插入进去
#q.put('柳宗元') #程序会卡在这儿,永远等待,直到有空再运行
# 第二个参数为False,不等待,直接报错
#q.put('柳宗元', False) 立马报错,不等待
# 第二个参数为True,等待几秒再报错
# q.put('柳宗元', True, 3) 等待3秒报错
# print(q.full()) 判断队列是否满
# print(q.empty()) 判断队列是否为空
# print(q.qsize()) 查看队列的长度
'''
'''
#02从队列中获取元素
print(q.get()) #获取的为李白,一次只能获取 一个元素
print(q.get())
print(q.get())
print(q.get())
print(q.get()) #获取的是最后一个元素
print(q.get()) # 如果队列中没有元素,那么就会在这里等待
#print(q.get(False)) # 直接报错
#print(q.get(True, 3)) # 等待3秒报错
'''
#进程间使用队列通信
'''
#格式
#第一步 导入库
from multiprocessing import Process #导入进程
from multiprocessing import Queue #导入队列
#函数
def main():
pass
if __name__ == '__main__': #入口程序
main()
'''
'''
from multiprocessing import Process
from multiprocessing import Queue
import time
def main():
#第二步 创建队列和进程
# 需求是 创建写进程和读进程,创建队列,让其通过队列通信
q = Queue(5) #创建队列
pw = Process(target=write, args=(q,))# 创建写进程,将队列作为参数传递进去,传递进去才能使用
pr = Process(target=read, args=(q,))# 创建读进程,将队列传递进去
# 启动进程(此时有一个主进程 两个子进程)
pw.start()
time.sleep(1) #停留几秒 观测效果
pr.start()
# 让主进程等待子进程结束后再结束
pw.join()
pr.join()
print('读写进程交互完毕,主进程和子进程全部结束')
if __name__ == '__main__':
main()
'''
'''
from multiprocessing import Process #导入进程
from multiprocessing import Queue #导入队列
import time
import os
#第三步 写进程‘向队列中添加数据q.put(x)’,读进程‘会向队列中提取数据q.get()’
def write(q):
print('写进程%s开始执行' % os.getpid())
# 向队列中添加数据
for x in ['孙悟空', '凯', '裴擒虎', '梦琪', '张飞']:
print('%s写入队列成功' % x)
q.put(x)
time.sleep(0.5)
def read(q):
print('读进程%s开始执行' % os.getpid())
#只要有数据,读进程会一直在读,提取数据
while 1:
if not q.empty():
print('%s出队成功' % q.get())
time.sleep(1)
else:
break
def main():
# 需求是创建写进程和读进程,创建队列,让其通过队列通信
q = Queue(5) #创建队列
pw = Process(target=write, args=(q,))# 创建写进程,将队列传递进去,传递进去才能使用
pr = Process(target=read, args=(q,))# 创建读进程,将队列传递进去
# 启动进程
pw.start()
time.sleep(1)
pr.start()
# 让主进程等待子进程结束
pw.join()
pr.join()
print('读写进程交互完毕,主进程和子进程全部结束')
if __name__ == '__main__':
main()
'''
# 读进程和写进程有交互 并不是一个执行完再执行另外一个
2、线程(thread)
打开一个word,打开一个qq,打开一个暴风影音,打开快播,这种都是打开了一个进程
每一个进程都会同时干好多事,比如word,可以打字、可以拼写检查等等,qq,可以聊天、可以语音、可以视频。。。,暴风,看视频,音频输出。。。。
在进程里面干的好多事,每一个事就是一个线程
–>线程是进程的基本单位,一个进程至少要有一个线程,如果只有一个线程,这个线程称之为主
线程(main thread),如果创建其它线程,就会有好多子线程
见图形
(一个进程可能由多个线程,执行的时候,进程间会相互的切换,执行每一个进程的时候进程里的线程是基本单位会在线程相互切换)
线程如何创建?
使用到一个模块叫做threading模块
唱歌跳舞例子
面向过程
threading.Thread(target=xxx, name=xxx, args=())
获取线程名字 threading.current_thread().name
主线程名字 MainThread
子线程名字 如果起名字以你名字为准,如果没起,名字默认为Thread-num
面向对象
写一个类继承自 threading.Thread
重写run方法 线程启动执行该方法
重写构造方法,构造方法可以设置name属性为线程名字
【注】构造方法中要手动调用父类方法
面向对象方式给线程传递参数
通过对象属性进行传递,将参数保存到对象属性中
#线程
#唱歌跳舞的例子--->#面向过程创建
#第一步导入threading模块 程序入口 创建主线程
#第二步写主线程(创建一个唱歌的线程,创建一个跳舞的线程)-->启动线程-->主线程和子线程同时结束
#第三步写子线程
'''
#格式
import threading
def main():
pass # 创建一个唱歌线程 # 创建一个跳舞线程
# 那么当前进程中总共有3个线程,还有一个主线程
if __name__ == '__main__':
main()
'''
'''
import threading
def main():
#创建线程也就是创建对象的过程
#创建一个唱歌的线程
#target 目标需要指定 name 给线程取名字 args 传递参数
tsing = threading.Thread(target=sing, name='唱歌', args=('蔡依林', '城堡'))
#穿件一个跳舞的线程
tdance = threading.Thread(target=dance,name='跳舞')
#那么现在就有三个线程一个主线程一个跳舞线程一个唱歌线程
#启动线程
tsing.start()
tdance.start()
#主线程会先执行,再执行子线程
#让主线程等待一下子线程
tsing.join()
tdance.join()
print('主线程和子线程已经全部结束')
if __name__ == '__main__':
main()
'''
'''
import threading
import time
# from threading import Thread
def sing(name, zhuanji):
# 在当前线程获取线程的名字
print('唱歌线程的名字为%s' % threading.current_thread().name)
print('传递过来的参数为%s-%s' % (name, zhuanji))
for x in range(5):
print('我在唱舞娘')
time.sleep(1)
def dance():
# 在当前线程获取线程的名字
print('跳舞线程的名字为%s' % threading.current_thread().name)
for x in range(5):
print('你在扭动身姿')
time.sleep(1)
def main():
# 在当前线程获取线程的名字
print('主线程名字为%s' % threading.current_thread().name)
# 创建一个唱歌线程
tsing = threading.Thread(target=sing, name='唱歌', args=('蔡依林', '城堡'))
# 创建一个跳舞线程
tdance = threading.Thread(target=dance)
# 那么当前进程中总共有3个线程,还有一个主线程
# 启动线程
tsing.start()
tdance.start()
# 让主线程等待子线程结束之后再结束
tsing.join()
tdance.join()
print('主线程和子线程全部结束')
if __name__ == '__main__':
main()
'''
#线程对象 threaded_onject
#线程
#唱歌跳舞的例子----->面向对象创建
#格式
'''
#一个主进程一个唱歌进程
import threading #导入模块
import time
class SingThread(threading.Thread): #类 继承自threading.Thread
def __init__(self,name): #构造方法
super().__init__()
#super(SingThread,self)._init_() python2的方式
#python3的两种方式
#super().__init__() #继承类 super只能调用第一个父类的构造方法--->run方法
#threading.Thread.__init__()#可以调用指定的构造方法
self.name = name
#类属性 run函数
def run(self):
for x in range(5):
print('我在唱舞娘')
time.sleep(1)
def main(): #入口函数
#只有一个线程时就是主线程
tsing = SingThread("唱歌") #创建一个线程 一个主线程一个子线程
tsing.start() #子线程启动,会调用def __init__(self,name)
#主线程在跳舞
for x in range(5):
print('钢管舞high起来')
time.sleep(1)
tsing.join()#主线程等子线程运行结束再运行
print("唱歌跳舞完毕")
if __name__ == '__main__': #入口
main()
'''
'''
#三个进程:一个主进程一个唱歌进程 一个跳舞进程 三个线程同时执行
import threading
import time
class SingThread(threading.Thread):
"""docstring for MyThread"""
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
for x in range(5):
print('我在唱舞娘')
time.sleep(1)
class DanceThread(threading.Thread):
"""docstring for MyThread"""
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
for x in range(5):
print('我在跳舞')
time.sleep(1)
def main():
tsing = SingThread('唱歌')
tdance = DanceThread('跳舞')
tsing.start()
tdance.start()
# 主线程在跳舞
for x in range(5):
print('钢管舞high起来')
time.sleep(1)
tsing.join()
tdance.join()
print('唱歌跳舞完毕')
if __name__ == '__main__':
main()
'''
'''
#测试name
import threading
import time
class SingThread(threading.Thread):
"""docstring for MyThread"""
def __init__(self, name, haha, hehe): #面向对象传参通过对象属性进行传递
super().__init__()
self.name = name
self.haha = haha
self.hehe = hehe
def run(self):
print('线程名字为%s' % threading.current_thread().name)
#面向对象写的时候name是一个属性,把name改了就可以更改name
print(self.haha, self.hehe) #面向对象传参要通过成员属性才能用
for x in range(5):
print('我在唱舞娘')
time.sleep(1)
class DanceThread(threading.Thread):
"""docstring for MyThread"""
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
for x in range(5):
print('我在跳舞')
time.sleep(1)
def main():
tsing = SingThread('唱歌', 100, 200) #面向对象 传参要通过类进行传参数
tdance = DanceThread('跳舞')
tsing.start()
tdance.start()
# 主线程在跳舞
for x in range(5):
print('钢管舞high起来')
time.sleep(1)
tsing.join()
tdance.join()
print('唱歌跳舞完毕')
if __name__ == '__main__':
main()
'''
全局变量
线程之间可以共享全局变量,存在线程安全问题
局部变量
多个线程跑相同的代码,局部变量每一个线程都有自己的一份,是不共享的,全局变量是共享的
总结:进程的全局变量局部变量都不共享,线程只共享全局变量!
#线程之间能否共享全局变量?
#验证程序思路:
#第一创建两个线程
#第二定义一个全局变量
#第三写一个线程去修改这个全局变量,另外一个线程去读取,看读取的是否修改
#第四若读取的是已经修改的值说明两个线程共享一个全局变量,否则说明不共享全局变量
#【注】创建两个线程会有三个线程一个主线程两个子线程这里用两个子线程去验证
#步骤
#首先,导入模块 入口 主线程
'''
import threading
def main():
pass
if __name__ = "__main__":
main()
'''
#然后,定义一个全局变量 创建两个线程 线程启动 线程关闭
'''
import threading
#定义一个全局变量
count = 100
def main():
#创建两个线程
# 创建一个线程,用来修改全局变量
t1 = threading.Thread(target=change, name='修改')
# 创建用来读取全局变量的线程
t2 = threading.Thread(target=read, name='读取')
#线程启动
t1.start()
t2.start()
#让主线程等子线程运行完成一起关闭
t1.join()
t2.join()
print('测试完毕')
if __name__ = "__main__":
main()
'''
#最后,写子线程
'''
import threading
# 定义一个全局变量
count = 100
def change():
#局部变量不能修改全局变量必须 声明修改的是全局变量
global count
#修改全局变量
count += 100
#打印
print('%s线程修改完毕' % threading.current_thread().name)
def read():
name = threading.current_thread().name
print('%s线程读取count值为%s' % (name, count))
def main():
print('主线程第一次读取的值为%s' % count)
# 创建一个线程,用来修改全局变量
t1 = threading.Thread(target=change, name='修改')
# 创建用来读取全局变量的线程
t2 = threading.Thread(target=read, name='读取')
# 启动线程
t1.start()
t2.start()
#主线程等待子线程一起关闭
t1.join()
t2.join()
print('主线程第二次读取的值为%s' % count)
print('测试完毕')
if __name__ == '__main__':
main()
'''
#修改线程修改完毕
#读取线程读取count值为200
#主线程第二次读取的值为200
#测试完毕
#结论是 线程共享全局变量 会引发安全问题?你能用我也能用就像公共厕所,谁先用?
#局部变量
#验证多个线程是否共享局部变量?
#思路
#让多个线程跑同一个代码,当一个进程的局部变量修改了,另外一个进程是否也会修改?
'''
import threading
def main():
# 创建两个线程,但是这两个线程的target是同一个函数
if __name__ == '__main__':
main()
'''
'''
import threading, time
def demo():
for x in range(5):
print('吃鸡小王子')
time.sleep(1)
def main():
# 创建两个线程,但是这两个线程的target是同一个函数
t1 = threading.Thread(target=demo, name='goudan')
t2 = threading.Thread(target=demo, name='yadan')
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程、子线程全部结束')
if __name__ == '__main__':
main()
'''
#两个进程都能执行demo函数,我们需要验证的是
#如果一个线程把ji = 100修改了,当另外的进程运行时是否也会修改
'''
import threading, time
def demo():
#01
#不能直接修改,两个线程都会运行demo函数不知道谁是谁?
#ji = 100
#ji += 100
#print('ji是%s'%ji)
#所以要进行区分
#02
ji = 100
# 通过线程的名字,区分出来两个线程,然后一个修改,一个读取
name = threading.current_thread().name
if name == 'goudan':
ji += 100
print('goudan线程修改完毕')
else:
time.sleep(5)
print('yadan线程读取的值为%s' % ji)
def main():
# 创建两个线程,但是这两个线程的target是同一个函数
t1 = threading.Thread(target=demo, name='goudan')
t2 = threading.Thread(target=demo, name='yadan')
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程、子线程全部结束')
if __name__ == '__main__':
main()
'''
import threading, time
def demo():
ji = 100
name = threading.current_thread().name
if name == 'goudan':
ji += 100
print('goudan线程修改完毕')
else:
time.sleep(5)
print('yadan线程读取的值为%s' % ji)
def main():
t1 = threading.Thread(target=demo, name='goudan')
t2 = threading.Thread(target=demo, name='yadan')
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程、子线程全部结束')
if __name__ == '__main__':
main()
#=====>最终输出结果是不会改
#说明多个线程跑同一个代码,局部变量是不共享的,自己有自己的
线程安全
多个线程同时修改全局变量,查看全局变量的值
#线程的全局变量共享引发的安全问题?
#思路
#多个线程全局变量修改,查看全局变量的值
'''
import threading, time
count = 100
def main():
# 创建线程同时修改count,第一个线程让他先加再减3,第二个线程让他先加再减100
#看最终输出的结果是不是原来的count值
if __name__ == '__main__':
main()
'''
'''
import threading, time
count = 100
def demo(n):
# 让count先加再减n
global count
for x in range(10000000):
count += n
count -= n
#第一个线程在对count先加3后减3 第二个线程,先对count加5后减5重叠
#当第一个进程操作时间比较长时会和第二个线程重叠
#也就是说100-3+5
#需要用for in 让进程1和进程2有重叠
def main():
# 创建线程同时修改count
t1 = threading.Thread(target=demo, args=(3,))
t2 = threading.Thread(target=demo, args=(5,))
t1.start()
t2.start()
t1.join()
t2.join()
# 去查看count的值
print('count=%d' % count)
print('over')
if __name__ == '__main__':
main()
'''
#输出结果为90 不是100 线程拥堵的风险
#用线程锁能解决线程拥堵的问题,但是只能等一个用完另外一个才能去用
'''
import threading, time
count = 100
# 创建一把锁
suo = threading.Lock()
def demo(n):
global count
start = time.time() #获取时间戳
for x in range(1000000):
# 加一把锁
suo.acquire()
count += n
count -= n
# 释放锁
suo.release() #有上锁就有释放锁,不然进程会停在这里
end = time.time() #获取时间戳
print('100次循环时间为%s' % (end - start)) #获取运行时间
def main():
# 创建线程同时修改count
t1 = threading.Thread(target=demo, args=(3,))
t2 = threading.Thread(target=demo, args=(5,))
t1.start()
t2.start()
t1.join()
t2.join()
# 去查看count的值
print('count=%d' % count)
print('over')
if __name__ == '__main__':
main()
'''
线程锁
同一块代码,不同的线程在执行,如果代码被一个线程上锁了,另外的线程走到这里只能等待,等待使用的线程释放锁之后,这个线程立马上锁,然后自己使用
如果线程多了,多个线程也都在等待,如果释放了,多个线程就要抢着去上锁
suo = threading.Lock() 创建锁对象
suo.acquire() 获取锁
suo.release() 释放锁
好处:确保这段关键代码在一定时间内只能由一个线程执行
坏处:降低效率
死锁:见代码
#死锁
import threading
import time
class MyThread(threading.Thread):
#5个线程同时跑 若线程1上了A锁需要上B锁
#线程2上了B锁需要上A锁,但是
#A锁现在线程1用着呢,线程1必须上了B锁后释放B锁才能去释放A锁,之后线程2才能上
#线程2等线程1,走不了,就会占着B锁所以线程1就没办法上B锁
#线程1在等线程2,线程2在等线程1
#死锁
#先上A锁 再上B锁 先释放B锁才能释放A锁
def do1(self):
global resA, resB #修改全局变量
if mutexA.acquire(): #如果A锁上锁成功,打印消息
msg = self.name+' got resA'
print msg
if mutexB.acquire(): #如果B锁上锁成功,打印消息
msg = self.name+' got resB'
print msg
mutexB.release() #解开B锁
mutexA.release() #解开A锁
#先上B锁再上A锁 先释放A锁才能释放B锁
def do2(self):
global resA, resB #修改全局变量
if mutexB.acquire(): #如果B锁上锁成功,打印消息
msg = self.name+' got resB'
print msg
if mutexA.acquire(): #如果A锁上锁成功,打印消息
msg = self.name+' got resA'
print msg
mutexA.release() #释放A锁
mutexB.release() #释放B锁
def run(self): #run方法会调用两个函数
self.do1()
self.do2()
#全局变量
resA = 0
resB = 0
#上了两把锁
mutexA = threading.Lock()
mutexB = threading.Lock()
def test():
for i in range(5):
t = MyThread() #通过类创建多个线程
t.start() #启动线程会调用MyThread(),有一个run方法
#程序入口
if __name__ == '__main__':
test()
ThreadLocal
多线程在跑的时候,由于使用全局变量有风险,但是使用局部变量完全没有风险,所以尽量使用局部变量
使用局部变量也有问题,线程中其它函数使用局部变量的时候,需要来回的传递这个参数,非常麻烦,为了解决这个问题,引入了ThreadLocal
用ThreadLocal创建一个对象,然后使用这个对象即可
见代码
#使用全局变量有死锁的风险,但使用局部变量,需要用的时候就需要当参数来回传递
'''
import threading
def hehe():
print('hehe函数在执行')
def test():
print('test函数在执行')
hehe()
def lala():
print('lala函数在执行')
def demo():
count = 100 #局部变量 只能demo用其他的函数没办法用
#怎么样才能让其他的函数也能用呢?
#以前把count变成全局变量大家都能用,但多线程有风险
#现在,把count作为参数传递才能让其他函数也能使用局部变量count
print('这是线程%s'%threading.current_thread().name)
test()
def main():
t1 = threading.Thread(target=demo, name='goudan')
t1.start()
t1.join()
print('over')
if __name__ == '__main__':
main()
'''
'''
#把count作为参数传递才能让其他函数也能使用局部变量count----很麻烦
import threading
def hehe(count):
print('hehe函数在执行')
print('hehe里面的count为%d'%count) #hehe函数想使用count需要把count作为参数传递
#但其他的函数不需要使用count 只是传递参数
#很麻烦
def test(count):#test函数根本不使用count,它只是起传参的作用
print('test函数在执行')
hehe(count)
def lala(count):
print('lala函数在执行')
def demo():
count = 100
print('这是线程%s'%threading.current_thread().name)
test(count)
lala(count)
def main():
t1 = threading.Thread(target=demo, name='goudan')
t1.start()
t1.join()
print('over')
if __name__ == '__main__':
main()
'''
#Threadlocal可以解决来回传递参数的麻烦
#步骤
#首先创建一个ThreadLocal对象 ll = threading.local()
#然后将其它函数使用到的局部变量保存到这个对象中
#ll.count = count
#ll.index = index
#print('hehe里面的count为%d' % ll.count) 就可以让其他函数来使用局部变量
'''
import threading
# 创建一个ThreadLocal对象
ll = threading.local()
def hehe():
print('hehe函数在执行')
print('hehe里面的count为%d' % ll.count)
def test():
print('test函数在执行')
hehe()
def lala(count):
print('lala函数在执行')
print('lala里面的index为%d' % ll.index)#
def demo():
count = 100
index = 200
# 将count保存在ll这个对象中,将其它函数使用到的局部变量保存到这个对象中
ll.count = count
ll.index = index
print('这是线程%s' % threading.current_thread().name)
test()
lala(count)
def main():
t1 = threading.Thread(target=demo, name='goudan')
t1.start()
t1.join()
print('over')
if __name__ == '__main__':
main()
'''
线程之间使用队列通信
生产者消费者模型
爬虫应用中,一个线程用来爬取数据,一个线程用来处理数据
线程之间使用的队列为
from queue import Queue
进程之间使用的队列为
from multiprocessing import Queue
q = Queue(5)
添加元素
q.put(元素, False)
q.put(元素, True, 3)
q.put(元素)
取出元素
q.get()
q.get(False)
q.get(True, 3)
队列长度
q.qsize()
队列是否满了
q.full()
队列是否为空
q.empty()
#线程之间使用队列通信
#线程之间可以使用全局变量和局部变量进行通信但是不够好---->线程之间使用队列通信
#队列是线程通信的一种方式
'''
#生产者消费者模型
#例子:两个生产线程三个消费线程
from queue import Queue #线程之间使用队列 模块
import threading
import random
import time
# 生产者:生产数据的 生产者可以有多个线程
def produce(q):
while 1: #死循环,一直生产
# 生产一个数据
num = random.randint(100, 999)
# 放入到队列中
q.put(num)
print('生产者生产%s数据生产成功' % num)
time.sleep(2)
# 消费者:消费数据的 消费者也可以有多个线程
def custom(q):
while 1:
# 取出数据
item = q.get()
print('消费者消费了%d数据' % item)
time.sleep(1)
def main():
# 创建队列, 无限长
q = Queue()
# 创建2个生产者线程
for x in range(2):
threading.Thread(target=produce, args=(q,)).start()
# 创建3个消费者线程
for y in range(3):
threading.Thread(target=custom, args=(q,)).start()
if __name__ == '__main__':
main()
'''
练习:
1、多进程拷贝文件夹
进程池进行拷贝
文件夹:10文件文件