# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: authenticate.py
@date:2024/3/14 03:02
@desc: 用户认证
"""
from django.core.cache import cache
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from common.auth.handle.auth_base_handle import AuthBaseHandle
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppAuthenticationFailed
from users.models import User
def get_permission_list(user_id,
                        workspace_id,
                        workspace_user_role_mapping_model,
                        workspace_model,
                        role_model,
                        role_permission_mapping_model):
    """Return the permission list for a user in a workspace, served from cache when possible.

    The entry is cached under the ``Cache_Version.PERMISSION_LIST`` key built
    from ``(user_id, workspace_id)``. On a cache miss the list is rebuilt,
    written back to the cache and returned:

    * when all four model arguments are present, the role ids of the user's
      workspace/role mapping rows are collected and the ``id`` of every
      role-permission mapping row for those roles is returned;
    * otherwise the default permission list of ``RoleConstants.ADMIN`` is used.

    Security review notes:

    * NOTE(review): the mapping query filters on ``user_id`` only, although the
      cache key is per workspace, so roles held in any workspace contribute to
      the list cached for this one. Confirm this is intended; if permissions
      are meant to be workspace-scoped the query needs a workspace filter.
    * NOTE(review): the fallback hands out the ADMIN defaults without looking
      at the user's own role (``get_role_list`` falls back to ``user.role``).
      Confirm this fail-open default is acceptable wherever the models are not
      registered.
    * NOTE(review): ``cache.set`` is called without an explicit timeout and no
      invalidation is visible here, so a revoked role may stay effective until
      the entry expires or is deleted elsewhere.
    * NOTE(review): the returned values are the mapping rows' ``id``;
      presumably a permission identifier is intended. Verify against the model.
    """
    version, get_key = Cache_Version.PERMISSION_LIST.value
    key = get_key(user_id, workspace_id)

    cached = cache.get(key, version=version)
    if cached is not None:
        return cached

    required_models = (
        workspace_user_role_mapping_model,
        workspace_model,
        role_model,
        role_permission_mapping_model,
    )
    if all(model is not None for model in required_models):
        # Role mappings of the user (not restricted to workspace_id, see notes).
        user_role_rows = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
        role_ids = [row.role_id for row in user_role_rows]
        # Role-permission mapping rows belonging to those roles.
        permission_rows = QuerySet(role_permission_mapping_model).filter(role_id__in=role_ids)
        permission_list = [row.id for row in permission_rows]
    else:
        # Models not registered: every caller receives the ADMIN defaults.
        permission_list = get_default_permission_list_by_role(RoleConstants.ADMIN)

    cache.set(key, permission_list, version=version)
    return permission_list
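def get_workspace_list(user_id,
                       workspace_id,
                       workspace_user_role_mapping_model,
                       workspace_model,
                       role_model,
                       role_permission_mapping_model):
    """Return the ids of the workspaces the user is mapped to, served from cache when possible.

    The entry is cached under the ``Cache_Version.WORKSPACE_LIST`` key built
    from ``user_id``. ``workspace_id`` is accepted for signature parity with
    the sibling helpers and is not used.

    On a cache miss:

    * when all four model arguments are present, the ``workspace_id`` of each
      of the user's workspace/role mapping rows is collected, cached and
      returned (one entry per mapping row, so a workspace can repeat);
    * otherwise ``["default"]`` is returned and nothing is cached.

    NOTE(review): ``cache.set`` is called without an explicit timeout and no
    invalidation is visible here, so a removed workspace membership may stay
    effective until the entry expires or is deleted elsewhere.
    """
    version, get_key = Cache_Version.WORKSPACE_LIST.value
    key = get_key(user_id)

    workspace_list = cache.get(key, version=version)
    if workspace_list is not None:
        return workspace_list

    is_query_model = (workspace_user_role_mapping_model is not None
                      and workspace_model is not None
                      and role_model is not None
                      and role_permission_mapping_model is not None)
    if not is_query_model:
        return ["default"]

    # Workspace / user / role mapping rows of this user.
    workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
    workspace_list = [mapping.workspace_id for mapping in workspace_user_role_mapping_list]
    cache.set(key, workspace_list, version=version)
    # Return the list that was just built: the value read from the cache is
    # still None on this path, and handing None to the caller would leave the
    # first request after a cache miss without any workspace membership data.
    return workspace_list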
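def get_role_list(user,
                  workspace_id,
                  workspace_user_role_mapping_model,
                  workspace_model,
                  role_model,
                  role_permission_mapping_model):
    """Return the role list of a user for a workspace, served from cache when possible.

    The entry is cached under the ``Cache_Version.ROLE_LIST`` key built from
    ``(user.id, workspace_id)``. On a cache miss the list is rebuilt, cached
    and returned:

    * when all four model arguments are present, the ``role_id`` of each of
      the user's workspace/role mapping rows is collected;
    * otherwise the list is ``[user.role]``.

    Security review notes:

    * NOTE(review): the mapping query filters on ``user.id`` only, although
      the cache key is per workspace, so role ids held in any workspace end up
      in the list cached for this one. Confirm this is intended; if roles are
      meant to be workspace-scoped the query needs a workspace filter.
    * NOTE(review): ``cache.set`` is called without an explicit timeout and no
      invalidation is visible here, so a revoked role may stay effective until
      the entry expires or is deleted elsewhere.
    """
    version, get_key = Cache_Version.ROLE_LIST.value
    key = get_key(user.id, workspace_id)

    role_list = cache.get(key, version=version)
    if role_list is not None:
        return role_list

    is_query_model = (workspace_user_role_mapping_model is not None
                      and workspace_model is not None
                      and role_model is not None
                      and role_permission_mapping_model is not None)
    if is_query_model:
        # Workspace / user / role mapping rows of this user.
        workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id)
        role_list = [mapping.role_id for mapping in workspace_user_role_mapping_list]
    else:
        role_list = [user.role]

    cache.set(key, role_list, version=version)
    # Return the list that was just built: the value read from the cache is
    # still None on this path, and handing None to the caller would leave the
    # first request after a cache miss without any role data.
    return role_list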
def get_auth(user, workspace_id):
    """Build the ``Auth`` object for a user acting in ``workspace_id``.

    Looks up the four models by name through ``DatabaseModelManage`` and
    passes them to ``get_workspace_list``, ``get_permission_list`` and
    ``get_role_list`` (in that order), then wraps the results in ``Auth``.

    NOTE(review): ``workspace_id`` is forwarded to ``Auth`` as given; this
    function does not check that it appears in the computed workspace list.
    Confirm that ``Auth`` or its consumers enforce workspace membership.
    """
    models = tuple(
        DatabaseModelManage.get_model(model_name)
        for model_name in (
            "workspace_user_role_mapping",
            "workspace_model",
            "role_model",
            "role_permission_mapping_model",
        )
    )
    workspace_list = get_workspace_list(user.id, workspace_id, *models)
    permission_list = get_permission_list(user.id, workspace_id, *models)
    role_list = get_role_list(user, workspace_id, *models)
    return Auth(workspace_list, workspace_id, role_list, permission_list)
class UserToken(AuthBaseHandle):
    """Authentication handler that resolves a login token to a user and an ``Auth`` object."""

    def support(self, request, token: str, get_token_details):
        """Tell whether this handler accepts the token.

        Returns ``True`` when ``get_token_details()`` yields anything other
        than ``None``, ``False`` otherwise. ``request`` and ``token`` are not
        inspected here.
        """
        return get_token_details() is not None

    def handle(self, request, token: str, get_token_details):
        """Authenticate the token and return ``(user, auth)``.

        The token must still have an entry in the cache under the
        ``Cache_Version.TOKEN`` key; if it does not,
        ``AppAuthenticationFailed(1002, 'Login expired')`` is raised. The user
        is then loaded by the ``id`` found in the token details and the
        ``Auth`` object is built for the token's ``current_workspace``.

        Security review notes:

        * NOTE(review): ``current_workspace`` comes from the token details and
          is passed to ``get_auth`` without a membership check at this point.
        * NOTE(review): a missing ``current_workspace`` or ``id`` key raises
          ``KeyError`` and an unknown user id raises ``User.DoesNotExist``;
          neither is an ``AppAuthenticationFailed``. Confirm the caller turns
          them into an authentication failure rather than a server error.
        * NOTE(review): if the ``User`` model carries a disabled/inactive
          flag, it is not consulted here.
        """
        version, get_key = Cache_Version.TOKEN.value
        # Server-side presence check: a token without a cache entry is rejected.
        if cache.get(get_key(token), version=version) is None:
            raise AppAuthenticationFailed(1002, _('Login expired'))

        token_details = get_token_details()
        # Workspace the token is currently bound to.
        workspace_id = token_details['current_workspace']
        user = QuerySet(User).get(id=token_details['id'])
        return user, get_auth(user, workspace_id)