The architecture is:
Server side function:
1. receive file name and file length, create the file. create new thread to handle the data transfer
2. receive file data and write to file.
3. close file.
Client side function:
1. setup socket connection and send file name and file length
2. send file data
3. close socket and exit
About epoll:
Each packet could trigger epoll event. it means during the file data transfer, each data packet will trigger the epoll event.
Server side code file_server.py:
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import socket
import select
import os
import threading
def handle(fd,conn,epoll):
base_path = '/home/cdsvlab/log-dir'
print 'connected...'
pre_data = conn.recv(1024)
print pre_data
if len(pre_data) == 0:
print "zero data"
epoll.modify(fd,select.EPOLLHUP)
return False
#获取请求方法、文件名、文件大小
file_name,file_size = pre_data.split('|')
print file_name
print file_size
#print "file_size is %d" % int(file_size)
# 防止粘包,给客户端发送一个信号。
conn.sendall('nothing')
recv_size = 0
#上传文件路径拼接
file_dir = os.path.join(base_path,file_name)
f = file(file_dir,'wb')
Flag = True
while Flag:
#未上传完毕,
if int(file_size)>recv_size:
print "package ....recv_size %d" % recv_size
data = conn.recv(1024)
recv_size+=len(data)
#写入文件
if len(data)>0:
f.write(data)
#上传完毕,则退出循环
else:
print "file write done, recv_size = %d" % recv_size
recv_size = 0
Flag = False
print 'upload successed.'
f.close()
epoll.modify(fd,select.EPOLLHUP)
return True
class file_server:
def __init__(self):
self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address = ("127.0.0.1", 9998)
self.epoll = select.epoll()
self.fd_to_socket = {}
self.fd_to_thread = {}
def listen(self):
self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.serversocket.bind(self.server_address)
self.serversocket.listen(10)
print "服务器启动成功,监听IP:" , self.server_address
def server_epoll(self):
flag_global = 0
#服务端设置非阻塞
self.serversocket.setblocking(False)
#超时时间
timeout = -1
self.epoll.register(self.serversocket.fileno(), select.EPOLLIN)
self.fd_to_socket[self.serversocket.fileno()]=self.serversocket
server_on = True
while server_on:
print "等待活动连接......"
events = self.epoll.poll()
if not events:
print "epoll超时无活动连接,重新轮询......"
continue
print "有" , len(events), "个新事件,开始处理......"
for fd, event in events:
print "new event %d of fd %d" % (event,fd)
event_socket = self.fd_to_socket[fd]
if event_socket == self.serversocket:
connection, address = self.serversocket.accept()
print "新连接:" , address
'''Attention: remove unblocking mode set on the accepted socket.
thread could block to wait for data'''
#connection.setblocking(False)
self.epoll.register(connection.fileno(), select.EPOLLIN)
self.fd_to_socket[connection.fileno()] = connection
self.fd_to_thread[connection.fileno()] = 0
elif event & select.EPOLLHUP:
print 'client close'
self.epoll.unregister(fd)
self.fd_to_socket[fd].close()
del self.fd_to_socket[fd]
self.fd_to_thread[fd] = 0
elif event & select.EPOLLIN:
#start to write file.
print "fd is %d,conn.fileno() is %d" % (fd,connection.fileno())
if self.fd_to_thread[fd] == 0:
working_thread=threading.Thread(target=handle,args=(fd,self.fd_to_socket[fd],self.epoll))
self.fd_to_thread[fd]=working_thread
working_thread.start()
self.fd_to_thread[fd] = 1
elif event & select.EPOLLOUT:
print "not support out"
def epoll_end(self):
self.epoll.unregister(self.serversocket.fileno())
self.epoll.close()
self.serversocket.close()
if __name__=='__main__':
auto_file_server=file_server()
auto_file_server.listen()
auto_file_server.server_epoll()
auto_file_server.epoll_end()
Client side code file_client.py:
#!/usr/bin/env python
#coding:utf-8
import socket
import sys
import os
class file_client:
def __init__(self):
self.ip_port = ('127.0.0.1',9998)
self.sk = socket.socket()
def open_connection(self):
self.sk.connect(self.ip_port)
def send_file(self,path):
# 根据路径获取文件名
try:
file_name = os.path.basename(path)
except:
print "wrong path %s" % path
# 获取文件大小
try:
file_size=os.stat(path).st_size
except:
print "wrong file size %s" % path
# 发送文件名 和 文件大小
self.sk.send(file_name+'|'+str(file_size))
# 为了防止粘包,将文件名和大小发送过去之后,等待服务端收到,直到从服务端接受一个信号(说明服务端已经收到)
self.sk.recv(1024)
send_size = 0
try:
f= file(path,'rb')
except:
print "open file %s failed" % path
Flag = True
while Flag:
print "file_size=%d sent_size=%d" % (file_size,send_size)
if send_size + 1024 >file_size:
data = f.read(file_size-send_size)
Flag = False
else:
data = f.read(1024)
send_size+=1024
self.sk.send(data)
print "send file done"
f.close()
def close_connection(self):
self.sk.close()
print "close socket"
if __name__=='__main__':
client = file_client()
client.open_connection()
input = raw_input('path:')
client.send_file(input)
client.close_connection()
This code could work but during the thread receiving data packets, these data packets still triggers epoll event as:
new event 1 of fd 6 // epoll log
package ....recv_size 4114432 // thread log
fd is 6,conn.fileno() is 6 // epoll log
package ....recv_size 4115456 // thread log
To avoid this, we could 1) unregister thread socket from epoll 2) not use epoll.
Here is the code of unregister thread socket from epoll:
file_server_threading.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import socket
import select
import os
import threading
def handle(fd,server):
base_path = '/home/cdsvlab/log-dir'
print 'connected...'
server.epoll.unregister(fd)
conn = server.fd_to_socket[fd]
while True:
print "fd %d is waiting for data" % fd
pre_data = conn.recv(1024)
print pre_data
if pre_data == "server_off":
break
#获取请求方法、文件名、文件大小
file_name,file_size = pre_data.split('|')
print file_name
print file_size
# 防止粘包,给客户端发送一个信号。
conn.sendall('nothing')
recv_size = 0
#上传文件路径拼接
file_dir = os.path.join(base_path,file_name)
f = file(file_dir,'wb')
Flag = True
while Flag:
#未上传完毕,
if int(file_size)>recv_size:
print "package ....recv_size %d" % recv_size
data = conn.recv(1024)
recv_size+=len(data)
#写入文件
if len(data)>0:
f.write(data)
#上传完毕,则退出循环
else:
print "file write done, recv_size = %d" % recv_size
recv_size = 0
Flag = False
print 'upload successed.'
f.close()
conn.close()
del server.fd_to_socket[fd]
print "fd %d is gone" % fd
class file_server:
def __init__(self):
self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_address = ("127.0.0.1", 9998)
self.epoll = select.epoll()
self.fd_to_socket = {}
self.fd_to_thread = {}
def listen(self):
self.serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.serversocket.bind(self.server_address)
self.serversocket.listen(10)
print "服务器启动成功,监听IP:" , self.server_address
def server_epoll(self):
flag_global = 0
#服务端设置非阻塞
self.serversocket.setblocking(False)
#超时时间
timeout = -1
self.epoll.register(self.serversocket.fileno(), select.EPOLLIN)
self.fd_to_socket[self.serversocket.fileno()]=self.serversocket
server_on = True
while server_on:
print "等待活动连接......"
events = self.epoll.poll()
if not events:
print "epoll超时无活动连接,重新轮询......"
continue
print "有" , len(events), "个新事件,开始处理......"
for fd, event in events:
print "new event %d of fd %d" % (event,fd)
event_socket = self.fd_to_socket[fd]
if event_socket == self.serversocket:
connection, address = self.serversocket.accept()
print "新连接:" , address
'''Attention: remove unblocking mode set on the accepted socket.
'''
#connection.setblocking(False)
self.epoll.register(connection.fileno(), select.EPOLLIN)
self.fd_to_socket[connection.fileno()] = connection
elif event & select.EPOLLHUP:
print 'client close'
self.epoll.unregister(fd)
self.fd_to_socket[fd].close()
del self.fd_to_socket[fd]
elif event & select.EPOLLIN:
#start to write file.
print "fd is %d,conn.fileno() is %d" % (fd,connection.fileno())
working_thread=threading.Thread(target=handle,args=(fd,self))
working_thread.start()
elif event & select.EPOLLOUT:
print "not support out"
def epoll_end(self):
self.epoll.unregister(self.serversocket.fileno())
self.epoll.close()
self.serversocket.close()
if __name__=='__main__':
auto_file_server=file_server()
auto_file_server.listen()
auto_file_server.server_epoll()
auto_file_server.epoll_end()
Client side code file_client_threading.py:
#!/usr/bin/env python
#coding:utf-8
import socket
import sys
import os
class file_client:
def __init__(self):
self.ip_port = ('127.0.0.1',9998)
self.sk = socket.socket()
def open_connection(self):
self.sk.connect(self.ip_port)
def send_file(self,path):
# 根据路径获取文件名
try:
file_name = os.path.basename(path)
except:
print "wrong path %s" % path
# 获取文件大小
try:
file_size=os.stat(path).st_size
except:
print "wrong file size %s" % path
# 发送文件名 和 文件大小
self.sk.send(file_name+'|'+str(file_size))
# 为了防止粘包,将文件名和大小发送过去之后,等待服务端收到,直到从服务端接受一个信号(说明服务端已经收到)
self.sk.recv(1024)
send_size = 0
try:
f= file(path,'rb')
except:
print "open file %s failed" % path
Flag = True
while Flag:
print "file_size=%d sent_size=%d" % (file_size,send_size)
if send_size + 1024 >file_size:
data = f.read(file_size-send_size)
Flag = False
else:
data = f.read(1024)
send_size+=1024
self.sk.send(data)
print "send file done"
f.close()
def close_connection(self):
self.sk.close()
print "close socket"
def close_server(self):
self.sk.send("server_off");
if __name__=='__main__':
client = file_client()
client.open_connection()
while True:
input = raw_input('path:')
if input == 'server_off':
client.close_server()
elif input == 'exit':
client.close_connection()
break;
else:
#client.open_connection()
client.send_file(input)
#client.close_connection()
Sure, we could remove epoll at all for this use case.