版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qd_ltf/article/details/79703860
一、如何使用多线程、线程间通信、线程间事件通信
# -*- coding: utf-8 -*-
import csv
from xml.etree.ElementTree import Element, ElementTree
import requests
from StringIO import StringIO
from threading import Thread, Event
from Queue import Queue # 安全队列,自动加锁
import os, tarfile
def pretty(e, level=0): # 格式美化
if len(e) > 0: # 子元素的个数
e.text = '\n' + '\t' * (level + 1) # 如果有子元素的值后面添加回车,在下一行添加制表符
for child in e:
pretty(child, level + 1) # 递归调用
child.tail = child.tail[:-1] # 下一层循环结束,进入到上一层级格式时,需要减少一人制表符
e.tail = '\n' + '\t' * level # 如果元没有子元素,则在子元素结束符后面添加回车,下一行 添加制表符
class DownloadThread(Thread): # (I/O)操作
def __init__(self, sid, queue):
Thread.__init__(self)
self.sid = sid
# self.url = 'http://table.finance.yahoo.com/table.csv?s=%s.sz' # 测试链接失效
# self.url %= str(sid).rjust(6, '0')
self.url = 'http://pythonscraping.com/files/MontyPythonAlbums.csv' # 暂用此链接代替
self.queue = queue # 传入Queue对象,实现两个线程间的通信息的媒介
def download(self):
response = requests.get(self.url, timeout=3)
if response.ok:
return StringIO(response.content)
# StringIO经常被用来作为字符串的缓存,应为StringIO有个好处,他的有些接口和文件操作是一致的,
# 也就是说用同样的代码,可以同时当成文件操作或者StringIO操作
def run(self):
data = self.download() # 下载数据
self.queue.put((self.sid, data)) # 下载完数据后,将数据压入Queue对象中
class ConverThread(Thread): # CPU密集型操作
def __init__(self, queue, cEvent, tEvent):
Thread.__init__(self)
self.cEvent = cEvent
self.tEvent = tEvent
self.queue = queue # 传入Queue对象,实现两个线程间的通信息的媒介
def csvToXml(self, scsv, fxml): # csv格式转换为xml
reader = csv.reader(scsv)
headers = reader.next()
headers = map(lambda h: h.replace(' ', ''), headers) # 首行
root = Element('Data')
for row in reader:
eRow = Element('Row')
root.append(eRow)
for tag, text in zip(headers, row):
e = Element(tag)
e.text = text
eRow.append(e)
pretty(root) # 美化样式
et = ElementTree(root) # 生成元素树
et.write(fxml)
def run(self):
count = 0
while True:
sid, data = self.queue.get()
# 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。
# 如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
if sid == -1:
self.tEvent.set()
self.cEvent.wait()
break
if data:
with open(str(sid).rjust(6, '0') + '.xml', 'wb') as wf:
self.csvToXml(data, wf) # csv 转换成Xml
count += 1
if count == 3:
self.tEvent.set() # 启动Tar压缩进程
self.cEvent.wait() # 等待Tar压缩进程cEvent.set(),重新启动进程
self.cEvent.clear() # 保证cEvent.wait()重新生效
count = 0
class TarXml(Thread):
def __init__(self, cEvent, tEvent):
Thread.__init__(self)
self.cEvent = cEvent
self.tEvent = tEvent
self.count = 1
self.setDaemon(True) # 设置为守护线程,其它线程结束后,自动退出
def tarXml(self):
tfname = str(self.count).rjust(3, '0') + '.tgz'
with tarfile.open(tfname, 'w:gz') as tf:
for fname in os.listdir('.'):
if fname.endswith('.xml'):
tf.add(fname)
os.remove(fname) # 删除文件
if not tf.members:
os.remove(tfname) # 如果压缩文件为空,则删除
self.count += 1
def run(self):
c=1
while True:
self.tEvent.wait() # 压缩过程,等待转换进程完成一定数量后的tEvent.set()
self.tarXml() # 开始压缩
self.tEvent.clear() # 让tEvent.wait()进程重新生效
self.cEvent.set() # 结束cEvent.wait()
c+=1
queue = Queue()
cEvent = Event() # 事件对象,用来在线程间进行事件通知
tEvent = Event()
dThreads = [DownloadThread(i, queue) for i in xrange(1, 8)] # 生成一个线程的队列
cThread = ConverThread(queue,cEvent,tEvent) # 生成转换线程
tThread = TarXml(cEvent,tEvent) # 生成转换线程
for t in dThreads:
t.start() # 启动下载线程
cThread.start() # 启动转换线程
tThread.start() # 启动转换线程
for t in dThreads:
t.join() # 阻塞函数,直到每个子线程都调用结束,才进到程序下一步
queue.put((-1, None))
二、如何使用线程本地数据
import os,cv2,time,struct,threading
from BaseHTTPServer import HTTPServer,BaseHTTPRequestHandler
from SocketServer import TCPServer,ThreadingTCPServer
from threading import Thread,RLock
from select import select
class JpegStreamer(Thread):
def __init__(self,camera):
Thread.__init__(self)
self.cap = cv2.VideoCapture(camera)
self.lock =RLock()
self.pipes = {}
def register(self):
pr,pw =os.pipe()
self.lock.acquire()
self.pipes[pr]=pw
pw = self.pipes.pop(pr)
self.lock.release()
return pr
def unrgister(self,pr):
self.lock.acquire()
self.pips.pop(pr)
self.lock.release()
pr.close()
pw.close()
def capture(self):
cap = self.cap
while cap.isOpened():
ret,frame = cap.read()
if ret:
ret,data =cv2.imencode('.jpg',frame,(cv2.IMWRITE_JPEG_QUALITY,40))
yield data.tostring()
def send(self,frame):
n= struct.pack('l',len(frame))
self.lock.acquire()
if len(self.pipes):
_,pipes,_ =select([],self.pipes.itervalues(),[],1)
for pipe in pipes:
os.write(pipe,n)
os.write(pipe,frame)
self.lock.release()
def run(self):
for frame in self.capture():
self.send(frame)
class JpegRetriever(object):
def __init__(self,streamer):
self.streamer= streamer
self.pipe =streamer.register()
def retrieve(self):
while True:
ns= os.read(self.pipe,8)
n=struct.unpack('l',ns)[0]
data = os.read(self.pipe,n)
yield data
def cleanup(self):
self.streamer.unregister(self.pipe)
class Handler(BaseHTTPRequestHandler):
retriever=None
@staticmethod
def setJpegRetriever(retriever):
Handler.retriever=retriever
def do_GET(self):
if self.retriever is None:
raise RuntimeError('no retriever')
if self.path !='/':
return
self.send_response(200)
self.send_header('Content-type','multipart/x-mixed-replace;boundary=abcde')
self.end_headers()
for frame in self.retriever.retrieve():
self.send_frame(frame)
def send_frame(self,frame):
self.wfile.write('--abcde\r\n')
self.wfile.write('Content-Type:image/jpeg\r\n')
self.wfile.write('Content-Length:%d\r\n\r\n'%len(frame))
self.wfile.write(frame)
if __name__ == '__main__':
streamer = JpegRetriever(0)
streamer.start()
retriever=JpegRetriever(streamer)
Handler.setJpegRetriever(retriever)
print 'start server ...'
httpd = TCPServer(('',9000),Handler)
httpd.sever_forever()
五、如何使用线程池
# 待补充
六、如何使用多进程
# 待补充