我测试了一下,我用客户端发送了一万个请求,如何服务器出现了这样一个错误
等待活动练级
Traceback (most recent call last):
File "/home/prty/PycharmProjects/python 789/.idea/io多路复用1.py", line 22, in <module>
readable,writeable,exception = select.select(inputs,outputs,inputs)
ValueError: filedescriptor out of range in select()
这意味着着,文件描述符号,fileobj 超过了最大监控
ubuntu,pycharm下用正则一匹配输出 接受地址: ('127.0.0.1', 34860),一共有1021结果,看了看其他人的描述,文件描述符select最多监控1024个,其实这没有错误,因为你查看文件描述符号,你会发现,他是从4开始的,或许是前面的文件描述符号都有用吧
这个错误很明确的说--就是select机制的原因,select监控数组,里面传的值长度是固定的,如果你碰见了这个错误,那一定是开的文件描述符号,要监控的服务太多的
如何处理 ----1 删除一些没有用的文件描述符号监控服务,也就是注销他们,空出一些文件描述符号
2 ----换用epoll poll(linux)或者其他的对文件描述符号没有限制的监听方式
pass : 这下面的代码是抄的,客户端是自己写的,不知道这下面的代码原创谁,已经被转载烂了,也就不介绍了
import socket
import select
import queue
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.setblocking(False)
server.bind(('127.0.0.1', 8001,))
server.listen(5)
# select轮循等待socket集合
inputs = [server]
#select轮循等待写socket的集合
outputs = []
message_queues = {}
# select超时时间
timeout = 29
while inputs:
print('等待活动练级')
# 即监控inputs可读可写,又监控异常
readable,writeable,exception = select.select(inputs,outputs,inputs)
for each in readable:
if each==server:
conn,addr = each.accept()
print("接受地址:",addr)
# 加入
inputs.append(conn)
# 为客户创建队列
message_queues[conn.fileno()] = queue.Queue()
else:
data = each.recv(1024)
if not data:
# 没有数据了就加入消息发送集合
if each in outputs:
# 没有数据就是断开了,断开了就不理你了
outputs.remove(each)
# 监控;列表也不理你了
inputs.remove(each)
# 进行大扫除卫生
del message_queues[each.fileno()]
each.close()
else:
# 他的信息放入他的队列
message_queues[conn.fileno()].put(data)
if each not in outputs:
# 判断是否可写,不重复加入
outputs.append(conn)
for each in writeable:
try:
# get_nowait()会报错
msg = message_queues[each.fileno()].get_nowait()
except queue.Empty:
print('了解',each.getpeername(),'消息队列为空')
outputs.remove(each)
else:
#print("他的数据原路返回",'---{}--'.format(msg.decode()),each.getpeername())
each.send(msg)
# 这里就不需要清楚了,,再来就直接激活这边,断开了那也好
for each in exception:
# print('异常连接: ',s.getpeername())
inputs.remove(socket)
if each in outputs:
outputs.remove(each.fileno())
del message_queues[each.fileno()]
each.close()
import socket
import select
import time
import queue
import os
class Connerror(ValueError):
pass
class Server(object):
def __init__(self):
self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
self.server.setblocking(False)
self.server_address = ('localhost',8001)
self.server.bind(self.server_address)
self.server.listen()
# 初始化容器
self.__vessel()
def __vessel(self):
# 初始化容器
self.inputs = [self.server, ]
self.outputs = []
self.message_queues = {}
def filedescriptor(self):
# 描述符满了时,即便 self.list 长度为1021时
for each in self.inputs:
if each==self.server:
# 这个不处理
continue
elif each in self.r_list and each in self.w_list:
# 跳过
continue
else:
each.close()
self.inputs.remove(each)
if each in self.outputs:
self.outputs.remove(each)
if each in self.message_queues:
del self.message_queues[each]
if each in self.r_list:
self.r_list.remove(each)
if each in self.w_list:
self.w_list.remove(each)
if each in self.e_list:
self.e_list.remove(each)
def run(self):
while self.inputs:
# print('服务开始')
try:
self.r_list, self.w_list, self.e_list = select.select(self.inputs, self.outputs, self.inputs)
except ValueError:
self.filedescriptor()
print(len(self.inputs)) # 1022
# print(len(self.outputs)) # 31
# print(len(self.message_queues)) # 1021
# print(len(self.r_list)) # 991
# print(len(self.w_list)) # 30
# print(len(self.e_list)) # 0
if not (self.r_list or self.w_list or self.e_list):
print('timeout')
elif self.r_list:
try:
self.run_r_list()
except BlockingIOError:
pass
elif self.w_list:
self.run_w_list()
else:
if len(self.inputs)==1021:
#我文件描述符号满了
print('文件描述符号满了')
self.filedescriptor()
# 这里 可以试试用于检测
def run_r_list(self):
# 有信息监听信息
for each in self.r_list:
if each==self.server:
conn,addr = each.accept()
print(addr)
conn.setblocking(False)
self.inputs.append(conn)
self.message_queues[conn] = queue.Queue()
# 服务端可以不必要直接把已经完成服务的地址取出来,完全没有必要
else:
try:
data = each.recv(1024)
except:
print('closing',each)
if s in self.outputs:
self.outputs.remove(s)
s.close()
# 这是错误处理,对于客户端连接断开
del self.message_queues[each]
else:
if data:
# print(' received', data, 'from', each.getpeername())
self.message_queues[each].put(data)
# 如果没有在output 则增加到outputs
if each not in self.outputs:
self.outputs.append(each)
def run_w_list(self):
# 遍历可写列表
for each in self.w_list:
if each ==self.server:
raise Exception("服务器怎么变成为可读")
try:
next_msg = self.message_queues[each].get_nowait()
except queue.Empty:
self.outputs.remove(each)
else:
# 信息原路返回
each.send(next_msg)
def run_e_list(self):
# 处理错误列表
for each in self.e_list:
print('exception condition on', s.getpeername())
# 出现错误就彻底移除
self.inputs.remove(each)
if each in self.outputs:
self.outputs.remove(each)
each.close()
# 清楚队列列表
del self.message_queues[each]
if __name__ == '__main__':
s =Server()
s.run()
# 这是一个对select只能监听有限个数文件描述符的处理,我这里选择的是清楚一部分无需监听的客户端,用客户端测试了下,2021的并发,基本能够接受,但有些不稳定,建议还是采用epoll,kqueue,等等