2023-09-15 09:40:35 +00:00
|
|
|
|
# coding=utf-8
|
|
|
|
|
|
"""
|
|
|
|
|
|
@project: qabot
|
|
|
|
|
|
@Author:虎
|
|
|
|
|
|
@file: authenticate.py
|
|
|
|
|
|
@date:2023/9/4 11:16
|
|
|
|
|
|
@desc: 认证类
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
from django.core import cache
|
|
|
|
|
|
from django.core import signing
|
2023-10-09 11:03:41 +00:00
|
|
|
|
from django.db.models import QuerySet
|
2023-09-15 09:40:35 +00:00
|
|
|
|
from rest_framework.authentication import TokenAuthentication
|
2023-10-09 11:03:41 +00:00
|
|
|
|
|
|
|
|
|
|
from common.constants.permission_constants import Auth, get_permission_list_by_role, RoleConstants
|
|
|
|
|
|
from common.exception.app_exception import AppAuthenticationFailed
|
2023-09-15 09:40:35 +00:00
|
|
|
|
from smartdoc.settings import JWT_AUTH
|
2023-10-09 11:03:41 +00:00
|
|
|
|
from users.models.user import User, get_user_dynamics_permission
|
2023-09-15 09:40:35 +00:00
|
|
|
|
|
|
|
|
|
|
token_cache = cache.caches['token_cache']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnonymousAuthentication(TokenAuthentication):
|
|
|
|
|
|
def authenticate(self, request):
|
|
|
|
|
|
return None, None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TokenAuth(TokenAuthentication):
|
|
|
|
|
|
# 重新 authenticate 方法,自定义认证规则
|
|
|
|
|
|
def authenticate(self, request):
|
|
|
|
|
|
auth = request.META.get('HTTP_AUTHORIZATION', None
|
|
|
|
|
|
)
|
|
|
|
|
|
# 未认证
|
|
|
|
|
|
if auth is None:
|
|
|
|
|
|
raise AppAuthenticationFailed(1003, '未登录,请先登录')
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 解析 token
|
|
|
|
|
|
user = signing.loads(auth)
|
|
|
|
|
|
if 'id' in user:
|
|
|
|
|
|
cache_token = token_cache.get(auth)
|
|
|
|
|
|
if cache_token is None:
|
|
|
|
|
|
raise AppAuthenticationFailed(1002, "登录过期")
|
2023-10-09 11:03:41 +00:00
|
|
|
|
user = QuerySet(User).get(id=user['id'])
|
2023-09-15 09:40:35 +00:00
|
|
|
|
# 续期
|
|
|
|
|
|
token_cache.touch(auth, timeout=JWT_AUTH['JWT_EXPIRATION_DELTA'].total_seconds())
|
|
|
|
|
|
rule = RoleConstants[user.role]
|
2023-10-09 11:03:41 +00:00
|
|
|
|
permission_list = get_permission_list_by_role(RoleConstants[user.role])
|
|
|
|
|
|
# 获取用户的应用和数据集的权限
|
|
|
|
|
|
permission_list += get_user_dynamics_permission(str(user.id))
|
2023-09-15 09:40:35 +00:00
|
|
|
|
return user, Auth(role_list=[rule],
|
2023-10-09 11:03:41 +00:00
|
|
|
|
permission_list=permission_list)
|
2023-09-15 09:40:35 +00:00
|
|
|
|
else:
|
|
|
|
|
|
raise AppAuthenticationFailed(1002, "身份验证信息不正确!非法用户")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
raise AppAuthenticationFailed(1002, "身份验证信息不正确!非法用户")
|