将在前面的话
fastapi默认的组件,官网上其实也有,关于权限什么的,在这里 FastAPI 安全性简介
但是我在学习过程中,发现他官方提供的默认组件,有以下缺陷(当然这些是我个人的一些看法,可能我FastAPI本身学习的不到位,没有get到他的一些知识点,如果有大佬能够有更好的解决方案,希望能放在评论区 ):
- 用户名,过期时间等关键信息,存储在客户端,并且使用默认的jwt,基本没有加密存在安全隐患
- 默认组件并不会刷新用户验证信息的过期时间,每次登录会话持续的时间是固定的,而不是随着用户的访问,刷新持续时间。
- 提供的功能并不完善,如果加入不同的权限,其实还是有很多工作量。
- 默认将授权码,放在了requests headers里,很多时候其实不太方便,主要是习惯了cookie,毕竟浏览器会自动携带cookie信息。
所以我参考flask-login
,使用Fastapi依赖的方式,写了一个权限组件,考虑到这部分组件未来开发可能还是用得到的,所以特意上传一下。
第一节是使用起来的效果,后续是具体实现的过程,详细阅读的话建议把第一节放在最后看。
1. 在视图中使用
- 首先导入实例化的权限管理对象
from ..permission import role_required
- 当某个视图,需要某权限才能访问的时候,我们可以这样调用:
通过fastapi框架的依赖,就可以先通过权限组件,获取当前的用户信息,通过传递不同的参数,锁定不同的权限需要,如果权限不足,则返回错误的状态码。
@router.get("/")
async def read_users(
current_user: schemas.PyUser=Depends(role_required("管理员"))
):
print(current_user)
return [{
"username": "Foo"}, {
"username": "Bar"}]
- 登录时,注册当前用户
调用role_required.login(response, current_user)
,将当前用户注册进权限组件,这样当前用户访问其他需要权限的路由时,就会自动判断权限了
@router.post("/login/")
async def login(
user:schemas.PyUserLogin,
response: Response,
session: Session = Depends(get_db),
):
dbuser = session.query(db.User).filter(db.User.username == db.User.username).first()
if not (dbuser and dbuser.verify_password(user.password)):
raise ApiException(
code = 1001,
message = "账号不存在或密码错误"
)
current_user = schemas.PyUser.from_orm(dbuser)
role_required.login(response, current_user)
return ApiResponse(
code = 0,
message = "登录成功",
data = {
"username": current_user.username,
"realname": current_user.realname,
"description": current_user.description
}
)
- 登出时,注销当前用户
登出时,调用role_required.logout(request)
即可从权限组件中注销掉当前用户,此时组件中用户对应的cookie会立即移除,这样就去掉了用户的登录信息了
@router.post("/logout/")
async def logout(
request: Request,
_: schemas.PyUser=Depends(role_required("管理员"))
):
current_user = role_required.logout(request)
return ApiResponse(
code = 0,
message = "登出成功",
data = {
"username": current_user.username,
"realname": current_user.realname,
"description": current_user.description
}
)
2. 文件结构
权限定义
这部分的定义如果又需要的话,可以从数据库查询之类的,等等,这里只使用了最简单的定义
这部分代码在文件roles.py
中
roles = {
"超级管理员": 1,
"管理员": 2,
"标注员": 3,
"编辑者": 4,
"审核员": 5,
"游客": 6,
}
3. 权限组件类
权限组件核心类,代码在require.py
里
三个主要函数:
- login() 用户登录
- logout() 用户登出
- self() 定义访问需要的权限
- 需要注意的是,我在出现权限不足时,返回的
ApiException
是一个本项目中自定义的错误状态,如有需要,自行更改。 - 具体的使用方法在下面
from fastapi import HTTPException, Response, Request
import copy
from datetime import datetime, timedelta
import uuid
from starlette import status
from ..exception.apiexception import ApiResponse, ApiException
class RoleRequired:
def __init__(
self,
guest,
roles,
redirect_url: str = None,
expire_minutes: int = 30,
clear_interval: int = 60,
):
self.sessions = {
}
self.roles = roles
self.guest = guest
self.redirect_url = redirect_url
self.expire_minutes = expire_minutes
self.userclass = type(self.guest)
self.last_clear_time = datetime.utcnow()
self.interval = timedelta(minutes=clear_interval)
def __clear_overstayed(self):
now = datetime.utcnow()
if (now - self.last_clear_time) < self.interval:
return
self.last_clear_time = now
self.sessions = {
k: v for k, v in self.sessions.items() if v['exp'] < now }
def __create_token(self, response: Response, user=None):
# print(self.sessions)
self.__clear_overstayed()
authorization = str(uuid.uuid1())
response.set_cookie(key="authorization", value=authorization)
if not user:
user = copy.deepcopy(self.guest)
self.sessions[authorization] = {
"user": user,
"exp": datetime.utcnow() + timedelta(minutes=self.expire_minutes)
}
return self.sessions[authorization]
def __update_exp(self, authorization):
exp = datetime.utcnow() + timedelta(minutes=self.expire_minutes)
self.sessions[authorization]["exp"] = exp
return self.sessions[authorization]
def __verify_roles(self, user, roleids):
if isinstance(roleids, list):
return user.roleid in roleids
return user.roleid == roleids
def login(self, response: Response, user):
# assert type(user) == type(self.guest)
current_session = self.__create_token(response, user)
return current_session['user']
def logout(self, request: Request):
authorization = request.cookies.get("authorization", None)
try:
current_session = self.sessions.pop(authorization)
return current_session['user']
except :
raise ApiException(code=1005, message="当前未登录,登出发生错误")
def __call__(self, *roles, **kwargs):
login_only = kwargs.get("login_only", False)
roles = [ self.roles[x] for x in roles ]
if login_only:
roles = list(self.roles.values())
async def func(request: Request, response: Response) -> self.userclass:
authorization = request.cookies.get("authorization", None)
if not authorization or authorization not in self.sessions:
current_session = self.__create_token(response)
else:
ntime = datetime.utcnow()
session = self.sessions.get(authorization, None)
if session['exp'] < ntime:
self.sessions.pop(authorization)
current_session = self.__create_token(response)
else:
current_session = self.__update_exp(authorization)
current_user = current_session['user']
if not roles or self.__verify_roles(current_user, roles):
return current_session['user']
else:
raise ApiException(
code=1004, message="权限不足"
)
return func
4. 实例化权限类
这部分代码在 权限包中__init__.py
文件中
实例化的主要参数如下:
- guest:当没有登录的游客访问时,默认给他们的一个游客账户
- roles:权限的字典,就是
roles.py
中定义的字典 - redirect_url: str = None:没有登录或者没有权限时,跳转的url(这部分功能没实现,因为直接返回1004权限不足状态码了)
- expire_minutes: int = 30:登录状态持续时间,过期的cookie将不再生效
- clear_interval: int = 60:时间间隔超过60分钟时,如果有请求,将会清空一次过期的cookie
from .require import RoleRequired
from ..models.schemas import users as schemas
from .roles import roles
from app.settings import SESSION_DURATION
role_required = RoleRequired(guest=schemas.PyUser( userid=0, username="guest", password="guest", realname="guest", roleid=6 ),
roles=roles,
expire_minutes=SESSION_DURATION)