多进程
#coding: utf-8
import multiprocessing
import os, time, random
def Lee():
print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
start = time.time()
time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
end = time.time()
print 'Task Lee, runs %0.2f seconds.' %(end - start)
def Marlon():
print "\nRun task Marlon-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end=time.time()
print 'Task Marlon runs %0.2f seconds.' %(end - start)
def Allen():
print "\nRun task Allen-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' %(end - start)
def Frank():
print "\nRun task Frank-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' %(end - start)
if __name__=='__main__':
function_list= [Lee, Marlon, Allen, Frank]
print "parent process %s" %(os.getpid())
pool=multiprocessing.Pool(4) # 一定要写在主进程,不然没法运行生成子进程
for func in function_list:
pool.apply_async(func) #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
print 'Waiting for all subprocesses done...'
pool.close()
pool.join() #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print 'All subprocesses done.'
多线程
# 线程的暂停和终止; 也可以使用psutil多线程进行控制
#!/usr/bin/env python
# coding: utf-8
import threading
import time
class Job(threading.Thread):
def __init__(self, *args, **kwargs):
super(Job, self).__init__(*args, **kwargs)
self.__flag = threading.Event() # 用于暂停线程的标识
self.__flag.set() # 设置为True
self.__running = threading.Event() # 用于停止线程的标识
self.__running.set() # 将running设置为True
def run(self):
while self.__running.isSet():
self.__flag.wait() # 为True时立即返回, 为False时阻塞直到内部的标识位为True后返回
print(time.time())
time.sleep(1)
def pause(self):
self.__flag.clear() # 设置为False, 让线程阻塞
time.sleep(3)
def resume(self):
self.__flag.set() # 设置为True, 让线程停止阻塞
def stop(self):
self.__flag.set() # 将线程从暂停状态恢复, 如何已经暂停的话
self.__running.clear() # 设置为False
a = Job()
a.start()
time.sleep(3)
a.pause()
a.resume()
a.pause()
a.stop()
# 多线程
lock = threading.Lock()
threads = []
nloops = 256
# start threads
for i in range(nloops):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
# wait for all
for i in range(nloops):
threads[i].join()
多协程
from gevent.pool import Pool
from gevent import monkey; monkey.patch_all()
import gevent
# 方式1
jobs = []
for i in list(range(1,200)):
jobs.append(gevent.spawn(pocScan, dictMerged))
gevent.joinall(jobs)
# 方式2
pool = Pool(200)
for dictMerged in madeTask:
pool.add( gevent.spawn(pocScan, dictMerged) )
pool.join()
异步处理
# asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持;从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。
# @asyncio.coroutine
# def hello():
# print('Hello world! (%s)' % threading.currentThread())
# yield from asyncio.sleep(1)
# print('Hello again! (%s)' % threading.currentThread())
# loop = asyncio.get_event_loop()
# tasks = [hello(), hello()]
# loop.run_until_complete(asyncio.wait(tasks))
# loop.close()
# @asyncio.coroutine
# def wget(host):
# print('wget %s...' % host)
# connect = asyncio.open_connection(host, 80)
# reader, writer = yield from connect
# header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
# writer.write(header.encode('utf-8'))
# yield from writer.drain()
# while True:
# line = yield from reader.readline()
# if line == b'\r\n':
# break
# print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# # Ignore the body, close the socket
# writer.close()
# loop = asyncio.get_event_loop()
# tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
# loop.run_until_complete(asyncio.wait(tasks))
# loop.close()
# async def wget(host):
# print('wget %s...' % host)
# connect = asyncio.open_connection(host, 80)
# reader, writer = await connect
# header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
# writer.write(header.encode('utf-8'))
# await writer.drain()
# while True:
# line = await reader.readline()
# if line == b'\r\n':
# break
# print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# # Ignore the body, close the socket
# writer.close()
# loop = asyncio.get_event_loop()
# tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
# loop.run_until_complete(asyncio.wait(tasks))
# loop.close()
# async def works(x):
# print("sleep : "+str(x), threading.currentThread())
# await asyncio.sleep(9)
# # 由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环
# # 把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行
# # await sleep(1)
# loop = asyncio.get_event_loop()
# tasks = [works(i) for i in list(range(2000))]
# loop.run_until_complete(asyncio.wait(tasks))
# loop.close()