python使用SocketServer实现网络服务器
SocketServer简化了网络服务器的编写。在进行socket创建时,使用SocketServer可以大量减少创建的步骤,并且SocketServer使用了select,它有四个类:TCPServer, UDPServer, UnixStreamServer和UnixDatagrmServer。这4个类是同步进行处理的,另外通告ForkingMixln和ThreadingMixln类支持异步处理。
它有4个同步类:
TCPServer
UDPServer
UnixStreamServer
UnixDatagramServer
两个Mixin类,用来支持异步。
ForkingMixIn
ThreadingMixIn
组合
class ForkingUDPServer(ForkingMixIn,UDPServer):pass
class ForkingTCPServer(ForkingMixIn,TCPServer):pass
class ThreadingUDPServer(ThreadingMixIn,UDPServer):pass
class ThreadingTCPServer(ThreadingMixIn,TCPServer):pass
1)fork是创建多进程,thread是创建多线程。
2)fork需要操作系统支持,Windows不支持。
3)ThreadingUDPServer与ThreadingTCPServer类中的特有属性:
- daemon_threads=False #默认值是False表示创建的线程都不是daemon线程,改为True表示创建的所有线程都是daemon线程
- block_on_close=False #默认值为Fasle,如果为True,可以设置为守护线程3.7版本可以用
类的继承关系:
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
使用SocketServer的步骤简介
1) 创建服务器的步骤。首先,要创建一个请求处理类,它是BaseRequestHandler的子类并重载它的handle()方法。
2) 实例化一个服务器类,传入服务器的地址和请求处理程序类。
- socketserver.BaseServer(server_address, RequestHandlerClass)
- server_address :服务器绑定的地址信息,是一个元组(ip,prost)
- RequestHandlerClass:必须是BaseRequestHandler的一个子类。
- BaseServer中源代码如下:
def __init__(self, server_address, RequestHandlerClass):
"""构造函数.可以被扩展,但不要重写"""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
# 处理请求的方法,会实例化一个RequestHandlerClass对象
def finish_request(self, request, client_address):
""" 通过实力化一个RequestHandlerClass来完成一个请求 """
self.RequestHandlerClass(request, client_address, self)
3)调用handle_request()(一般是调用其它事件循环或者使用select()或serve_forever())。
继承ThreadingMixln类时需要处理异常关闭。daemon_threads指示服务器是否要等待线程终止,要是线程相互独立,必须要设置为True,默认为False。
无论用什么协议,服务器类具有相同的外部方法和属性。
4)BaseServer中定义的接口常用属性和方法:
server_address | 服务器正在监听的地址和端口,在不同协议格式不一样。Internet协议上是一个元组(“127.0.0.1”,80) |
socket | 服务器正在监听的套接字对象 |
request_queue_size | 请求队列的大小。如果处理单个请求需要很长时间,那么在服务器繁忙时到达的任何请求都会被放入队列中,直到request_queue_size请求为止。一旦队列满了,来自客户机的进一步请求将得到一个“连接被拒绝”错误。默认值通常是5,但是可以被子类覆盖。 |
address_family | 服务器套接字所属的协议族。常见的例子是套接字。AF_INET socket.AF_UNIX。 |
socket_type | 服务器使用的套接字类型;套接字。SOCK_STREAM套接字。SOCK_DGRAM是两个常见的值。 |
timeout | 超时持续时间,以秒为单位度量,如果不需要超时,则为None。如果handle_request()在超时期间没有收到传入的请求,则调用handle_timeout()方法。 |
handle_request() | 处理单个请求,同步执行。这个函数按顺序调用以下方法:get_request()、verify_request()和process_request()。如果处理程序类的用户提供的handle()方法引发异常,将调用服务器的handle_error()方法。如果在超时秒内没有收到任何请求,那么将调用handle_timeout()并返回handle_request()。 |
server_forever(poll_interval=0.5) | 异步执行,处理请求。每隔poll_interval秒轮询一次。 忽略timeout属性,还会调用service_actions(),在ForkingMixIn的子类中定义,可以用来清理僵尸进程。 |
shutdown() | 告诉serve_forever循环停止。并等待他结束。 |
finish_request(request,client_address) | 通过实例化RequestHandlerClass并调用它的handle()方法来处理请求。 |
server_bind() | 由服务器的构造函数调用,以将套接字绑定到所需的地址。可能会被覆盖。 |
verify_request(request,client_address) | 必须返回一个布尔值;如果值为True,请求将被处理,如果值为False,请求将被拒绝。可以重写此函数来实现服务器的访问控制。默认实现总是返回True。 |
5)BaseRequestHandler类是和用户连接的用户请求处理类的基类
构造函数:BaseRequestHandler(request,client_address,server)
服务端Server实例接收用户请求后,最后会实例化这个类。它被初始化时,送入3个构造参数:request, client_address, server自身
- request:是和客户端的连接的socket对象
- client_address:是客户端地址
- server:是TCPServer实例本身
以后就可以在BaseRequestHandler类的实例上使用以下属性:
- self.request:是和客户端的连接的socket对象
- self.client_address:客户端地址
- self.server:TCPServer实例本身
这个类在初始化的时候,它会依次调用3个方法setup(), handle()和finish()。子类可以覆盖这些方法。
通常只需要重载handle。self.request的类型和数据报或流的服务不同。对于流服务,self.request是socket对象;对于数据报服务,self.request是字符串和socket。可以在子类StreamRequestHandler或者DatagramRequesthandler中重载,重写setup和finish,并提供self.rfile和self.wfile属性。self.rfile和self.wfile可以读取或写入,以获得请求数据或将数据返回到客户端。
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self): #每一个连接初始化
pass
def handle(self): #每一次请求处理
pass
def finish(self): #每一个连接清理
pass
测试代码:
[blctrl@main-machine python_dir]$ cat server2.py
#!/bin/env python3
import socketserver
import socket
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
"""
setup():准备请求处理,默认什么都不做,
StreamRequestHandler中会创建文件类似的对象以读写socket。
"""
def setup(self):
super().setup() #可以不调用父类的setup()方法,父类的setup方法什么都没做
print("----setup方法被执行-----")
"""
handle():处理请求。解析传入的请求,处理数据,并发送响应。
默认什么都不做 。常用变量:self.request, self.client_address,self.server。
"""
def handle(self):
super().handle() #可以不调用父类的handler(),方法,父类的handler方法什么都没做
print("-------handler方法被执行----")
print("1:self.server");
print(self.server)
print("\n");
print("2:self.request");
print(self.request) #服务
print("\n");
print("3:self.clent_address");
print(self.client_address) #客户端地址
print("\n");
print("4:self.__dict__");
print(self.__dict__)
print("\n");
print("5:");
print("- " * 50)
print("\n");
print("6:self.server.__dict__");
print(self.server.__dict__)
print("\n");
print("- " * 50)
# 类型说明,sk的类型是socket.socket
sk:socket.socket = self.request
print(sk)
print("waiting for request: ");
data = sk.recv(1024)
print("7:data");
print(data)
print("\n");
print("8:");
sk.send("{}-{}\n".format(sk.getpeername(),data).encode())
print("----------handler end ----------\n")
"""
finish():环境清理。默认什么都不做,如果setup产生异常,不会执行finish()。
"""
def finish(self):
super().finish() #可以不调用父类的finish(),方法,父类的finish方法什么都没做
print("--------finish方法被执行---")
laddr = "127.0.0.1",6666
#注意:参数是MyBaseRequestHandle
tcpserver = socketserver.TCPServer(laddr,MyBaseRequestHandle)
#tcpserver.handle_request() #只接受一个客户端连接
tcpserver.serve_forever() #永久循环执行,可以接受多个客户端连接
- 每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.client_address。
- 将ThreadingTCPServer换成TCPServer,当每个客户端连接进来就会创建一个新的线程。
- ThreadingTCPServer是异步的,可以同时处理多个连接。
- TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程。
服务器类型: 5种
BaseServer, TCPServer, UnixStreamServer, UDPServer, UnixDatagramServer。注意:BaseServer不直接对外服务。
服务器对象:
- class SocketServer.BaseServer:这是模块中的所有服务器对象的超类。它定义接口,但是大多数方法未被实现,在子类中进行细化。
- BaseServer.fileno():返回服务器监听套接字的整数文件描述符。通常用来传递给select.select()。以允许一个进程监视多个服务器。
- BaseServer.handle_request():处理单个请求。处理顺序:get_request(), verify_request(), process_request()。如果用户提供handle()方法抛出异常,将调用服务器的handle_error()方法。如果self.timeout内没有收到请求,将调用handle_timeout()并返回handle_request()。
- BaseServer.serve_forever(poll_interval=0.5):处理请求,直接一个明确的shutdown()请求。没poll_intervall秒轮询一次shutdown。忽略self.timeout。如果你需要左周期性的任务,建议放置在其它线程。
- BaseServer.shutdown():高速serve_forever()循环停止并等待其停止。python2.6版本。
- BaseServer.address_family:地址家族,比如socket.AF_INET或socket.AF_UNIX。
- BaseServer.ReqeustHandlerClass:用户提供的请求处理类,这个类为每个请求创建实例。
- BaseServer.server_address:服务器侦听的地址。格式根据协议家族地址各不相同。
- BaseServer.socketSocket:服务器上侦听传入的请求socket对象的服务器。
服务器类支持下面的类变量
- BaseServer.allow_reuse_address:服务器是否允许地址的重用。默认为false,并且可在子类中更改。
- BaseServer.reqeust_queue_size:请求队列的大小。如果单个请求需要很长的时间来处理,服务器忙时请求被放置到队列中,最多可以放request_queue_size个。一旦队列已满,来自客户端的请求将得到"Connection denied"错误。默认通常为5,但可以被子类覆盖。
- BaseServer.socket_type:服务区使用的套接字类型:socket.SOCK_STREAM和socket.SOCK_DGRAM等。
- BaseServer.timeout:超时时间,以秒为单位,或None表示没有超时。如果handle_request()在timeout内没有收到请求,将调用handle_timeout()。
下面方法可以被子类重载,它们对服务器对象的外部用户没有影响。
- BaseServer.finish_request():实际处理RequestHandlerClass发起的请求并调用其handle()方法。
- BaseServer.get_request():接受socket请求,并返回二元组,包含要用于与客户端通信的新socket对象,以及客户端的地址。
- BaseServer.handle_error(request, client_address):如果RequestHandlerClass的handle()方法抛出异常时调用。默认操作是打印traceback到标准输出,并继续处理其它请求。
- BaseServer.handle_timeout():超时处理吗。默认对于forking服务器是收集退出的子进程状态,对于threading服务器,则什么都不做。
- BaseServer.server_active():通过服务器的构造函数来激活服务器。默认的行为只是监听服务器陶杰字。可重载。
- BaseServer.server_bind():通过服务器的构造函数中调用绑定socket到所需的地址。可重载。
- BaseServer.verify_request(request, client_addr):返回一个布尔值,如果该值为True,则该请求将被处理,反之请求将被拒绝。可以重写此功能来实现对服务器的访问控制。默认的实现始终返回True。client_address可以限定客户端,比如只处理指定ip区间的请求。
创建服务器步骤总结:
- 从BaseRequestHandler类派生出子类,并覆盖其handle()方法来创建请求处理程序类,此方法将处理传入请求
- 实例化一个服务器类,传参服务器的地址和请求处理类
- 调用服务器实例的handle_request()或serve_forever()方法
- 调用server_close()关闭套接字
测试案列
服务器为一个时间戳服务器,在接受到客户端发来的数据后,自动回复。
客户端,等待用户输入,回车后向服务器发送用户输入的内容。
分别在python2.7和python3.6下测试。在启动时需要先启动服务器端,再启动客户端。
python2.7下服务器端:
#coding:utf-8
import SocketServer
from time import ctime
print("=====================SocketServer TCP服务器=====================");
HOST = '' #主机号为空白表示可以使用任何可用的地址。
PORT = 6666 #端口号
ADDR = (HOST, PORT)
#StreamRequestHandler实现
class MyRequestHandler(SocketServer.StreamRequestHandler): TCP/UDP服务器的服务处理器
def handle(self): #重写接收响应函数
print('...connect from:', self.client_address)
data = self.rfile.readline().strip()
print(data)
self.wfile.write('[%s] %s' % (ctime(), data))
tcpSerSock = SocketServer.TCPServer(ADDR, MyRequestHandler)
print('等待连接...')
tcpSerSock.serve_forever()
python2.7下客户端:
#coding:utf-8
from socket import *
print("=====================SocketServer TCP客户端=====================");
HOST = '127.0.0.1' #本机测试
PORT = 6666
BUFSIZ = 1024
ADDR = (HOST, PORT)
while True:
tcpCliSock = socket(AF_INET, SOCK_STREAM) #创建客户端流式套接字
tcpCliSock.connect(ADDR) #发起TCP连接
data = raw_input('> ') #接收用户输入
if not data: #如果用户输入为空,直接回车就会发送"",""就是代表false
break
tcpCliSock.send(data+'\n') #客户端发送消息,必须发送字节数组
data = tcpCliSock.recv(BUFSIZ) #接收回应消息,接收到的是字节数组
if not data: #如果接收服务器信息失败,或响应消息为空
break
print(data) #打印回应消息
tcpCliSock.close() #关闭客户端socket
python3.6下
SocketServer模块在python3中已经更名为socketserver。
服务器端代码:
#coding:utf-8
import socketserver
from time import ctime
print("=====================SocketServer TCP服务器=====================");
HOST = '' #主机号为空白表示可以使用任何可用的地址。
PORT = 6666 #端口号
ADDR = (HOST, PORT)
#StreamRequestHandler实现TCP/UDP服务器的服务处理器
class MyRequestHandler(socketserver.StreamRequestHandler):
def handle(self): #重写接收响应函数
print('连接到:', self.client_address)
data = self.rfile.readline().strip()
print(data)
self.wfile.write(bytes('[%s] %s' % (ctime(), data.decode('utf-8')),'utf-8'))
tcpSerSock = socketserver.TCPServer(ADDR, MyRequestHandler)
print('等待连接...')
tcpSerSock.serve_forever()
启动运行服务端程序:
[blctrl@main-machine python_dir]$ chmod +x server.py
[blctrl@main-machine python_dir]$ ./server.py
=====================SocketServer TCP服务器=====================
等待连接...
连接到: ('127.0.0.1', 54896)
b'hello'
连接到: ('127.0.0.1', 54898)
b'world'
连接到: ('127.0.0.1', 54900)
b'good morning'
连接到: ('127.0.0.1', 54902)
客户端代码:
#coding:utf-8
from socket import *
print("=====================SocketServer TCP客户端=====================");
HOST = '127.0.0.1' #本机测试
PORT = 6666
BUFSIZ = 1024
ADDR = (HOST, PORT)
while True:
tcpCliSock = socket(AF_INET, SOCK_STREAM) #创建客户端套接字
tcpCliSock.connect(ADDR) #发起TCP连接
data = input('> ') #接收用户输入
if not data: #如果用户输入为空,直接回车就会发送"",""就是代表false
break
tcpCliSock.send(bytes(data+'\n','utf-8')) #客户端发送消息,必须发送字节数组
buffer = tcpCliSock.recv(BUFSIZ) #接收回应消息,接收到的是字节数组
if not buffer: #如果接收服务器信息失败,或响应消息为空
break
print(str(buffer,'utf-8')) #打印回应消息
tcpCliSock.close() #关闭客户端socket
启动运行客户端程序:
[blctrl@main-machine python_dir]$ vim client.py
[blctrl@main-machine python_dir]$ chmod +x client.py
[blctrl@main-machine python_dir]$ ./client.py
=====================SocketServer TCP客户端=====================
> hello
[Sat Apr 1 23:55:33 2023] hello
> world
[Sat Apr 1 23:55:40 2023] world
> good morning
[Sat Apr 1 23:55:52 2023] good morning
>
模拟一个基于消息的设备
#!/usr/bin/python
#
# 模拟一个基于消息的设备
#
import socketserver
import os
import pprint
import random
import sys
# 对传入的v值进行上下限制,大于10,返回10,小于-10,返回-10,-10和10之间返回传入的v值
def clip(v):
if (v > 10.0): return 10
if (v < -10.0): return -10
return v
# 建立一个字典
status = dict()
# 命令行参数中是否存在--verbose字符串
verbose = "--verbose" in sys.argv
# 一个类,它继承了socketserver.StreamRequestHandler类
class DummyDevice(socketserver.StreamRequestHandler):
def handle(self):
global status
global verbose
# 客户端地址
client = self.client_address[0]
if client in status:
voltage = status[client]['volts']
on = status[client]['on']
else:
voltage = 0
on = False
status[client] = { }
status[client]['volts'] = voltage
status[client]['on'] = on
while True:
line = self.rfile.readline()
line = line.decode()
line = line.strip()
args = line.split()
if(len(line) <= 0):
break
reply = None
if verbose:
print ("> " + line)
if (line == '*IDN?'):
reply = '*IDN US-PAS Instrument, Yoyodyne Inc. -- An ACME Industries subsidiary, S/N:1313'
elif (line == 'ON?'):
reply = 'ON ' + ('1' if on else '0')
elif (line == 'VOLTS?'):
reply = 'VOLTS %.4f' % (voltage)
elif (line == 'CURR?'):
ma = voltage + random.gauss(0, 1) if on else 0
reply = 'CURR %.5g' % (ma)
elif (line == 'LOAD?'):
load = os.getloadavg()
reply = 'LOAD %.5g %.5g %.5g' % (load[0], load[1], load[2])
elif (len(args) > 1):
try:
val = float(args[1])
if (args[0] == 'ON'):
if args[1] == '1':
on = True
reply = 'ON 1'
elif args[1] == '0':
on = False
reply = 'ON 0'
elif (args[0] == 'VOLTS'):
voltage = clip(val)
reply = 'VOLTS ' + str(float(voltage))
status[client]['volts'] = voltage
status[client]['on'] = on
except:
pass
if (reply):
bytes = (reply + '\r\n').encode()
self.wfile.write(bytes)
if verbose:
print ("< " + reply)
class Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
daemon_threads = True
"""
BaseServer.allow_reuse_address:服务器是否允许地址的重用。默认为false,并且可在子类中更改。
"""
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass):
socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass)
server = Server(('0.0.0.0', 24742), DummyDevice)
print("Serving on TCP 24742")
print("Terminate with Ctrl-C")
try:
server.serve_forever()
except KeyboardInterrupt:
sys.exit(0)
运行以上python程序:
[blctrl@main-machine python_dir]$ ./simple_instrument.py
Serving on TCP 24742
Terminate with Ctrl-C
新开一个命令行窗口,用nc命令进行测试:
[blctrl@main-machine ~]$ nc localhost 24742
*IDN? # 输入:查询设备名称
*IDN US-PAS Instrument, Yoyodyne Inc. -- An ACME Industries subsidiary, S/N:1313 # 应答
ON 0 # 输入:关闭设备
ON 0 # 应答
ON 1 # 输入:打开设备
ON 1 # 应答
VOLTS 5.0 # 输入:设置电压,范围-10~10V
VOLTS 5.0 # 应答
ON? # 输入:查询状态
ON 1 # 应答
VOLTS? # 输入:查询电压设定
VOLTS 5.0000 # 应答
CURR? # 输入:查询电流
CURR 6.079 # 应答
LOAD? # 输入:查询CPU负载
LOAD 0.03 0.13 0.16 # 应答