Twisted框架
"""
**Twisted定义**
1.python语言编写的事件驱动网络框架,追求服务器程序性能
**支持多种协议**:——(提供客户端和服务器方面的开发工具)
1. 传输层协议 :UDP / TCP/ TLS
2. 应用层协议 :HTTP / FTP
**底层技术实现** —— 高效通信
1.windows — 基于I/O完成端口(IOCP,input/output completion port)技术
2.Linux — 基于epoll 技术 (多路复用I/O接口)
**Twisted 依赖包** —— 鼓励使用异步
1.Zope Interface
2.PyWin32
**基于TCP编程**
1.无需操作 Socket 的 bin、send、receive 等原语
2.直接用 Twisted 的Protocol、Factory 等类进行编程
3.定义他们的子类并重写 connectMade、dataReceived进行事件化的TCP
Factory类 : 创建 Protocol类的工厂,管理Protocol类
Protocol类 :数据传递协议的类
"""
reactor : 反应堆 — 所有事件都会触发 reactor , 然后开启服务,初始化factory, factory 再初始化 protocol 。
factory : 在框架中负责连接,通信时建立连接,以及连接中断得处理。
Protocol : 通信的内容协议
Factory :工厂模式、在这里生成协议。
Transport : 接收数据、服务器端与客户端的数据收发、处理。
# 框架安装(下载上面的两个包才可以安装)
pip install twisted
广播系统Protocol代码
管理客户端连接(接收客户端连接)
"""
twisted.internet.protocol.Protocpl : 用于管理客户端的连接,内含如下方法:
1、connectionMade() 链接建立时有Twisted框架进行调用 —— 常用作系统中注册该链接
2、dataReceived() 当收到客户端的数据时,由Twisted框架调用
3、connectionLost() 当链接断开时 由Twisted框架调用 —— 用于清理链接占用的资源
"""
from twisted.internet.protocal import Protocol
clients = [] # 保存客户端连接
# 定义Protocol子类 Spreader,在其中实现需要重载的方法
class Spreader(Protocol):
def __init__(self,factory):
self.factory = factory # 将实例的 factory 属性指向自己
# 建立连接时
def connectionMade(self):
self.factory.numProtocols = self.factory.numProtocols + 1
self.transport.write(u"欢迎来到Spread Site,您时第 %d 个客户端用户\n"%(self.factory.numProtocols,).encode('utf8'))
print("new connect:%d"% self.factory.numProtocols)
clients.append(self) # 保存到用户连接列表
# 断开连接时
def connectionLost(self,reason):
self.factory.numProtocols = self.factory.numProtocols - 1
clients.remove(self)
print("lost connect:%d"%self.factory.numProtocols)
# 接收数据时
# 发送数据transport.write
def dataReceived(self,data):
# 如果客户端发来的是 close 则主动关闭连接
if data == "close":
self.transport.loseConnection()
for client in clients:
# 轮询当前 client 列表所有客户端
if client != self :
# 将数据分发给除自己外的所有客户端
client.transport.write(data)
Factory子类(管理Protocol子类实例)
"""
twisted.internet.protocol.Factory : 管理Protocol子类
1、Factory.buildProtocol() —— 当有新的客户端连接时,框架调此方法创建新的Protocol子类的实例
2、Factory.__init__() —— 将客户端计数器 self.numProticols 设置为 0
"""
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
class SpreadFactory(Factory):
# 初始化,客户端计数
def __init__(self):
self.numProtocols = 0
def buildProtocol(self,addr):
# 创建上面的 Protocol 子类 Sperader
return Spreader(self)
# 定义服务器监听端口
endpoint = TCP4ServerEndpoint(reactor,8007)
# 指定该端口所绑定子类实例
endpoint.listen(SpreadFactory())
# 启动服务(运行twisted.internet.reactor.run())
reactor.run()
广播客户端(TCP)
客户端编程
"""
基于twisted.internet.protocol.Protocol 客户端编程方法 —— 用于定义客户端得连接状态
twisted.internet.protocol.ClientFactory 客户端编程得类
"""
from twisted.internet.protocol import Protocol,ClientFactory
# Protocol子类Echo中维护状态self.connected,通信逻辑的开发
# 函数routine()中通过判断 连接状态 self.connected 决定是否向服务器发送消息,
class Echo(Protocol):
# 初始无连接
def __init__(self):
self.connected = False
# 连接时调用 - 状态变化
def connectionMade(self):
self.connected = True
# 关闭连接时调用 - 状态变化
def connecttionLost(self,reason):
self.connected = False
# 接收数据时调用
def dataReceived(self,data):
print(data.decode('utf-8'))
"""
twisted.internet.protocol.ClientFactory —— 定义子类 (在客户端管理与服务器链接事件)
1、继承自 Factory类。
2、除了Factory 中需要重新定义的 __init__()、buildProtocol() 方法,还需要多定义三个方法
(1)、__init__() —— 初始化方法
(2)、startedConnecting() —— 建立链接时调用
(3)、buildProtocol() —— 创建 Protocol 子类
(4)、clientConnectionLost() —— 断开连接时调用
(5)、clientConnectionFailed() —— 连接失败时调用
"""
# 定义ClientFactory子类EchoClientFactory,在客户端 管理 与服务器连接相关事件
# ClientFactory 继承自 Factory,但是比 Factory 多了三个方法 startedConnecting、clientConnectionLost、clientConnectionFailed
class EchoClientFactory(ClientFactory):
# 初始无protocol子类
def __init__(self):
self.protocol = None
# 连接建立时调用
def startedConnecting(self,connector):
print('Started to connect')
# 建立protecol 实例,调用上面的Protecol子类Echo进行实例化
def buildProtocol(self.addr):
print('Conneted')
# 调用 Echo()类 (连接状态的类)
self.protocol = Echo()
return self.protocol
# 断开连接时被调用
def clientConnectionLost(self,connector,reason):
print("lost connection.Reason":reason)
# 连接失败时被调用
def clientConnectionFailed(self,connector,reason):
print("connection failed.Reason:",reason)
客户端主程序
from twisted.internet import reactor # reactor 单例事件管理器
import threading
import fileinput
import time
import sys
import datetime
bStop = False
def routine(factory): # 每隔 5秒 向服务器发送消息
while not bStop:
# 判断连接状态(factory.protocol.connected),决定是否向服务器发送消息
if factory.protocol and factory.protocol.connected:
factory.protocol.transport.write("hello")
# 程序启动
host = "127.0.0.1"
port = 8007
factory = EchclientFactory() # 实例化通信类
reactor.connectTCP(host,port,factory) # 指定需要连接的服务器地址和端口
threading.Thread(target = routine,args =(factory,)).start() # 启动线程运行routine()函数
reactor.run() # 挂起运行
bStop = True # 通知 routine 线程退出
Twisted - UDP
"""
UDP无连接对等通信协议 —— 无客户端和服务端的概念
只需要定义 DatagramProtocol 子类,无需定义 Factory 子类
TCP - 继承自Twisted.internet.proticol.Protocol
UDP - 继承自twisted.internet.protoco.DatagramProtocol
1、datagramReceived — 定义收到 UDP 报文后如何处理
"""
# UDP客户端
from twisted.internet.protocol import DatagramProtocol
# 定义 UDP 收到报文后如何处理
class Echo(DatagramProtocol):
def datagramReceived(self,data,(host,port)):
print("Got data from:%s:%d"%(host,port))
print(data.decode('utf-8'))
# 实例化Protocol子类
protocol = Echo()
from twisted.internet import reactor # reactor 单例事件管理器
import threading
import fileinput
import time
import sys
import datetime
# 程序启动
host = "127.0.0.1"
port = 8007
bStop = False
def routine():
while not bStop:
# DatagramProtocol.transport.write() 发送数据,此处的 factory 为 DatagramProtocol 子类的实例化
factory.protocol.transport.write("hello")
threading.Thread(target = routine).start()
# 将Echo 实例化后的实例 protocol, 传入监听
reactor.listenUDP(port,protocol)
# 启动事件循环
reactor.run()
bStop = True
UDP Twisted - Socket
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor # reactor 单例事件管理器
class Echo(DatagramProtocol):
def datagramReceived(self,data,(host,port)):
print("Got data from:%s:%d"%(host,port))
print(data.decode('utf-8'))
protocol = Echo()
host = "127.0.0.1"
port = 8007
# 建立普通的socket对象
portSocket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
portSocket.setblocking(False) # 设置阻塞模式
portSocket.bind((host,port))
reactor.adoptDatagramPort(portSocket.fileno(),socket.AF_INET,protocol) # 适配普通SOCKET
portSocket.close() # 在启动reactor之前关闭SOCKET对象
reactor.run()
Connected UDP
"""
UDP可以调用connect()函数,用于限制只对某地址,端口进行通信
调用connect()函数后,就不需要再指定发哦是那个的地址何端口,在Twisted中被称为 Connected UDP
只增加了 self.transport.connect(host,port
"""
from twisted.internet.protocol import DatagramProtocol
host = "127.0.0.1"
port = 8007
class Echo(DatagramProtocol):
"""
startProtocol() —— 当Protocol 实例被第一次作为参数传递给listenUDP()时被调用
stopProtocol() —— 当所有连接都关闭后被调用
connectionRefused() —— 当数据发送失败时被调用
"""
# 当Protocol 实例被第一次作为参数传递给 listenUDP() 时被调用
def startProtocol(self):
self.transport.connect(host,port)
print("connection created")
# 接收数据时被调用
def datagramReceived(self,data):
print(data.decode('utf-8'))
# 每次通信失败后被调用
def connectionRefused(self):
print("sent failed")
# 所有连接被关闭后调用
def stopProtocol(self):
print("Connection closed")
# 实例化Protocol子类
protocol = Echo()
from twisted.internet import reactor # reactor 单例事件管理器
import threading,time,sys,datatime
bStop = False
def routine(factory):
while not bStop:
factory.protocol.transport.write("hello")
threading.Thread(target = routine,args=(factory,)).start()
reactor.listenUDP(port,Echo())
reactor.run()
bStop = True
UDP组播技术
"""
一个终端发送一条消息时,可以有 多个终端接收并进行处理
IPV4中有一个专有的地址范围用于组播,即 224.0.0.0 ~ 239.255.255.255
组播参与者包括发送者和接收者,在实际发送数据之前需要加入该地址范围内的IP地址,
之后组中的所有终端都可以用UDP方式向族中的其他终端进行发送消息
"""
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
multicase_ip = "224.0.0.1" # 组播地址
port = 8001 # 端口
class Multicaset(DatagramProtocol):
# 当Protocol 实例被第一次作为参数传递给 listenUDP() 时被调用
def startProtocol(self):
# 加入组播组
self.transport.joinGroup(multicast_ip,port)
# 组播数据 + 组播IP\端口
self.transport.write('notify',(multicast_ip,port))
# 接收数据时被调用
def datagramReceived(self, datagram, address):
print "Datagram %s received from %s" % (repr(datagram), repr(address))
if datagram == "Notify":
self.transport.write("Acknowlege", address) # 单播回应
reactor.listenMulticast(port, Multicast(), listenMultiple=True) # 组播监听
# Protocol.transport.leaveGroup() 退出组播函数
reactor.run() #挂起运行
延时调用
"""
延时(Defer)机制是Twisted框架中实现异步编程的体系,使程序设计可以采用事件驱动的机制,其目的与作用于Tornado的协程类似
Defer 可以看作是一个管理回调函数的对象,开发者可以向该对象中添加回调函数,同时指定该回调函数何时被调用
Defer 管理两种函数 正常处理函数、错误处理函数
"""
from twisted.internet import reactor,defer
d = defer.Deferred() # 定义Defer实例
-------------"Defer回调函数添加部分"--------------------------
# 正常处理函数
def printSquare(d): # 正常处理函数
print("square of %d is %d" %(d,d*d))
# 错误处理函数
def processError(f):
print("error when process")
d.addCallback(printSquare) # 添加正常处理回调函数
d.addErrback(processError) # 处理错误处理回调函数
d.addCallback(printTwice) # 可以同时添加多个回调函数,执行时 多个回调函数 按顺序执行
-------------"Defer调用阶段"-------------------------
if sys.argv[1] == "call_error":
f = failure.Failure(Exception("my exception"))
d.errback(f) # 调用错误处理函数processError
else:
d.callback(4) # 调用正常处理函数 printSquare(4)
Defer对象详解
"""
1、Defer 编程围绕 twisted.internet.defer.Deferred对象展开
2、callback 回调函数名, *args,**kw 回调函数参数
3、回调函数最少有一个参数
"""
addCallback(self,callback,*args,**kw)
def process(): # 不能作为回调函数,因为函数中无参数
pass
def process(d): # 可以作为回调函数 ,单个参数
pass
def callback(d= None): # 可以作为回调函数,默认 单个参数
pass
def Add(d,num1,num2,num3): # 可以作为回调函数,多个参数
return num1+num2+ num3
"""
errback 回调函数名称 *args,**kw 回调函数参数 至少有一个参数
当函数被调用时,第1个参数 是一个twisted.python.failure.Failure 的对象实例
"""
addErrback(self,errback,*args,**kw)
# 将 回调函数同时作为正常处理函数和错误处理函数添加到 Defer 对象中
addBoth(self,callback,*args,**kw)
# Defer对象来接函数,用于将另外个Defer对象(即参数d)的正确处理函数和错误处理函数分别添加到本Defer对象中
chainDeferred(self,d)
D1 = defer.Deferred()
D2 = defer.Deferred()
# D1 在被调用的时,导致D2对象中的函数链也被调用
D1.chainDeferred(D2)
# 调用正常处理函数,其中result 是传递给第1个正常处理回调函数的参数
callback(self,result)
# 调用错误处理函数,其中fail是传递给第1个错误处理函数的参数
errback(self,fail=None)
# 为Defer 对象调用链暂停与继续。pause() 函数用于暂停Defer对象中对函数链的调用,直到 unpause() 函数被调用后继续
pause(self) / unpause(self)
结合 Defer与reactor
"""
将Defer对象与 reactor 的延时调用机制结合在在一起,可以开发开发功能强大的有异步调用函数
"""
from twisted.internet import reactor,defer
# 正常处理函数
def printSquare(d):
print("hello" )
return d
def printTwice(d):
print("hello1")
return d
def makeDefer():
d = defer.Deferred() # 定义Defer实例
d.addCallback(printSquare) # 添加正常处理回调函数
d.addCallback(printTwice) # 添加正常处理回调函数
# 异步调用函数 reactor.callLater
reactor.calLater(2,d.callback,5) # 配置延时调用, 2秒后调用callback函数
# reactor.callLater(delay,callable,*args,**kw)
# delay : 定义延时调用的秒数, callable :回调函数名称 , *args,**kw : 参数
makeDefer()
reactor.run() # 挂起运行
# 程序执行2秒后,才会调用回调函数进行执行,
# 实现定时退出Twisted消息循环, reactor.stop可以终止reactor.run() 函数
from twisted.internet import reactor
reactor.callLater(4,reactor.stop)
reactor.run()
print("Program finished")
多线程的使用
"""
Twisted框架为开发者管理两种线程:一种主线程,另一种是辅线程。主线程 reactor.run()运行,而辅线程可以有多个,以线程池的方式呈现,可配置线程池中辅线程的个数。
主线程 :dataReceived()、connectionLose() 等事件处理函数 由Twisted框架在主线程中调用
"""
from twisted.internet import reactor
import MyProtocol
protocol = MyProtocol()
def must_run_in_main_thread(message):
protocol.send = true
protocol.transport.write(message)
def run_in_any_thread():
# 对非线程安全的Twisted内置函数,可用 reactor.callFromThread() 函数使代码运行在主线程中
# reactor.callFromThread("调用函数名","参数")
reactor.callFromThread(must_run_in_main_thread,"Good Morning")
print("the run of must_run_in_main_thread() has been finished")
使代码运行在辅线程中运行
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
def long_operation(msg):
import time
time.sleep(10)
print("got message before 10 seconds:",msg)
# 定义 DatagramProtocol 子类
class Echo(DatagramProtocol):
def datagramReceived(self,data,(host,port)):
# 调用 long_operation() 函数,使其在辅线程中执行。本调用在主线中立即返回
reactor.callInThread(long_operation,data.decode('utf-8'))
# 实例化Protocol子类
protocol = Echo()
reactor.listenUDP(8007,protocol)
# 挂起运行
reactor.run()
配置线程池
# Twisted 使用线程池管理所有辅线程
# reactor.suggestThreadPoolsize() 函数定义线程池中线程的数量
reactor.suggestThreadPoolsize(10) # 辅线程数量为10
安全信道
"""
SSL 也称TLS,为TCP通信加入 SSL信道可以认证通信主题,保证通信内容私密性和完整性
加密信道SSL通信 :保证玩过中传输内容的私密性,第三方及时监控到全部通信比特流,也无法获取真是内容
认证客户端身份 SSL 通信 :提供客户端身份认证,只 允许有 身份证明的客户端与服务器进行通信
"""
# SSL 通信插件 pyOpenSSL 安装
pip install pyopenssl
# **加密信道的SSL通信**
# 在服务器端需要配置服务器密钥文件和服务器证书文件。
# 密钥文件的内容是服务器的通信密钥,
# 服务器证书文件是通信时发送给客户端的通信凭证。
# 服务器端代码
from twisted.internet import ssl,reactor
from twisted.internet.protocol import Factory,Protocol
class EchoServer(Protocol): # Protocol子类与普通TCP通信一致
def dataReceived(self,data):
self.transport.write(data)
if __name__=='__main__':
factory = Factory()
factory.protocol = EchoServer
# 配置密钥文件和证书文件路径
reactor.listenSSL(8007,factory,ssl.DefaultOpenSSLContextFacctory('../ssl/server.key','../ssl/server.crt'))
# 客户端编码
from twisted.internet import ssl,reactor
from twisted.internet.protocol import ClientFactory
if __name__=='__main__':
factory = ClientFactory()
# 配置密钥文件和证书文件路径
reactor.connectSSL(8007,factory,ssl.ClientContextFacctory()
认证客户端身份 SSL 通信
# 客户端代码
from twisted.internet import ssl,reactor
from twisted.internet.protocol import ClientFactory
# 定义MySSLContext 类 用于配置客户端SSL证书和密钥
class MySSLContext(ssl.ClientContextFactory):
def getContext(self):
# SSL 协议版本
self.method = SSL.SSLv3_METHOD
ctx = ssl.ClientContextFactory.getContext(self)
# 客户端证书
ctx.use_certificate_file('../ssl/client.crt')
# 客户端密钥
ctx.use_privatekey_file('../ssl/client.key')
return ctx
if __name__=='__main__':
factory = ClientFactory()
reactor.connectSSL('localhost',8000,factory,MySSLContext())
reactor.run()
# 服务器端代码
from OpenSSL import SSL
from twisted.internet import ssl,reactor
from twisted.internet.portocol import Factory,Protocol
class Echo(Protocol):
def dataReceived(self,data)
self.transport.write(data)
def verifyCallback(connection,x509,errnum,errdepth,ok):
print( "_verify(ok=%d):"%ok) # CA 是否匹配
print( "sunject:",x509.get_subject()) # 客户端证书subject
print( "issure:",x509.get_issuer()) # 客户端证书发行方
print( "errnum %s,errdepth %d" % (errnum,errdepth)) # 错误代码
return True # 是否允许通信
if __name__ =='__main__':
factory = Factory()
factory.protocol = Echo
myContextFactory = ssl.DefaultOpenSSLContextFactory('../ssl/client.key','../ssl/client.crt')
ctx = myContextFactory.getContext()
ctx.set_verify(
SSL.VERIFY_PEEK | SSL.VERIFY_FALL_IF_NO_PEER_CERT,
verifyCallback
)
# 客户端证书的发行 CA
cex.load_cerify_locations("../ssl/ca.crt")
reactor.listenSSL(8000,factoty,myContextFactory)
reactor.run()