一、Python操作MySQL数据库
利用Python语言操作数据库,需要先下载pymysql,由于我之前下载了Anaconda并配置了系统变量,直接在命令行输出:
conda install pymysql
如果没有安装过Anaconda,可通过以下命令行安装:
pip install pymysql
安装完毕后,通过以下代码访问并操作数据库MySQL。
import pymysql
# user为数据库用户名,password为登录密码,db为目标数据库名
conn = pymysql.connect(host="127.0.0.1",user="root",password="123456",db="easyvideo")
cs = conn.cursor()
cs.execute("select * from admin")
for i in cs:
print("当前是第" + str(cs.rownumber) + "行")
print("id:" + i[0]) #输出数据库表中对应该行的id
print("username:" + i[1]) #输出数据库表中对应该行的username
二、Python多进程
创建进程的multiprocessing.Process类
我们来看看这个类的原型:
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
参数说明:
- target:表示调用对象,一般为函数,也可以为类。
- args:表示调用对象的置参数元组。
- kwargs:表示调用对象的字典。
- name:为进程的别名。
- group:参数不使用,可忽略。
类提供的常用方法:
- is_alive():返回进程是否是激活的。
- join([timeout]) :可以等待子进程结束后再继续往下运行,通常用于进程间的同步。进一步地解释,哪个子进程调用了join方法,主进程就要等该子进程执行完后才能继续向下执行。
- run() :代表进程执行的任务函数,可被重写。
- start() :激活进程。
- terminate():终止进程。
属性:
- authkey:字节码,进程的谁密钥.
- daemon:父进程终止后自动终止而不会等待子进程,且自己不能产生新进程,必须在start()之前设置。
- exitcode:退出码,进程在运行时为None,如果为–N,表示被信号N结束。
- name:获取进程名称.
- pid:进程id。
multiprocessing模块提供了一个创建进程的Process类,其创建进程有两种方法:
- 创建一个Process类的实例,并指定目标任务函数。
- 自定义一个类,并继承Process类,重写其init ()方法和run ()方法。
首先我们使用第一种方法创建两个进程,并与单进程运行的时间做比较:
import multiprocessing
import os
import time
#子进程执行的代码
def child_process(num):
result = 0
for i in range(num * 10000000):
result += i
print("进程为:{0:d}".format(os.getpid()))
if __name__ == '__main__':
print("父进程为:{0:d}".format(os.getpid()))
t0 = time.time()
child_process(5)
child_process(5)
t1 = time.time()
print("顺序执行耗时:{0:.2f}".format(t1 - t0))
p1 = multiprocessing.Process(target=child_process,args=(5,))
p2 = multiprocessing.Process(target=child_process,args=(5,))
t2 = time.time()
p1.start()
p2.start()
p1.join()
p2.join()
t3 = time.time()
print("多进程执行耗时:{0:.2f}".format(t3 - t2))
上面的代码首先定义了一个千万次数据累加的耗时函数,先通过单进程顺序执行两个耗时函数,然后输出所用的时间;接着通过多进程并发执行,并指定目标函数为child_process,执行完成后打印耗时。其运行结果如下所示:
很明显发现,通过多进程执行同样的耗时函数,所用时间更少。
我们再用第二种方法对上面的耗时函数进行测试:
import multiprocessing
import os
import time
class MyProcess(multiprocessing.Process):
def __init__(self,num):
super().__init__()
self.num = num
#子进程执行的代码
def run(self):
result = 0;
for i in range(self.num * 10000000):
result += i
print("进程为:{0:d}".format(os.getpid()))
if __name__ == '__main__':
print("父进程为:{0:d}".format(os.getpid()))
p1 = MyProcess(5)
p2 = MyProcess(5)
t1 = time.time()
#进程p1,p2调用start()时,自动调用其run()方法
p1.start()
p2.start()
p1.join()
p2.join()
t2 = time.time()
print("多进程执行耗时:{0:.2f}".format(t2 - t1))
运行结果如下:
daemon属性
import multiprocessing
import os
import time
# 子进程要执行的代码
def child_process(delay):
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":子进程执行开始。")
print("sleep {0:d}s".format(delay))
time.sleep(delay)
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":子进程执行结束。")
if __name__=='__main__':
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":父进程执行开始。")
p1 = multiprocessing.Process(target=child_process, args=(3,))
#设置 daemon属性为True
p1.daemon = True
p1.start()
# p1.join() #如果此行代码被注释,那么父进程不会等待子进程而提前结束,子进程会因为父进程的结束而结束
print(time.strftime('%Y-%m-%d %H:%M:%S',time.gmtime()) + ":父进程执行结束。")
如果p1.join()注释,则结果为:
如果p1.join()保留,则结果为:
在多线程模型中,默认情况下daemon=False,主线程会等待子线程退出然后再退出。而如果将多进程的daemon设置为True时,主线程不会等待子线程,直接退出,而此时子线程会随着主线程的退出而退出。为避免这种情况,主线程中需要对子线程进行join,等待子线程执行完毕后再退出。
并发控制之Semaphore
Semaphore用来控制对共享资源的访问数量,即每一时刻允许同时执行的最大进程数。
import multiprocessing
import time
def f(s, i):
s.acquire()
print(time.strftime('%H:%M:%S',time.gmtime()),multiprocessing.current_process().name + " 获得锁运行");
time.sleep(i)
print(time.strftime('%H:%M:%S',time.gmtime()),multiprocessing.current_process().name + " 释放锁结束");
s.release()
if __name__ == "__main__":
s = multiprocessing.Semaphore(3)
for i in range(5):
p = multiprocessing.Process(target = f, args=(s, 2))
p.start()
运行结果如下:
可以看出,由于我设置了s = multiprocessing.Semaphore(3),所以同一时刻最多有三个进程执行。
进程同步之Lock
在某些情况下某些时刻,我们只需要一个进程访问某个资源,这时我们就需要使用锁Lock。
不加锁:
import multiprocessing
import time
def work1():
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work1")
time.sleep(1)
num -= 1
def work2():
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work2")
time.sleep(1)
num -= 1
if __name__ == '__main__':
p1 = multiprocessing.Process(target=work1)
p2 = multiprocessing.Process(target=work2)
p1.start()
p2.start()
运行结果如下:
可以看出,同一时刻不同的work被输出,每个子进程各自打印自己的信息,在实际应用中,容易造成信息混乱,这时就要用到Lock,保证同一时刻只有一个进程执行。
加锁:
import multiprocessing
import time
def work1(lock):
with lock:
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work1")
time.sleep(1)
num -= 1
def work2(lock):
lock.acquire()
num = 4;
while num > 1:
print(time.strftime('%H:%M:%S',time.gmtime()) + " work2")
time.sleep(1)
num -= 1
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=work1,args=(lock,))
p2 = multiprocessing.Process(target=work2,args=(lock,))
p1.start()
p2.start()
运行结果如下:
每一个子进程函数中都加了锁Lock:首先初始化一个锁的实例lock = multiprocessing.Lock(),然后在需要独占的代码前后加锁:先获取锁,即调用lock.acquire()方法,运行完成后释放锁,即调用lock.release()方法;也可以简单地使用上下文关键字with (见work1的代码)。
进程池Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process类动态生成多个进程。但如果是生成上百个、上千个目标,手动地去限制进程数量太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
import multiprocessing
import time
def task(name):
print(f"{time.strftime('%H:%M:%S')}:{name} 开始执行")
time.sleep(3)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in range(10):
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply_async(func = task, args=(i,))
pool.close()
pool.join()
print("hello")
运行结果如下:
从上面结果可以看出,同一时刻,只有线程池中的三个进程执行。
进程同步之Event
Event用来实现进程间同步通信。
import multiprocessing
import time
def wait_for_event(e):
e.wait() #等待
time.sleep(1)
# 唤醒后清除Event状态,为后续继续等待
e.clear()
print(f"{time.strftime('%H:%M:%S')} 进程 A: 我们是兄弟,我等你...")
e.wait()
print(f"{time.strftime('%H:%M:%S')} 进程 A: 好的,是兄弟一起走")
def wait_for_event_timeout(e, t):
e.wait() #等待
time.sleep(1)
# 唤醒后清除Event状态,为后续继续等待
e.clear()
print(f"{time.strftime('%H:%M:%S')} 进程 B: 好吧,最多等你 {t} 秒")
e.wait(t)
print(f"{time.strftime('%H:%M:%S')} 进程 B: 我继续往前走了")
if __name__ == "__main__":
e = multiprocessing.Event()
w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
w2 = multiprocessing.Process( target=wait_for_event_timeout, args=(e, 5) )
w1.start()
w2.start()
# 主进程发话
print(f"{time.strftime('%H:%M:%S')} 主进程: 谁等我下,我需要 8 s 时间")
# 唤醒等待的进程
e.set()
time.sleep(8)
print(f"{time.strftime('%H:%M:%S')} 主进程: 好了,我赶上了")
# 再次唤醒等待的进程
e.set()
w1.join()
w2.join()
print(f"{time.strftime('%H:%M:%S')} 主进程:退出")
上面的代码定义了两个进程函数,一个是等待事件发生函数,一个等待事件发生并设置了超时时间的函数。主进程调用事件的set()方法唤醒等待事件的进程,事件唤醒后调用clear()方法清除事件的状态,并重新等待,以此达到进程的同步。运行结果如下:
优先级队列Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
put方法插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
import multiprocessing
import time
def ProducerA(q):
count = 1
while True:
q.put(f"冷饮 {count}")
print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")
count +=1
time.sleep(1)
def ConsumerB(q):
while True:
print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")
time.sleep(2)
if __name__ == '__main__':
q = multiprocessing.Queue(maxsize=5)
p = multiprocessing.Process(target=ProducerA,args=(q,))
c = multiprocessing.Process(target=ConsumerB,args=(q,))
p.start()
c.start()
p.join()
c.join()
上面的代码定义了生产者函数和消费者函数,设置其队列的最大容量是5,生产者生产冷饮,消费者取出冷饮消费,当队列满时,生产者等待,当队列空时,消费者等待。他们放入和取出的速度可能不一致,但使用Queue可以让生产者和消费者有条不紊的一直进程下去,运行结果如下所示:
数据交换Pipe
有时候,我们需要将一个进程的输出作为另一个进程的输入,multiprocessing.Pipe()方法返回一个管道的两个端口,端口1的输入可作为另一个端口2的输出。如果反过来,让端口2的输出作为端口1的输入,这就是全双工管道,默认是全双工管道,如果想设置半双工管理,只需要给方法 Pipe()传递参数duplex = False即可。
Pipe()方法返回的对象具有发送消息send()方法和接收消息recv()方法,如果没有消息可接收, recv()方法会一直阻塞。如果管道已经被关闭,那么 recv()方法会抛出异常。
import multiprocessing
import time
def task1(pipe):
for i in range(4):
str = f"task1-{i}"
print(f"{time.strftime('%H:%M:%S')} task1 发送:{str}")
pipe.send(str)
time.sleep(2)
for i in range(4):
print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")
def task2(pipe):
for i in range(4):
print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")
time.sleep(1)
for i in range(4):
str = f"task2-{i}"
print(f"{time.strftime('%H:%M:%S')} task2 发送:{str}")
pipe.send(str)
if __name__ == "__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=task1, args=(pipe[0],))
p2 = multiprocessing.Process(target=task2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
上面的代码定义了两个子进程函数,task1先发送4条消息,再接收消息,task2先接收消息,再发送消息,运行结果如下: