# Main program (主程序): the TCP web server entry point.
import socket
import re
import multiprocessing
import mini_web
class WebServer(object):
    """Small multi-process HTTP server.

    Request paths ending in ``.html`` are delegated to
    ``mini_web.application``; every other path is served as a file read from
    the ``./static`` directory.
    """

    def __init__(self):
        """Create the listening TCP socket, bound to port 8866 on all interfaces."""
        self.tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # SO_REUSEADDR lets the port be bound again right after a restart.
        self.tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.tcp_server_socket.bind(("", 8866))
        self.tcp_server_socket.listen(128)
        # Filled in by head_params(); defaults keep service_client from
        # failing if the application never calls it.
        self.stauts = "200 OK"
        self.params = []

    def service_client(self, new_socket):
        """Read one HTTP request from ``new_socket``, answer it, then close it.

        The socket is closed even when handling the request raises.
        """
        try:
            # errors="replace": a stray non-UTF-8 byte must not abort the worker.
            request = new_socket.recv(1024).decode("utf-8", errors="replace")
            request_lines = request.splitlines()
            print("")
            print(">" * 20)
            print(request_lines)
            if not request_lines:
                # The peer connected and closed without sending a request line.
                return

            file_name = ""
            ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
            if ret:
                file_name = ret.group(1)
                if file_name == "/":
                    file_name = "/index.html"

            if file_name.endswith(".html"):
                self._send_dynamic(new_socket, file_name)
            else:
                self._send_static(new_socket, file_name)
        finally:
            new_socket.close()

    def _send_dynamic(self, new_socket, file_name):
        """Send the page produced by ``mini_web.application`` for ``file_name``."""
        url_params = {'file_name': file_name}
        # application() reports status and headers through self.head_params.
        body = mini_web.application(url_params, self.head_params)
        head = "HTTP/1.1 %s\r\n" % self.stauts
        head += "".join("%s:%s\r\n" % temp for temp in self.params)
        content = head + "\r\n" + body
        # sendall: plain send() may transmit only part of a large response.
        new_socket.sendall(content.encode("utf-8"))

    def _send_static(self, new_socket, file_name):
        """Send the file ``./static<file_name>``, or a 404 if it cannot be read."""
        try:
            # Reject ".." segments so a request cannot read outside ./static.
            if ".." in file_name.split("/"):
                raise FileNotFoundError(file_name)
            with open("./static" + file_name, "rb") as f:
                html_content = f.read()
        except OSError:
            response = "HTTP/1.1 404 NOT FOUND\r\n"
            response += "\r\n"
            response += "------file not found-----"
            new_socket.sendall(response.encode("utf-8"))
            print("文件找不到!")
        else:
            response = "HTTP/1.1 200 OK\r\n"
            response += "\r\n"
            new_socket.sendall(response.encode("utf-8"))
            new_socket.sendall(html_content)

    def run_server(self):
        """Accept connections forever, handling each one in a new process.

        The listening socket is closed when the loop is left by an exception.
        """
        try:
            while True:
                new_socket, client_addr = self.tcp_server_socket.accept()
                p = multiprocessing.Process(target=self.service_client, args=(new_socket,))
                p.start()
                # The worker process has its own handle to the connection;
                # release this process's handle.
                new_socket.close()
        finally:
            self.tcp_server_socket.close()

    def head_params(self, stauts, params):
        """Store the response status line text and header pairs for later use.

        Passed to ``mini_web.application`` as its ``start_response`` callback.
        ``params`` is iterated as ``(name, value)`` pairs when the response
        head is assembled.
        """
        self.stauts = stauts
        self.params = params
def main():
    """Build a WebServer and hand control to its accept loop."""
    WebServer().run_server()


# Only start serving when this file is executed as a script.
if __name__ == "__main__":
    main()
# mini_web module (webserver 仿照蓝图实现): routing written in a blueprint-like style.
"""
mini_web.py
"""
import re
from contextlib import contextmanager
from urllib.request import unquote
from pymysql import connect
# Routing table: maps a URL regular expression (str) to the handler callable
# that ``route`` registered for it.
url_dict = dict()


def application(environ, start_response):
    """Return the page body for the requested path.

    Args:
        environ: mapping that carries the request path under ``'file_name'``.
        start_response: callable ``(status, headers)`` used to report the
            response status text and a list of ``(name, value)`` header pairs.

    Returns:
        The string returned by the first handler in ``url_dict`` whose pattern
        matches the start of the path (``re.match``), or a fixed "not found"
        message when no pattern matches.
    """
    headers = [('Content-Type', 'text/html;charset=utf-8')]
    file_name = environ['file_name']
    for key, value in url_dict.items():
        match = re.match(key, file_name)
        if match:
            start_response('200 OK', headers)
            return value(match)
    # Reached only after every route was tried; report it as a real 404
    # rather than a 200 carrying an error message.
    start_response('404 Not Found', headers)
    return "not page is find!"
def route(url_address):
    """Build a decorator that registers a handler under ``url_address``.

    The decorated function is wrapped in a pass-through callable. That
    wrapper is stored in ``url_dict`` under ``url_address`` and is also what
    the decorator returns in place of the original function.
    """
    def register(handler):
        def call_fun(*args, **kwargs):
            # Forward everything unchanged to the wrapped handler.
            result = handler(*args, **kwargs)
            return result

        url_dict[url_address] = call_fun
        return call_fun

    return register
@contextmanager
def mysql_action():
    """Yield a cursor on the ``stock_db`` database within one transaction.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises; the exception is then re-raised. The cursor
    and the connection are closed in both cases.
    """
    conn = connect(host='localhost', port=3306, user='root', password='mysql', database='stock_db', charset='utf8')
    try:
        cs1 = conn.cursor()
        try:
            yield cs1
            conn.commit()
        except Exception:
            # Do not leave a half-applied transaction behind.
            conn.rollback()
            raise
        finally:
            cs1.close()
    finally:
        conn.close()
@route(r'/(index)\.html')
def index(match):
    """Render the index page: one table row per record of the ``info`` table.

    Args:
        match: regex match object for the request path (not used here).

    Returns:
        The contents of ``./templates/index.html`` with the ``{%content%}``
        placeholder replaced by the generated table rows.
    """
    with open('./templates/index.html', encoding='utf-8') as f:
        index_content = f.read()
    row_str = """ <tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>
<input type="button" value="添加" id="toAdd" name="toAdd" systemidvaule="%s">
</td>
</tr> """
    with mysql_action() as cs:
        sql = "select id,code,short,chg,turnover,price,highs,time from info"
        cs.execute(sql)
        table_data = cs.fetchall()
    # The first column is a 1-based row number, not the database id; the
    # stock code (temp[1]) is repeated as the button's attribute value.
    table_str = "".join(
        row_str % (i, temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[1])
        for i, temp in enumerate(table_data, start=1)
    )
    # str.replace instead of re.sub: re.sub would interpret backslashes in the
    # database values as escapes or group references.
    return index_content.replace('{%content%}', table_str)
@route(r'/center\.html')
def center(match):
    """Render the personal-center page: one row per entry of ``focus``.

    Args:
        match: regex match object for the request path (not used here).

    Returns:
        The contents of ``./templates/center.html`` with the ``{%content%}``
        placeholder replaced by the generated table rows.
    """
    with open('./templates/center.html', encoding='utf-8') as f:
        center_content = f.read()
    with mysql_action() as cs:
        sql = 'select i.code, i.short,i.chg,i.turnover,i.price,i.highs,f.note_info from focus as f inner join info as i on f.info_id = i.id;'
        cs.execute(sql)
        table_data = cs.fetchall()
    row_str = """ <tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>
<a type="button" class="btn btn-default btn-xs" href="/update/%s.html"> <span class="glyphicon glyphicon-star" aria-hidden="true"></span> 修改 </a>
</td>
<td>
<input type="button" value="删除" id="toDel" name="toDel" systemidvaule="%s">
</td>
</tr>"""
    # The stock code (temp[0]) is reused for the edit link and the delete button.
    table_str = ''.join(
        row_str % (temp[0], temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[0], temp[0])
        for temp in table_data
    )
    # str.replace instead of re.sub: re.sub would interpret backslashes in the
    # stored notes as escapes or group references.
    return center_content.replace('{%content%}', table_str)
@route(r'/add/(\d+)\.html')
def insert(match):
    """Add the stock whose code is in the URL to the ``focus`` table.

    Args:
        match: regex match whose group 1 is the stock code.

    Returns:
        A message saying either that the stock is already in ``focus`` or
        that it was added.
    """
    code = match.group(1)
    with mysql_action() as cs:
        sql = 'select * from focus where info_id in (select id from info where code=%s);'
        cs.execute(sql, (code,))
        has_flag = cs.fetchone()
        print(has_flag)
        if has_flag:
            return '个人中心已存在此股票'
        # INSERT ... SELECT with an explicit column: "values (select ...)"
        # is not valid SQL.
        sql1 = 'insert into focus (info_id) select id from info where code = %s'
        cs.execute(sql1, (code,))
        return '添加成功'
@route(r'/update/(\d+)\.html')
def update(match):
    """Render the note-editing page for the stock whose code is in the URL.

    Args:
        match: regex match whose group 1 is the stock code.

    Returns:
        The contents of ``./templates/update.html`` with ``{%code%}`` replaced
        by the code and ``{%note_info%}`` replaced by the HTML-escaped stored
        note (empty when there is no ``focus`` row or the note is NULL).
    """
    # Local import: ``html`` is not part of the module's import block.
    from html import escape

    code = match.group(1)
    with open('./templates/update.html', encoding='utf-8') as f:
        updata_str = f.read()
    updata_str = updata_str.replace('{%code%}', code)
    with mysql_action() as cs:
        sql = 'select f.note_info from focus as f INNER join info as i on f.info_id=i.id WHERE i.code=%s'
        cs.execute(sql, (code,))
        row_data = cs.fetchone()
    # fetchone() returns None when the stock is not in focus.
    note_info = ''
    if row_data is not None and row_data[0] is not None:
        note_info = row_data[0]
    # The note is user-written text: escape it for HTML, and use str.replace
    # because re.sub would interpret its backslashes.
    return updata_str.replace('{%note_info%}', escape(note_info))
@route(r'/del/(\d+)\.html')
def delete(match):
    """Remove the stock whose code is in the URL from the ``focus`` table.

    Args:
        match: regex match whose group 1 is the stock code.

    Returns:
        A fixed confirmation message.
    """
    code = match.group(1)
    with mysql_action() as cs:
        sql = 'delete from focus WHERE info_id= (SELECT id from info WHERE code=%s)'
        cs.execute(sql, (code,))
    return "删除成功"
@route(r'/update/(\d+)/(.*)\.html')
def updatenote(match):
    """Store a new note for the stock whose code is in the URL.

    Args:
        match: regex match whose group 1 is the stock code and whose group 2
            is the URL-quoted note text.

    Returns:
        A fixed confirmation message.
    """
    code = match.group(1)
    # The note arrives percent-encoded inside the path; decode it before storing.
    note_info = unquote(match.group(2))
    with mysql_action() as cs:
        sql = 'update focus set note_info = %s where info_id in (select id from info where code = %s);'
        cs.execute(sql, (note_info, code))
    return '更新成功'