Twisted-底层自定义协议网络框架

Twisted框架

"""
**Twisted定义**
1.python语言编写的事件驱动网络框架,追求服务器程序性能

**支持多种协议**:——(提供客户端和服务器方面的开发工具)
1. 传输层协议 :UDP / TCP/ TLS
2. 应用层协议 :HTTP / FTP

**底层技术实现** —— 高效通信
1.windows — 基于I/O完成端口(IOCP,input/output completion port)技术
2.Linux — 基于epoll 技术 (多路复用I/O接口)

**Twisted 依赖包** —— 鼓励使用异步
1.Zope Interface
2.PyWin32

**基于TCP编程**
1.无需操作 Socket 的 bin、send、receive 等原语
2.直接用 Twisted 的Protocol、Factory 等类进行编程
3.定义他们的子类并重写 connectMade、dataReceived进行事件化的TCP

Factory类 : 创建 Protocol类的工厂,管理Protocol类
Protocol类 :数据传递协议的类

"""

reactor : 反应堆 — 所有事件都会触发 reactor , 然后开启服务,初始化factory, factory 再初始化 protocol 。
factory : 在框架中负责连接,通信时建立连接,以及连接中断得处理。
Protocol : 通信的内容协议
Factory :工厂模式、在这里生成协议。
Transport : 接收数据、服务器端与客户端的数据收发、处理。
在这里插入图片描述

# 框架安装(下载上面的两个包才可以安装)
pip install twisted

广播系统Protocol代码
管理客户端连接(接收客户端连接)

"""
twisted.internet.protocol.Protocpl : 用于管理客户端的连接,内含如下方法:
	1、connectionMade()   链接建立时有Twisted框架进行调用 —— 常用作系统中注册该链接
	2、dataReceived()	 当收到客户端的数据时,由Twisted框架调用
	3、connectionLost()	当链接断开时 由Twisted框架调用 —— 用于清理链接占用的资源
"""

from twisted.internet.protocal import Protocol
clients = []  # 保存客户端连接

# 定义Protocol子类 Spreader,在其中实现需要重载的方法
class Spreader(Protocol):
	def __init__(self,factory):
		self.factory = factory   # 将实例的 factory 属性指向自己
	   
	# 建立连接时
	def connectionMade(self):
		self.factory.numProtocols = self.factory.numProtocols + 1
		self.transport.write(u"欢迎来到Spread Site,您时第 %d 个客户端用户\n"%(self.factory.numProtocols,).encode('utf8'))
		print("new connect:%d"% self.factory.numProtocols)
		clients.append(self)   # 保存到用户连接列表
	
	# 断开连接时
	def connectionLost(self,reason):
		self.factory.numProtocols = self.factory.numProtocols - 1
		clients.remove(self)
		print("lost connect:%d"%self.factory.numProtocols)
	
	# 接收数据时
	# 发送数据transport.write
	def dataReceived(self,data):
		# 如果客户端发来的是 close 则主动关闭连接
		if data == "close":
			self.transport.loseConnection()
			for client in clients:
				# 轮询当前 client 列表所有客户端
				if client != self :
					# 将数据分发给除自己外的所有客户端
					client.transport.write(data)

Factory子类(管理Protocol子类实例)

"""
twisted.internet.protocol.Factory : 管理Protocol子类
	1、Factory.buildProtocol() —— 当有新的客户端连接时,框架调此方法创建新的Protocol子类的实例
	2、Factory.__init__()	——  将客户端计数器 self.numProticols 设置为 0
"""
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

class SpreadFactory(Factory):
	# 初始化,客户端计数
	def __init__(self):
		self.numProtocols = 0 
		
	def buildProtocol(self,addr):
		# 创建上面的 Protocol 子类 Sperader 
		return Spreader(self)

# 定义服务器监听端口
endpoint = TCP4ServerEndpoint(reactor,8007)
# 指定该端口所绑定子类实例
endpoint.listen(SpreadFactory())
# 启动服务(运行twisted.internet.reactor.run())
reactor.run()

广播客户端(TCP)

客户端编程

"""
基于twisted.internet.protocol.Protocol 客户端编程方法 ——  用于定义客户端得连接状态
twisted.internet.protocol.ClientFactory  客户端编程得类
"""
from twisted.internet.protocol import Protocol,ClientFactory

# Protocol子类Echo中维护状态self.connected,通信逻辑的开发
# 函数routine()中通过判断 连接状态 self.connected 决定是否向服务器发送消息,
class Echo(Protocol):
	# 初始无连接
	def __init__(self):
		self.connected = False
	
	# 连接时调用 - 状态变化
	def connectionMade(self):
		self.connected = True
	
	# 关闭连接时调用 - 状态变化
	def connecttionLost(self,reason):
		self.connected = False
	
	# 接收数据时调用
	def dataReceived(self,data):
		print(data.decode('utf-8'))



"""
twisted.internet.protocol.ClientFactory —— 定义子类 (在客户端管理与服务器链接事件)
	1、继承自 Factory类。
	2、除了Factory 中需要重新定义的 __init__()、buildProtocol() 方法,还需要多定义三个方法
		(1)、__init__() —— 初始化方法
		(2)、startedConnecting() —— 建立链接时调用
		(3)、buildProtocol() —— 创建 Protocol 子类
		(4)、clientConnectionLost() —— 断开连接时调用
		(5)、clientConnectionFailed() —— 连接失败时调用
"""
# 定义ClientFactory子类EchoClientFactory,在客户端 管理 与服务器连接相关事件
# ClientFactory 继承自 Factory,但是比 Factory 多了三个方法 startedConnecting、clientConnectionLost、clientConnectionFailed
class EchoClientFactory(ClientFactory):
	# 初始无protocol子类
	def __init__(self):
		self.protocol = None
	
	# 连接建立时调用
	def startedConnecting(self,connector):
		print('Started to connect')
	
	# 建立protecol 实例,调用上面的Protecol子类Echo进行实例化
	def buildProtocol(self.addr):
		print('Conneted')
		# 调用 Echo()类 (连接状态的类)
		self.protocol = Echo()
		return self.protocol
	
	# 断开连接时被调用
	def clientConnectionLost(self,connector,reason):
		print("lost connection.Reason":reason)
		
	# 连接失败时被调用
	def clientConnectionFailed(self,connector,reason):
		print("connection failed.Reason:",reason)

客户端主程序

from twisted.internet import reactor   # reactor 单例事件管理器
import threading
import fileinput
import time
import sys
import datetime

bStop = False


def routine(factory):    # 每隔 5秒 向服务器发送消息
	while not bStop:
		# 判断连接状态(factory.protocol.connected),决定是否向服务器发送消息
		if factory.protocol and factory.protocol.connected:
			factory.protocol.transport.write("hello")


# 程序启动
host = "127.0.0.1"
port = 8007
factory = EchclientFactory()    # 实例化通信类
reactor.connectTCP(host,port,factory)		# 指定需要连接的服务器地址和端口
threading.Thread(target = routine,args =(factory,)).start()   # 启动线程运行routine()函数
reactor.run()		# 挂起运行
bStop = True		# 通知 routine 线程退出

Twisted - UDP

"""
UDP无连接对等通信协议 —— 无客户端和服务端的概念
只需要定义 DatagramProtocol 子类,无需定义 Factory 子类

TCP - 继承自Twisted.internet.proticol.Protocol
UDP - 继承自twisted.internet.protoco.DatagramProtocol
	1、datagramReceived — 定义收到 UDP 报文后如何处理
"""
# UDP客户端
from twisted.internet.protocol import DatagramProtocol

# 定义 UDP 收到报文后如何处理
class Echo(DatagramProtocol):
	def datagramReceived(self,data,(host,port)):
		print("Got data from:%s:%d"%(host,port))
		print(data.decode('utf-8'))

# 实例化Protocol子类
protocol = Echo()

from twisted.internet import reactor   # reactor 单例事件管理器
import threading
import fileinput
import time
import sys
import datetime

# 程序启动
host = "127.0.0.1"
port = 8007

bStop = False
def routine():
	while not bStop:
		# DatagramProtocol.transport.write()   发送数据,此处的 factory 为 DatagramProtocol 子类的实例化
		factory.protocol.transport.write("hello")
			
threading.Thread(target = routine).start()
# 将Echo 实例化后的实例 protocol, 传入监听
reactor.listenUDP(port,protocol)
# 启动事件循环
reactor.run()
bStop = True

UDP Twisted - Socket

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor   # reactor 单例事件管理器

class Echo(DatagramProtocol):
	def datagramReceived(self,data,(host,port)):
		print("Got data from:%s:%d"%(host,port))
		print(data.decode('utf-8'))
		
protocol = Echo()

host = "127.0.0.1"
port = 8007


# 建立普通的socket对象
portSocket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
portSocket.setblocking(False)      # 设置阻塞模式
portSocket.bind((host,port))

reactor.adoptDatagramPort(portSocket.fileno(),socket.AF_INET,protocol)  # 适配普通SOCKET

portSocket.close()    # 在启动reactor之前关闭SOCKET对象
reactor.run()

Connected UDP

"""
UDP可以调用connect()函数,用于限制只对某地址,端口进行通信
调用connect()函数后,就不需要再指定发哦是那个的地址何端口,在Twisted中被称为 Connected UDP
只增加了 self.transport.connect(host,port
"""
from twisted.internet.protocol import DatagramProtocol

host = "127.0.0.1"
port = 8007

class Echo(DatagramProtocol):
	"""
	startProtocol() —— 当Protocol 实例被第一次作为参数传递给listenUDP()时被调用
	stopProtocol() —— 当所有连接都关闭后被调用
	connectionRefused() —— 当数据发送失败时被调用
	"""
	# 当Protocol 实例被第一次作为参数传递给 listenUDP() 时被调用
	def startProtocol(self):
		self.transport.connect(host,port)
		print("connection created")
	
	# 接收数据时被调用
	def datagramReceived(self,data):
		print(data.decode('utf-8'))
	
	# 每次通信失败后被调用
	def connectionRefused(self):
		print("sent failed")

	# 所有连接被关闭后调用
	def stopProtocol(self):
		print("Connection closed")

# 实例化Protocol子类
protocol = Echo()

from twisted.internet import reactor   # reactor 单例事件管理器
import threading,time,sys,datatime

bStop = False
def routine(factory):
	while not bStop:
		factory.protocol.transport.write("hello")
			
threading.Thread(target = routine,args=(factory,)).start()
reactor.listenUDP(port,Echo())
reactor.run()
bStop = True

UDP组播技术

"""
一个终端发送一条消息时,可以有 多个终端接收并进行处理
IPV4中有一个专有的地址范围用于组播,即 224.0.0.0 ~ 239.255.255.255
组播参与者包括发送者和接收者,在实际发送数据之前需要加入该地址范围内的IP地址,
之后组中的所有终端都可以用UDP方式向族中的其他终端进行发送消息
"""
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

multicase_ip = "224.0.0.1"  # 组播地址
port = 8001    # 端口

class Multicaset(DatagramProtocol):
	# 当Protocol 实例被第一次作为参数传递给 listenUDP() 时被调用
	def startProtocol(self):
		# 加入组播组
		self.transport.joinGroup(multicast_ip,port) 
		# 组播数据 + 组播IP\端口
		self.transport.write('notify',(multicast_ip,port))

	# 接收数据时被调用
	def datagramReceived(self, datagram, address):
        print "Datagram %s received from %s" % (repr(datagram), repr(address))
        if datagram == "Notify":
            self.transport.write("Acknowlege", address)   # 单播回应


reactor.listenMulticast(port, Multicast(), listenMultiple=True)  # 组播监听
# Protocol.transport.leaveGroup()     退出组播函数
reactor.run()   #挂起运行

延时调用

"""
延时(Defer)机制是Twisted框架中实现异步编程的体系,使程序设计可以采用事件驱动的机制,其目的与作用于Tornado的协程类似
Defer 可以看作是一个管理回调函数的对象,开发者可以向该对象中添加回调函数,同时指定该回调函数何时被调用
Defer 管理两种函数  正常处理函数、错误处理函数
"""
from twisted.internet import reactor,defer

d = defer.Deferred()   # 定义Defer实例


-------------"Defer回调函数添加部分"--------------------------

# 正常处理函数
def printSquare(d):      # 正常处理函数
	print("square of %d is %d" %(d,d*d))

# 错误处理函数
def processError(f):
	print("error when process")

d.addCallback(printSquare)      # 添加正常处理回调函数
d.addErrback(processError)		# 处理错误处理回调函数

d.addCallback(printTwice)      # 可以同时添加多个回调函数,执行时 多个回调函数  按顺序执行


-------------"Defer调用阶段"-------------------------

if sys.argv[1] == "call_error":
	f = failure.Failure(Exception("my exception"))
	d.errback(f)       # 调用错误处理函数processError
else:
	d.callback(4)      # 调用正常处理函数 printSquare(4)

Defer对象详解

"""
1、Defer 编程围绕 twisted.internet.defer.Deferred对象展开
2、callback 回调函数名,		*args,**kw  回调函数参数
3、回调函数最少有一个参数 
"""

addCallback(self,callback,*args,**kw)

def process():   # 不能作为回调函数,因为函数中无参数
	pass

def process(d):   # 可以作为回调函数 ,单个参数
	pass

def callback(d= None):  # 可以作为回调函数,默认 单个参数
	pass

def Add(d,num1,num2,num3):   # 可以作为回调函数,多个参数
	return num1+num2+ num3



"""
errback 回调函数名称 		*args,**kw  回调函数参数   至少有一个参数
当函数被调用时,第1个参数 是一个twisted.python.failure.Failure 的对象实例
"""
addErrback(self,errback,*args,**kw)


# 将 回调函数同时作为正常处理函数和错误处理函数添加到 Defer 对象中
addBoth(self,callback,*args,**kw)


# Defer对象来接函数,用于将另外个Defer对象(即参数d)的正确处理函数和错误处理函数分别添加到本Defer对象中
chainDeferred(self,d)

D1 = defer.Deferred()
D2 = defer.Deferred()

# D1 在被调用的时,导致D2对象中的函数链也被调用
D1.chainDeferred(D2)


# 调用正常处理函数,其中result 是传递给第1个正常处理回调函数的参数
callback(self,result)


# 调用错误处理函数,其中fail是传递给第1个错误处理函数的参数
errback(self,fail=None)

# 为Defer 对象调用链暂停与继续。pause() 函数用于暂停Defer对象中对函数链的调用,直到 unpause() 函数被调用后继续
pause(self) / unpause(self)

结合 Defer与reactor

"""
将Defer对象与 reactor 的延时调用机制结合在在一起,可以开发开发功能强大的有异步调用函数
"""

from twisted.internet import reactor,defer

# 正常处理函数
def printSquare(d):
	print("hello" )
	return d

def printTwice(d):
	print("hello1")
	return d

def makeDefer():
	d = defer.Deferred()   # 定义Defer实例
	d.addCallback(printSquare)   # 添加正常处理回调函数
	d.addCallback(printTwice)	# 添加正常处理回调函数
	# 异步调用函数 reactor.callLater
	reactor.calLater(2,d.callback,5)	# 配置延时调用, 2秒后调用callback函数
	# reactor.callLater(delay,callable,*args,**kw)
	# delay : 定义延时调用的秒数,  callable :回调函数名称 , *args,**kw : 参数

makeDefer()
reactor.run()	# 挂起运行
# 程序执行2秒后,才会调用回调函数进行执行,


# 实现定时退出Twisted消息循环,    reactor.stop可以终止reactor.run() 函数
from twisted.internet import reactor
reactor.callLater(4,reactor.stop)
reactor.run()
print("Program finished")		

多线程的使用

"""
Twisted框架为开发者管理两种线程:一种主线程,另一种是辅线程。主线程 reactor.run()运行,而辅线程可以有多个,以线程池的方式呈现,可配置线程池中辅线程的个数。
主线程 :dataReceived()、connectionLose() 等事件处理函数 由Twisted框架在主线程中调用
"""
from twisted.internet import reactor
import MyProtocol

protocol = MyProtocol()

def must_run_in_main_thread(message):
	protocol.send = true
	protocol.transport.write(message)

def run_in_any_thread():
	#  对非线程安全的Twisted内置函数,可用 reactor.callFromThread() 函数使代码运行在主线程中
	# reactor.callFromThread("调用函数名","参数")
	reactor.callFromThread(must_run_in_main_thread,"Good Morning")
	print("the run of must_run_in_main_thread() has been finished")

使代码运行在辅线程中运行

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

def long_operation(msg):
	import time
	time.sleep(10)
	print("got message before 10 seconds:",msg)

# 定义 DatagramProtocol 子类
class Echo(DatagramProtocol):
	def  datagramReceived(self,data,(host,port)):
		# 调用 long_operation() 函数,使其在辅线程中执行。本调用在主线中立即返回
		reactor.callInThread(long_operation,data.decode('utf-8'))

# 实例化Protocol子类
protocol = Echo()

reactor.listenUDP(8007,protocol)
# 挂起运行
reactor.run()

配置线程池

# Twisted 使用线程池管理所有辅线程
# reactor.suggestThreadPoolsize() 函数定义线程池中线程的数量
reactor.suggestThreadPoolsize(10)   # 辅线程数量为10

安全信道

"""
SSL 也称TLS,为TCP通信加入 SSL信道可以认证通信主题,保证通信内容私密性和完整性
加密信道SSL通信 :保证玩过中传输内容的私密性,第三方及时监控到全部通信比特流,也无法获取真是内容
认证客户端身份 SSL 通信 :提供客户端身份认证,只 允许有 身份证明的客户端与服务器进行通信
"""
# SSL 通信插件 pyOpenSSL 安装
pip install pyopenssl


# **加密信道的SSL通信**
# 在服务器端需要配置服务器密钥文件和服务器证书文件。
# 密钥文件的内容是服务器的通信密钥,
# 服务器证书文件是通信时发送给客户端的通信凭证。

# 服务器端代码
from twisted.internet import ssl,reactor
from twisted.internet.protocol import Factory,Protocol

class EchoServer(Protocol):  # Protocol子类与普通TCP通信一致
	def dataReceived(self,data):
		self.transport.write(data)


if __name__=='__main__':
	factory = Factory()
	factory.protocol = EchoServer
	# 配置密钥文件和证书文件路径
	reactor.listenSSL(8007,factory,ssl.DefaultOpenSSLContextFacctory('../ssl/server.key','../ssl/server.crt'))



# 客户端编码

from twisted.internet import ssl,reactor
from twisted.internet.protocol import ClientFactory

if __name__=='__main__':
	factory = ClientFactory()
	# 配置密钥文件和证书文件路径
	reactor.connectSSL(8007,factory,ssl.ClientContextFacctory()

认证客户端身份 SSL 通信

# 客户端代码
from twisted.internet import ssl,reactor
from twisted.internet.protocol import ClientFactory

# 定义MySSLContext 类 用于配置客户端SSL证书和密钥
class MySSLContext(ssl.ClientContextFactory):
	def getContext(self):
		# SSL 协议版本
		self.method = SSL.SSLv3_METHOD
		ctx = ssl.ClientContextFactory.getContext(self)
		# 客户端证书
		ctx.use_certificate_file('../ssl/client.crt')
		# 客户端密钥
		ctx.use_privatekey_file('../ssl/client.key')
		return ctx

if __name__=='__main__':
	factory = ClientFactory()
	reactor.connectSSL('localhost',8000,factory,MySSLContext())
	reactor.run()


# 服务器端代码
from OpenSSL import SSL
from twisted.internet import ssl,reactor
from twisted.internet.portocol import Factory,Protocol

class Echo(Protocol):
	def dataReceived(self,data)
		self.transport.write(data)

def verifyCallback(connection,x509,errnum,errdepth,ok):
	print( "_verify(ok=%d):"%ok)   # CA 是否匹配
	print( "sunject:",x509.get_subject())   # 客户端证书subject
	print( "issure:",x509.get_issuer())    # 客户端证书发行方
	print( "errnum %s,errdepth %d" % (errnum,errdepth))    # 错误代码
	return True    # 是否允许通信


if __name__ =='__main__':
	factory = Factory()
	factory.protocol = Echo

	myContextFactory = ssl.DefaultOpenSSLContextFactory('../ssl/client.key','../ssl/client.crt')

	ctx = myContextFactory.getContext()
	ctx.set_verify(
			SSL.VERIFY_PEEK | SSL.VERIFY_FALL_IF_NO_PEER_CERT,
			verifyCallback
			)
	# 客户端证书的发行 CA
	cex.load_cerify_locations("../ssl/ca.crt")
	reactor.listenSSL(8000,factoty,myContextFactory)
reactor.run()
	
发布了50 篇原创文章 · 获赞 3 · 访问量 1797

猜你喜欢

转载自blog.csdn.net/weixin_43056654/article/details/104754477