基于线程开发一个FTP服务器,要点分析

用Python开发一个基于线程的FTP服务器,主要功能如下:

-  1. 用户加密认证
-  2. 允许同时多用户登录
-  3. 每个用户有自己的家目录 ,且只能访问自己的家目录
-  4. 对用户进行磁盘配额,每个用户的可用空间不同
-  5. 允许用户在ftp server上随意切换目录
-  6. 允许用户查看当前目录下文件
-  7. 允许上传和下载文件,保证文件一致性(md5)
-  8. 文件传输过程中显示进度条
-  9. 附加功能:支持文件的断点续传
-   10. 在之前开发的FTP基础上,开发支持多并发的功能 
-  11. 不能使用SocketServer模块,必须自己实现多线程 
-  12. 必须用到队列Queue模块,实现线程池 
-  13. 允许配置最大并发数,比如允许只有 10 个并发用户
 

1,用户加密认证

1
2
3
4
  这个肯定需要用到configparser 和hashlib模块,用md5进行加密,服务端与用户端
进行交互前,肯定需要进行认证,在服务端进行认证,客户端需要发送用户名及密码,但
是为了安全起见,服务端数据库中的密码应该是加密后的密文,客户端登陆认证时也应该
发送密文到服务端,服务端接受到密文与数据库中对应的密文进行比较。

  

2,查看自己的当前目录下的文件

1
2
  这个只需要写一个 dir 就ok
简单的说,使用configparse模块就可以完成

  

3,文件传输中显示进度条

1
2
3
下载的进度条比较好实现,我们可以从服务端受到将要下载的文件的大小,
 
上传的进度条,我们可以利用文件操作的tell()方法,获取当前指针位置(字节)

4,在之前的基础上实现多并发的功能

1
2
3
4
5
6
     并发,是伪并行,,即看起来是同时运行的,单个CPU + 多道技术就可以实现并发
 
     多道技术概念回顾:内存中同时存入多道(多个)程序,cpu从一个进程快速切换
到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu
只能执行一个任务,但在 1 秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,
即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)

5,不能使用SocketServer模块,必须自己实现多线程 

1、服务器启动socket监听端口
2、服务器内部利用while循环监视句柄的变化
3、客户端请求
4、服务器为这个请求分配线程或进程(底层调用select)。
  SocketServer模块有两个方法ThreadingTCPServerForkingTCPServer,分别创建线程或者进程

ThreadingTCPServer源码分析(ForkingTCPServer方法类似

类继承关系图

服务器启动程序后:
1、执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
     解释:class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass,ThreadingTCPServer先要执行__init__构造函数,ThreadingMixIn里没有,就去TCPServer类里去找
2、 执行 BaseServer.__init__ 方法,将 自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给self.RequestHandlerClass
    解释:TCPServer构造方法中包含BaseServer.__init__(self, server_address, RequestHandlerClass),所以要把自己定义的类传给BaseServer的构造方法
3、执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
    解释:serve_forever是BaseServer类中的方法,里面有个while循环,一直调用select.select(),r, w, e =_eintr_retry(select.select, [self], [], [],poll_interval)
客户端接入:
4、执行 BaseServer._handle_request_noblock方法
    解释:serve_forever的while循环里有一个判断if self in r:self._handle_request_noblock(),客户端连接句柄发生变化就会把句柄放到r列表里,所以,触发了_handle_request_noblock()。
5、执行 ThreadingMixIn.process_request 方法,创建一个 “线程” 用来处理请求
    解释:调用process_request方法时,从继承类广度优先原则,所以它先调用ThreadingMixIn类中的process_request
6、执行 ThreadingMixIn.process_request_thread 方法
    解释:t = threading.Thread(target = self.process_request_thread,args = (request, client_address))多线程模块方法,调用self.process_request_thread,此时才真正启动了线程。
7、执行 BaseServer.finish_request 方法,执行  self.RequestHandlerClass()  即:执行 自定义 MyRequestHandler 的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用自定义的MyRequestHandler的 handle方法)
    解释:连接创建完成,此时开始执行handle方法中的内容,开始和客户端交互,执行完,后面再执行shutdown_request方法关闭连接。

源码精简

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import  socket
import  threading
import  select
def  process(request, client_address):
     print  request,client_address
     conn =  request
     conn.sendall( '欢迎致电 10086,请输入1xxx,0转人工服务.' )
     flag =  True
     while  flag:
         data =  conn.recv( 1024 )
         if  data = =  'exit' :
             flag =  False
         elif  data = =  '0' :
             conn.sendall( '通过可能会被录音.balabala一大推' )
         else :
             conn.sendall( '请重新输入.' )
sk =  socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(( '127.0.0.1' , 8002 ))
sk.listen( 5 )
while  True :
     r, w, e =  select.select([sk,],[],[], 1 )
     print  'looping'
     if  sk in  r:
         print  'get request'
         request, client_address =  sk.accept()
         t =  threading.Thread(target = process, args = (request, client_address))
         t.daemon =  False
         t.start()
sk.close()

6,必须用到队列Queue模块,实现线程池

Python标准模块-concurrent.futures:https://docs.python.org/dev/library/concurrent.futures.html

1 -1介绍

 
1
2
3
4
5
6
7
concurrent.futures模块提供了高度封装的异步调用接口
 
ThreadPoolExecutor:线程池,提供异步调用
 
ProcessPoolExecutor: 进程池,提供异步调用
 
Both implement the same interface, which  is  defined by the abstract Executor  class .

1-2 基本方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
submit(fn,  * args,  * * kwargs)
异步提交任务
 
map (func,  * iterables, timeout = None , chunksize = 1 )
取代 for 循环submit的操作
 
shutdown(wait = True )
相当于进程池的pool.close() + pool.join()操作
 
wait = True ,等待池内所有任务执行完毕回收完资源后才继续
 
wait = False ,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和 map 必须在shutdown之前
 
result(timeout = None )
取得结果
 
add_done_callback(fn)
回调函数

2,进程池

介绍:

1
2
3
4
5
6
7
The ProcessPoolExecutor  class  is  an Executor subclass that uses a pool of
processes to execute calls asynchronously. ProcessPoolExecutor uses the
multiprocessing module, which allows it to side - step the Global Interpreter
  Lock but also means that only picklable objects can be executed  and  returned.
 
class  concurrent.futures.ProcessPoolExecutor(max_workers = None , mp_context = None )
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers  is  None  or  not  given, it will default to the number of processors on the machine. If max_workers  is  lower  or  equal to  0 , then a ValueError will be raised.

用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from  concurrent.futures  import  ThreadPoolExecutor,ProcessPoolExecutor
 
import  os,time,random
def  task(n):
     print ( '%s is runing'  % os.getpid())
     time.sleep(random.randint( 1 , 3 ))
     return  n * * 2
 
if  __name__  = =  '__main__' :
 
     executor = ProcessPoolExecutor(max_workers = 3 )
 
     futures = []
     for  in  range ( 11 ):
         future = executor.submit(task,i)
         futures.append(future)
     executor.shutdown( True )
     print ( '+++>' )
     for  future  in  futures:
         print (future.result())

3,线程池

介绍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ThreadPoolExecutor  is  an Executor subclass that uses a pool of threads
  to execute calls asynchronously.
class  concurrent.futures.ThreadPoolExecutor(max_workers = None , thread_
name_prefix = '')
An Executor subclass that uses a pool of at most max_workers threads to
execute calls asynchronously.
 
Changed  in  version  3.5 : If max_workers  is  None  or  not  given, it will default
to the number of processors on the machine, multiplied by  5 , assuming that
ThreadPoolExecutor  is  often used to overlap I / O instead of CPU work  and  the
  number of workers should be higher than the number of workers  for  ProcessPoolExecutor.
 
New  in  version  3.6 : The thread_name_prefix argument was added to allow
  users to control the threading.Thread names  for  worker threads created by
  the pool  for  easier debugging.

  

用法:

1
把ProcessPoolExecutor换成ThreadPoolExecutor,其余用法全部相同

猜你喜欢

转载自www.cnblogs.com/peterhuang1977/p/9572689.html