lesson19-改进聊天室
#视频1-改进聊天室
//19改进聊天室server1.py:
1 from multiprocessing import Pool, cpu_count 2 from multiprocessing.pool import ThreadPool 3 from multiprocessing import Manager 4 import socket 5 from datetime import datetime 6 7 #发消息给所有连接的客户端 8 def send_data(proxy_dict, proxy_queue): 9 while True: 10 data = proxy_queue.get() #从队列中拿要发给客户的消息 11 12 print('即将发送的数据:', data) 13 for conn in proxy_dict.values(): 14 conn.send(data) 15 16 17 18 def worker_thread(conn,username, proxy_queue): 19 while True: 20 recv_data = conn.recv(1024) 21 if recv_data: 22 time = datetime.now().strftime("%Y/%m/%d %H:%M:%S") 23 data = "{name} {time} \n {data}".format(name=username, time=time, data = recv_data.decode()) 24 print(data) 25 proxy_queue.put(data.encode()) 26 else: 27 conn.close() 28 data = "{}退出".format(username).encode() 29 proxy_queue.put(data) 30 break 31 32 33 def worker_process(server, proxy_dict, proxy_queue): 34 thread_pool = ThreadPool( cpu_count()*2 ) #通常分配2倍CPU个数的线程 35 while True: 36 conn, _ = server.accept() #生成对等套接字 37 38 username = conn.recv(1024).decode() #客户吧名字发过来了 39 proxy_dict[username] = conn 40 41 data = "{}登录".format(username).encode() 42 43 proxy_queue.put(data) #把数据丢到队列中去 44 45 thread_pool.apply_async(worker_thread, args=(conn,username, proxy_queue)) 46 #异步提交 47 48 49 if __name__ == '__main__': 50 51 server = socket.socket() 52 server.bind(('127.0.0.1', 8080)) 53 server.listen(1000) 54 55 mgr = Manager() 56 proxy_dict = mgr.dict() #保存连接的客户端, 名字当做键,对等套接字当做值 57 proxy_queue = mgr.Queue() #消息队列 58 59 n = cpu_count() 60 process_pool = Pool(n) 61 62 63 64 for i in range(n-1): 65 process_pool.apply_async(worker_process, args=(server, proxy_dict, proxy_queue)) 66 process_pool.apply_async(send_data, args=(proxy_dict, proxy_queue)) 67 68 69 process_pool.close() 70 process_pool.join() 71 72
------------------------------------------------------------------------------------
#考虑名字重名-改进
//19改进聊天室server2:
1 from multiprocessing import Pool, cpu_count,Manager 2 from multiprocessing.pool import ThreadPool 3 import socket 4 from datetime import datetime 5 #程序里防止重名 6 #从队列中拿出数据,发给所有连接上的客户端 7 def send_data(dict_proxy, queue_proxy): 8 while True: 9 data = queue_proxy.get() 10 print(data.decode()) 11 for conn in dict_proxy.values(): 12 conn.send(data) 13 14 15 16 def worker_thread(connection, addr, dict_proxy, queue_proxy): 17 while True: 18 try: 19 recv_data = connection.recv(1024) 20 if recv_data: 21 time = datetime.now().strftime("%Y/%m/%d %H:%M:%S") 22 data = "{addr} {time} \n \t{data}".format(addr=addr, time = time, data=recv_data.decode()) 23 queue_proxy.put(data.encode()) #把消息添加到到队列中 24 else: 25 raise Exception 26 except: 27 dict_proxy.pop(addr) #从字典中删掉退出的客户端 28 data = '{}退出'.format(addr) 29 queue_proxy.put(data.encode()) #把退出消息添加到队列中 30 connection.close() 31 break 32 33 34 35 def login(username,conncetion, thread_pool, dict_proxy, queue_proxy ): 36 dict_proxy.setdefault(username, conncetion) # 把套接字加入字典中 37 38 conncetion.send("恭喜你,登陆成功".encode()) 39 40 data = '{}登录'.format(username) 41 queue_proxy.put(data.encode()) # 将用户登录消息添加到队列中 42 thread_pool.apply_async(worker_thread, args=(conncetion, username, dict_proxy, queue_proxy)) 43 44 45 def login_try(conncetion,thread_pool, dict_proxy,queue_proxy, data): 46 conncetion.send(data) 47 username = conncetion.recv(1024).decode() 48 if username not in dict_proxy: 49 login(username, conncetion, thread_pool, dict_proxy, queue_proxy) 50 else: 51 data = "用户名已被使用,请重新输入!".encode() 52 login_try(conncetion, thread_pool, dict_proxy, queue_proxy, data) 53 54 55 def worker_process(server, dict_proxy, queue_proxy): 56 thread_pool = ThreadPool( cpu_count()*2 ) 57 while True: 58 conncetion, remote_address = server.accept() 59 data = "请输入用户名!".encode() 60 login_try(conncetion, thread_pool, dict_proxy, queue_proxy, data) 61 62 63 if __name__ == '__main__': 64 65 server = socket.socket() 66 server.bind(('127.0.0.1', 8080)) 67 server.listen(1000) 68 69 mgr = Manager() 70 dict_proxy = mgr.dict() #用来保存连接上来的客户端, 71 queue_proxy = mgr.Queue() #把客户端发过来的消息通过队列传递 72 73 n = cpu_count() #打印当前电脑的cpu核数 74 process_pool = Pool(n) 75 for i in range(n-1): #充分利用CPU,为每一个CPU分配一个进程 76 process_pool.apply_async(worker_process, args=(server, dict_proxy, queue_proxy)) #把server丢到两个进程里面 77 78 process_pool.apply_async(send_data, args=(dict_proxy, queue_proxy)) #用一个进程去收发消息 79 80 process_pool.close() 81 process_pool.join()
------------------------------------------------------------------------------------
//19改进聊天室client3.py
1 import socket 2 import threading 3 4 5 client = socket.socket() 6 client.connect(('127.0.0.1', 8080)) 7 8 def recv_data(): 9 while True: 10 data = client.recv(1024) 11 print(data.decode()) 12 13 14 username = input("输入你的用户名:") 15 client.send(username.encode()) 16 17 thread = threading.Thread(target=recv_data, daemon=True) 18 thread.start() 19 20 21 while True: 22 send_data = input('') 23 if send_data == 'quit': 24 print('自quit断开连接') 25 client.close() 26 break 27 else: 28 client.send(send_data.encode()) 29
#运行:
server:
小明登录
小李登录
小李 2019/04/07 21:51:15
你好
小明 2019/04/07 21:51:20
重中之重
小李退出
小明退出
client1:
输入你的用户名:小明
小明
请输入用户名!
恭喜你,登陆成功
小明登录
小李登录
小李 2019/04/07 21:51:15
你好
重中之重
重中之重
小明 2019/04/07 21:51:20
重中之重
小李退出
quit
quit
自quit断开连接
client2:
输入你的用户名:小李
小李
请输入用户名!
恭喜你,登陆成功
小李登录
你好
你好
小李 2019/04/07 21:51:15
你好
小明 2019/04/07 21:51:20
重中之重
quit
quit
自quit断开连接
--------------------------------------------------------
1小时40分钟:
然后考虑加入mysql,登陆数据连接
命令回溯,事务
#改进想法
1,加入数据库连接
2,改成面向对象,class
3,加入日志
4,多文件
1 class ChattingRoome: 2 def __init__(self,proxy_dict): 3 self.proxy_dict = proxy_dict 4 5 def send_data(): 6 self.func() 7 8 def func(self): 9 self.send_data()