2023-09-15 09:40:35 +00:00
|
|
|
|
# coding=utf-8
|
|
|
|
|
|
"""
|
|
|
|
|
|
@project: qabot
|
|
|
|
|
|
@Author:虎
|
|
|
|
|
|
@file: authenticate.py
|
|
|
|
|
|
@date:2023/9/4 11:16
|
|
|
|
|
|
@desc: 认证类
|
|
|
|
|
|
"""
|
2024-03-13 08:07:13 +00:00
|
|
|
|
import traceback
|
2024-07-09 10:59:15 +00:00
|
|
|
|
from importlib import import_module
|
2023-09-15 09:40:35 +00:00
|
|
|
|
|
2024-07-09 10:59:15 +00:00
|
|
|
|
from django.conf import settings
|
2023-09-15 09:40:35 +00:00
|
|
|
|
from django.core import cache
|
|
|
|
|
|
from django.core import signing
|
|
|
|
|
|
from rest_framework.authentication import TokenAuthentication
|
2023-10-09 11:03:41 +00:00
|
|
|
|
|
2024-03-13 08:07:13 +00:00
|
|
|
|
from common.exception.app_exception import AppAuthenticationFailed, AppEmbedIdentityFailed, AppChatNumOutOfBoundsFailed
|
2023-09-15 09:40:35 +00:00
|
|
|
|
|
|
|
|
|
|
# Cache backend looked up under the 'token_cache' alias of Django's cache registry.
token_cache = cache.caches['token_cache']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnonymousAuthentication(TokenAuthentication):
    """Authenticator that never inspects the request."""

    def authenticate(self, request):
        """Return a pair of ``None`` values for any request.

        :param request: incoming request; it is not read.
        :return: the tuple ``(None, None)``.
        """
        anonymous_result = (None, None)
        return anonymous_result
|
|
|
|
|
|
|
|
|
|
|
|
|
2024-07-09 10:59:15 +00:00
|
|
|
|
def new_instance_by_class_path(class_path: str):
    """Instantiate the class named by a dotted import path.

    :param class_path: dotted path whose last component is the class name and
        whose preceding components form the module path, for example
        ``"package.module.ClassName"``.
    :return: a new instance obtained by calling the class with no arguments.
    """
    # Split on the last dot only: everything before it is the module to import.
    module_path, _, attr_name = class_path.rpartition('.')
    handler_class = getattr(import_module(module_path), attr_name)
    return handler_class()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# One handler instance per dotted class path listed in settings.AUTH_HANDLES,
# built once when this module is imported.
handles = [new_instance_by_class_path(class_path) for class_path in settings.AUTH_HANDLES]
|
2024-03-13 21:43:01 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TokenDetails:
    """Lazily decoded view of a signed token string."""

    # Decoded payload; remains None until a decode attempt succeeds.
    token_details = None
    # Becomes True after a decode attempt has failed, so it is not retried.
    is_load = False

    def __init__(self, token: str):
        """Store the raw token; decoding is deferred to ``get_token_details``.

        :param token: the token string to decode on demand.
        """
        self.token = token

    def get_token_details(self):
        """Return the decoded token payload, decoding it on first use.

        A failed decode is remembered through ``is_load`` and never raised to
        the caller.

        :return: the value produced by ``signing.loads`` for the token, or
            ``None`` when decoding failed.
        """
        already_resolved = self.is_load or self.token_details is not None
        if not already_resolved:
            try:
                self.token_details = signing.loads(self.token)
            except Exception:
                # Any decoding failure is recorded instead of propagated.
                self.is_load = True
        return self.token_details
|
|
|
|
|
|
|
|
|
|
|
|
|
2023-09-15 09:40:35 +00:00
|
|
|
|
class TokenAuth(TokenAuthentication):
    """Token authentication that delegates to the module-level ``handles`` list."""

    # Override authenticate to apply the custom authentication rules.
    def authenticate(self, request):
        """Authenticate ``request`` from its ``Authorization`` header.

        The header value is offered to each entry of ``handles`` in order; the
        first handler whose ``support`` call is truthy produces the result
        through its ``handle`` call.

        :param request: incoming request; only ``request.META`` is read here.
        :return: whatever the selected handler's ``handle`` call returns.
        :raises AppAuthenticationFailed: code 1003 when the header is missing;
            code 1002 when no handler supports the request or a handler fails
            with any exception other than the two listed below.
        :raises AppEmbedIdentityFailed: propagated unchanged from a handler.
        :raises AppChatNumOutOfBoundsFailed: propagated unchanged from a handler.
        """
        auth = request.META.get('HTTP_AUTHORIZATION')
        # Not authenticated: no Authorization header was sent.
        if auth is None:
            raise AppAuthenticationFailed(1003, '未登录,请先登录')
        token_details = TokenDetails(auth)
        try:
            for handle in handles:
                # Handlers receive the bound method rather than its result, so
                # the token is decoded only if a handler actually calls it.
                if handle.support(request, auth, token_details.get_token_details):
                    return handle.handle(request, auth, token_details.get_token_details)
        except (AppEmbedIdentityFailed, AppChatNumOutOfBoundsFailed):
            # These carry their own meaning for the caller; pass them through.
            raise
        except Exception as e:
            # Record the failure before masking it behind the generic error;
            # the formatted traceback used to be computed and thrown away.
            import logging
            logging.getLogger(__name__).exception('Authentication handler failed')
            raise AppAuthenticationFailed(1002, "身份验证信息不正确!非法用户") from e
        # No handler accepted the request.
        raise AppAuthenticationFailed(1002, "身份验证信息不正确!非法用户")
|