ValueError: filedescriptor out of range in select()

我测试了一下,我用客户端发送了一万个请求,如何服务器出现了这样一个错误
等待活动练级
Traceback (most recent call last):
  File "/home/prty/PycharmProjects/python 789/.idea/io多路复用1.py", line 22, in <module>
    readable,writeable,exception = select.select(inputs,outputs,inputs)
ValueError: filedescriptor out of range in select()
这意味着着,文件描述符号,fileobj 超过了最大监控
ubuntu,pycharm下用正则一匹配输出 接受地址: ('127.0.0.1', 34860),一共有1021结果,看了看其他人的描述,文件描述符select最多监控1024个,其实这没有错误,因为你查看文件描述符号,你会发现,他是从4开始的,或许是前面的文件描述符号都有用吧

这个错误很明确的说--就是select机制的原因,select监控数组,里面传的值长度是固定的,如果你碰见了这个错误,那一定是开的文件描述符号,要监控的服务太多的
如何处理 ----1 删除一些没有用的文件描述符号监控服务,也就是注销他们,空出一些文件描述符号
2 ----换用epoll poll(linux)或者其他的对文件描述符号没有限制的监听方式

pass : 这下面的代码是抄的,客户端是自己写的,不知道这下面的代码原创谁,已经被转载烂了,也就不介绍了

import socket
import select
import queue

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.setblocking(False)
server.bind(('127.0.0.1', 8001,))
server.listen(5)

# select轮循等待socket集合
inputs = [server]
#select轮循等待写socket的集合
outputs = []
message_queues = {}
# select超时时间
timeout = 29

while inputs:
    print('等待活动练级')
    # 即监控inputs可读可写,又监控异常
    readable,writeable,exception = select.select(inputs,outputs,inputs)
    for each in readable:
        if each==server:
            conn,addr = each.accept()
            print("接受地址:",addr)
            # 加入
            inputs.append(conn)
            # 为客户创建队列
            message_queues[conn.fileno()] = queue.Queue()
        else:
            data = each.recv(1024)

            if not data:
                # 没有数据了就加入消息发送集合
                if each in outputs:
                    # 没有数据就是断开了,断开了就不理你了
                    outputs.remove(each)
                # 监控;列表也不理你了
                inputs.remove(each)
                # 进行大扫除卫生
                del message_queues[each.fileno()]
                each.close()
            else:
                # 他的信息放入他的队列
                message_queues[conn.fileno()].put(data)
                if each not in outputs:
                    # 判断是否可写,不重复加入
                    outputs.append(conn)

    for each in writeable:
        try:
            # get_nowait()会报错
            msg = message_queues[each.fileno()].get_nowait()
        except queue.Empty:
            print('了解',each.getpeername(),'消息队列为空')
            outputs.remove(each)
        else:
            #print("他的数据原路返回",'---{}--'.format(msg.decode()),each.getpeername())
            each.send(msg)
            # 这里就不需要清楚了,,再来就直接激活这边,断开了那也好

    for each in exception:
       # print('异常连接: ',s.getpeername())
        inputs.remove(socket)
        if each in outputs:
            outputs.remove(each.fileno())
        del message_queues[each.fileno()]
        each.close()

import socket
import select
import time
import queue
import os


class Connerror(ValueError):
    pass

class Server(object):

    def __init__(self):
        self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

        self.server.setblocking(False)
        self.server_address = ('localhost',8001)

        self.server.bind(self.server_address)
        self.server.listen()
        # 初始化容器
        self.__vessel()

    def __vessel(self):
        # 初始化容器
        self.inputs = [self.server, ]
        self.outputs = []
        self.message_queues = {}

    def filedescriptor(self):
        # 描述符满了时,即便 self.list 长度为1021时
        for each in self.inputs:
            if each==self.server:
                # 这个不处理
                continue
            elif each in self.r_list and each in self.w_list:
                # 跳过
                continue
            else:
                each.close()
                self.inputs.remove(each)
                if each in self.outputs:
                    self.outputs.remove(each)
                if each in self.message_queues:
                    del self.message_queues[each]
                if each in self.r_list:
                    self.r_list.remove(each)
                if each in self.w_list:
                    self.w_list.remove(each)
                if each in self.e_list:
                    self.e_list.remove(each)

    def run(self):
        while self.inputs:
            # print('服务开始')
            try:
                self.r_list, self.w_list, self.e_list = select.select(self.inputs, self.outputs, self.inputs)
            except ValueError:
                self.filedescriptor()
                print(len(self.inputs)) # 1022
                # print(len(self.outputs)) # 31
                # print(len(self.message_queues)) # 1021
                # print(len(self.r_list)) # 991
                # print(len(self.w_list)) # 30
                # print(len(self.e_list)) # 0
            if not (self.r_list or self.w_list or self.e_list):
                print('timeout')
            elif self.r_list:
                try:
                    self.run_r_list()
                except BlockingIOError:
                    pass
            elif self.w_list:
                self.run_w_list()
            else:
                if len(self.inputs)==1021:
                    #我文件描述符号满了
                    print('文件描述符号满了')
                    self.filedescriptor()
                # 这里 可以试试用于检测


    def run_r_list(self):
        # 有信息监听信息
        for each in self.r_list:
            if each==self.server:
                conn,addr = each.accept()
                print(addr)
                conn.setblocking(False)
                self.inputs.append(conn)
                self.message_queues[conn] = queue.Queue()
                # 服务端可以不必要直接把已经完成服务的地址取出来,完全没有必要
            else:
                try:
                    data = each.recv(1024)
                except:
                    print('closing',each)
                    if s in self.outputs:
                        self.outputs.remove(s)
                        s.close()
                        # 这是错误处理,对于客户端连接断开
                        del self.message_queues[each]
                else:
                    if data:
                        # print(' received', data, 'from', each.getpeername())
                        self.message_queues[each].put(data)
                        # 如果没有在output 则增加到outputs
                        if each not in self.outputs:
                            self.outputs.append(each)

    def run_w_list(self):
        # 遍历可写列表
        for each in self.w_list:
            if each ==self.server:
                raise Exception("服务器怎么变成为可读")
            try:
                next_msg = self.message_queues[each].get_nowait()
            except queue.Empty:
                self.outputs.remove(each)
            else:
                # 信息原路返回
                each.send(next_msg)


    def run_e_list(self):
        # 处理错误列表
        for each in self.e_list:
            print('exception condition on', s.getpeername())
            # 出现错误就彻底移除
            self.inputs.remove(each)
            if each in self.outputs:
                self.outputs.remove(each)
            each.close()
            # 清楚队列列表
            del self.message_queues[each]


if __name__ == '__main__':
    s =Server()
    s.run()

# 这是一个对select只能监听有限个数文件描述符的处理,我这里选择的是清楚一部分无需监听的客户端,用客户端测试了下,2021的并发,基本能够接受,但有些不稳定,建议还是采用epoll,kqueue,等等

猜你喜欢

转载自blog.csdn.net/qq_42395490/article/details/82346158