1. 用户通过kill命令来中断
2. 程序满足某种条件中断(比如测试量大于1000则退出)
下面是我的实现:
# -*- coding: utf-8 -*- import re import urllib2 import json import threading import Queue import os import time from time import sleep from threading import Lock from signal import signal,SIGTERM,SIGINT,SIGQUIT class Executor: def __init__(self,size): self.queue = Queue.Queue() self.tasks = [] self.running = True for i in range(size): t = Task(self.queue) t.setDaemon(True) t.start() self.tasks.append(t) self._signal() def _signal(self): signal(SIGTERM,self._exit) signal(SIGINT,self._exit) signal(SIGQUIT,self._exit) def _exit(self,a=None,b=None): print 'clean' self.cancel() def cancel(self): for task in self.tasks: while not task.cancel(): pass self.running = False self.onCancel() def submit(self,call): self.queue.put(call) def join(self): #self.queue.join() queue.join()会阻塞,所以不用 while self.running and not self.queue.empty(): sleep(0.1) if self.cancelTrigger(): self.cancel() def setCancelTrigger(self,cancelTrigger): self.cancelTrigger = cancelTrigger def setOnCancel(self,onCancel): self.onCancel = onCancel class Task(threading.Thread): def __init__(self,queue): threading.Thread.__init__(self) self.queue = queue self.running = True self.canceled = False def cancel(self): self.canceled=True return self.isCanceled() def isCanceled(self): return self.running==False def run(self): while self.running: call = self.queue.get() call.run() self.queue.task_done() if self.canceled: self.running = False
客户端使用:
host = "http://7.s.duitang.com" thread_count = 10 #并发数 max_count=100 #运行次数 total = 0 fail = 0 avg = 0 lock = Lock() def cancelTrigger(): return total>=max_count def onCancel(): print 'total %s'%total print 'fail %s'%fail print 'avg %s'%(avg/total) if __name__ == "__main__": f = open("napi","r") executor = Executor(thread_count) executor.setCancelTrigger(cancelTrigger) executor.setOnCancel(onCancel) analysis(f.readlines(),executor) executor.join()
在实现的时候比较纠结的点:
1. Task如果不是daemon会导致任务永远不会停止,但是如果Task是daemon线程,main线程结束之后daemon就结束了。所以这时需要实现一个join()来阻塞main线程:
def join(self): #self.queue.join() queue.join()会阻塞,所以不用 while self.running and not self.queue.empty(): sleep(0.1) if self.cancelTrigger(): self.cancel()
2. 线程应该可cancel的,之前是直接修改while isrunning的变量,但这样会导致task其实还没有完成的停止下来。所以对于task我引入两个变量来实现安全的停止。
def cancel(self): for task in self.tasks: while not task.cancel(): pass self.running = False self.onCancel() def cancel(self): self.canceled=True return self.isCanceled() def run(self): while self.running: call = self.queue.get() call.run() self.queue.task_done() if self.canceled: self.running = False
3.对于signal专门写了一片文章记录()