flask框架第三方组件
flask-session
第三方session,将session存入本地数据库中。
需要导入原session,原理是替换原session中open-session函数,相当于重写内容session
from flask import Flask,session
from flask_session import Session
from redis import Redis
app = Flask(__name__)
app.secret_key = "helloworld"
Session(app)
app.config["SESSION_TYPE"]="redis" #设置数据库类型
app.config["SESSION_REDIS"] = Redis(host="127.0.0.1",port=6379,db=1) #设置数据库属性
@app.route('/')
def hello_world():
if not session.get("IS_LOGINED"):
return "没有权限访问"
return 'Hello World!'
class Login(views.MethodView):
def get(self):
return render_template("login.html")
def post(self):
user = request.form.get("username")
passwd = request.form.get("password")
print(user)
print(passwd)
if user == "alex" and passwd == "123":
session["IS_LOGINED"] = "True"
return "登录成功"
else:
msg = "用户名或密码错误"
return render_template("login.html",msg=msg)
app.add_url_rule("/login",endpoint="login",view_func=Login.as_view(name="login"))
使用方法同原生session一样
flask-session现在在cookie存放的session不再是数据,而是uuid
在redis中数据格式为
key:前缀+uuid
value:session键值对
redis数据库一般存放在内网服务器中,数据存放在内存中
WTForms
相当于modelsForm,类似于ORM
register_forms.py
from wtforms.fields import simple,core
from wtforms import Form
from wtforms import validators
class RegisterForm(Form):
username = simple.StringField(
label="用户名",
validators=[
validators.DataRequired(message="数据不能为空"),
validators.Length(min=4,max=16,message="大于4位小于16位")
],
render_kw={"class":"form-control"} # 生成标签时,添加class
)
password = simple.PasswordField(
label="密码",
validators = [
validators.DataRequired(message="密码不能为空"),
validators.Length(min=4,max=16,message="大于4位小于16位"),
validators.Regexp(regex="\w+",message="密码必须为数字,字母,下划线")
],
render_kw={"class":"form-control"}
)
repassword = simple.PasswordField(
label="再次输入密码",
validators = [
validators.DataRequired(message="密码不能为空"),
validators.Length(min=4,max=16,message="大于4位小于16位"),
validators.Regexp(regex="\w+",message="密码必须为数字,字母,下划线"),
validators.equal_to("password",message="两次密码不一致")
],
render_kw={"class":"form-control"}
)
gender = core.RadioField(
label="性别",
choices=(
(1,"男"),
(2,"女"),
),
coerce=int, # 获取数据时,获取的是前面的数字
default=1 # 默认选中值
)
- 类需要继承Form
- 简单标签用simple,复杂标签用core
- validators用来校验字符串
校验方法
from register_forms import RegisterForm
class Register(views.MethodView):
def get(self):
regfm = RegisterForm() # 实例化
return render_template("register.html",fm=regfm)
def post(self):
regfm = RegisterForm(request.form)
print(request.form)
if not regfm.validate(): # 校验字符串
return render_template("register.html",fm=regfm)
#验证成功,则写入数据库
mh = mysqlHandler()
student_id = mh.get_student_id()+1
student_info = [student_id,]
for k,value in request.form.items():
if k == "repassword":
continue
student_info.append(value)
print(student_info)
mh.write(student_info)
return "ok"
app.add_url_rule("/register",endpoint="register",view_func=Register.as_view(name="register"))
register.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>注册</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</head>
<body>
<div class="container">
<h1>注册</h1>
<form action="" method="post">
{% for field in fm %}
<div class="form-group">
{{ field.label }} {{ field }} {{ field.errors.0 }}
</div>
{% endfor %}
<button type="submit" class="btn btn-primary">登录</button>
</form>
</div>
</body>
</html>
{{ field.label }} 显示label标签
{{ field }} 显示相应标签
{{ field.errors.0 }} 如果有错误信息,显示错误信息
DBUtils数据库连接池
数据库创建链接后,保持一定量的链接数不变,节省链接时间
sql_help.py
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, #使用数据库的模块
maxconnections=6, #最大连接数
mincached=2, # 初始化时,连接池中最少的空闲链接,0表示不创建
maxcached=5, #连接池中最多闲置的链接,0和None表示不限制
maxshared=3, #连接池中最多共享的链接数,0和None表示全部共享,无用因为无论怎么设置都为全部共享
blocking=True, #连接池中如果没有可用的链接后是否阻塞等待 True or False
maxusage=None, #一个链接最多的重复使用次数,None表示无限制
setsession=[], #开始会话前执行的命令列表
ping=0,
#ping mysql服务器,检查服务是否可用
#0 永远不ping 1 创建数据库链接前 2创建cursor时 4 执行execute时 7任何时候
host="127.0.0.1",
port=3306,
user="root",
password='123',
database='students_infov2',
charset='utf8'
)
if __name__ == "__main__":
conn = POOL.connection() # 从连接池拿出链接
cursor = conn.cursor(pymysql.cursors.DictCursor)
# DictCursor将查询的东西字典化
print("连接成功")
res = cursor.execute("select * from student_info")
print(cursor.fetchall())
cursor.close()
conn.close() # 将连接放回连接池
websocket
websocket同时是用来做即时通讯(IM)
需要安装gevent-websocket
轮询:不断向服务器发起询问,服务器还不断回复,浪费前端、后端、带宽,保证数据的实时性
长轮询:
方法一:客户端向服务器发起消息,服务端轮询,有数据则放到另外一个地方,客户端去另外一个地方去拿
方法二:服务器轮询,放到另外一个地方,直接推给客户端,优点,释放客户端资源,释放客户端资源,服务压力不可避免,节省带宽资源,数据不能实时性
方法三:客户端和服务端开启一个连接不断开,有消息直接推送过去,彻底解决实时性,解决带宽占用问题,解决资源,客户端和服务器都需要开启多线程
flask默认使用werkzug处理请求(app.run())
websocket使用wsgi处理请求(经过websocket处理后,将wsgi.websocket对象加入environ,再由app处理)
群聊
web_socket.py
from flask import Flask,request,render_template
from geventwebsocket.websocket import WebSocket
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
import json
app = Flask(__name__)
@app.route('/')
def hello_world():
return render_template("index.html")
user_socket_list = []
@app.route("/ws/<username>")
def ws(username):
# 获取请求参数中的,websocket套接字
user_socket = request.environ.get('wsgi.websocket') # type:WebSocket
print(user_socket)
if user_socket:
user_socket_list.append(user_socket)
else:
# 没有建立连接
return "没有建立连接"
print("user_list",user_socket_list)
while 1:
try:
# 如果当前用户已经关闭,即返回None,终止连接
msg = user_socket.receive() # 接受用户信息
for socket in user_socket_list:
# 如果发送连接失败,则链接失效,在链接列表中删除链接
if socket != user_socket:
#包装成json格式,发送
msg_json = json.dumps({
"user":username,
"msg":msg
})
socket.send(msg_json) # 发送用户信息
except:
user_socket_list.remove(user_socket)
return "连接关闭"
if __name__ == '__main__':
http_serv = WSGIServer(("127.0.0.1",5000),app,handler_class=WebSocketHandler)
http_serv.serve_forever()
# app.run()
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>websocket</title>
<style>
.send_window{
border:2px solid black;
width:500px;
height:1000px;
}
</style>
</head>
<body>
<h1>websocket测试</h1>
<p>
<input type="text" id="username">
<button id="user_btn">确认姓名</button>
</p>
<p>
<input type="text" id="send_text">
<button id="send_btn">发送</button>
</p>
<div class="send_window">
</div>
</body>
<script type="text/javascript">
var user_btn = document.getElementById("user_btn");
var send_window = document.getElementsByClassName("send_window")[0];
var username = document.getElementById("username");
var ws = null;
user_btn.onclick = function () {
{# 协议头必须为ws:// 用ws建立协议 #}
ws = new WebSocket("ws://127.0.0.1:5000/ws/"+username.value);
{# 前端接受json字符串{user:'alex',msg:'123'}#}
ws.onmessage = function (data) {
data = JSON.parse(data.data);
send_window.innerHTML += "<p>"+ data.user + ":" + data.msg+"</p>"
};
};
var send_btn = document.getElementById("send_btn");
send_btn.onclick = function () {
var send_text = document.getElementById("send_text");
ws.send(send_text.value);
send_window.innerHTML += "<p style='text-align:right'>"+ send_text.value + ":" + username.value +"</p>";
send_text.value = "";
}
</script>
</html>
前端
ws = new websocket(“ws连接地址”)
ws.send("") 发送消息
ws.onmessage = func 接受消息
后端
reqeust.environ.get(‘wsgi.websocket’) 获取socket
socket.send() 发送
socket.receive 接受
单聊
web_socket.py
from flask import Flask,request,render_template
from geventwebsocket.websocket import WebSocket
from gevent.pywsgi import WSGIServer
from geventwebsocket.handler import WebSocketHandler
import json
app = Flask(__name__)
@app.route('/')
def hello_world():
return render_template("index2.html")
user_socket_list = {}
@app.route("/ws/<username>")
def ws(username):
user_socket = request.environ.get('wsgi.websocket') # type:WebSocket
print(user_socket)
if user_socket:
user_socket_list[username] = user_socket
else:
# 没有建立连接
return "没有建立连接"
print("user_list",user_socket_list)
while 1:
try:
# 如果当前用户已经关闭,即返回None,终止连接
recv_data = user_socket.receive()
recv = json.loads(recv_data)
print(recv)
to_sender = recv.get("to_sender")
msg = recv.get("msg")
to_send_sock = user_socket_list.get(to_sender)
#包装成json格式,发送
msg_json = json.dumps({
"user":username,
"msg":msg
})
to_send_sock.send(msg_json)
except:
user_socket_list.pop(username)
return "连接关闭"
if __name__ == '__main__':
http_serv = WSGIServer(("127.0.0.1",5000),app,handler_class=WebSocketHandler)
http_serv.serve_forever()
# app.run()
index.py
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>带昵称的单聊</title>
<style>
.send_window{
border:2px solid black;
width:500px;
height:1000px;
}
</style>
</head>
<body>
<h1>websocket测试</h1>
<p>
<input type="text" id="username">
<button id="user_btn">确认姓名</button>
</p>
<p>接收人<input type="text" id="to_send"></p>
<p>
<input type="text" id="send_text">
<button id="send_btn">发送</button>
</p>
<div class="send_window">
</div>
</body>
<script type="text/javascript">
var user_btn = document.getElementById("user_btn");
var send_window = document.getElementsByClassName("send_window")[0];
var username = document.getElementById("username");
var ws = null;
user_btn.onclick = function () {
ws = new WebSocket("ws://127.0.0.1:5000/ws/"+username.value);
{# 前端接受json字符串{user:'alex',msg:'123'}#}
ws.onmessage = function (data) {
data = JSON.parse(data.data);
send_window.innerHTML += "<p>"+ data.user + ":" + data.msg+"</p>"
};
};
var send_btn = document.getElementById("send_btn");
send_btn.onclick = function () {
var to_send = document.getElementById("to_send");
var send_text = document.getElementById("send_text");
var to_json = {
"to_sender":to_send.value,
"msg":send_text.value
};
ws.send(JSON.stringify(to_json));
send_window.innerHTML += "<p style='text-align:right'>"+ send_text.value + ":" + username.value +"</p>";
send_text.value = "";
}
</script>
</html>