UnisKB/apps/common/auth/handle/impl/user_token.py

249 lines
13 KiB
Python
Raw Normal View History

2025-04-14 12:11:23 +00:00
# coding=utf-8
"""
2025-04-15 12:37:38 +00:00
@project: MaxKB
2025-04-14 12:11:23 +00:00
@Author: 虎虎
@file: user_token.py
@date: 2024/3/14 03:02
@desc: 用户认证
"""
2025-04-16 12:09:00 +00:00
import datetime
from functools import reduce
from typing import List
2025-04-16 12:09:00 +00:00
2025-04-15 12:37:38 +00:00
from django.core.cache import cache
2025-04-14 12:11:23 +00:00
from django.db.models import QuerySet
2025-04-15 12:37:38 +00:00
from django.utils.translation import gettext_lazy as _
2025-04-14 12:11:23 +00:00
from common.auth.handle.auth_base_handle import AuthBaseHandle
2025-06-06 14:28:21 +00:00
from common.constants.authentication_type import AuthenticationType
2025-04-14 12:11:23 +00:00
from common.constants.cache_version import Cache_Version
from common.constants.permission_constants import Auth, PermissionConstants, ResourcePermissionGroup, \
get_permission_list_by_resource_group, ResourceAuthType, \
ResourcePermissionRole, get_default_role_permission_mapping_list, get_default_workspace_user_role_mapping_list
2025-04-15 12:37:38 +00:00
from common.database_model_manage.database_model_manage import DatabaseModelManage
2025-04-14 12:11:23 +00:00
from common.exception.app_exception import AppAuthenticationFailed
2025-04-16 12:09:00 +00:00
from common.utils.common import group_by
from system_manage.models.workspace_user_permission import WorkspaceUserResourcePermission
2025-04-14 12:11:23 +00:00
from users.models import User
2025-04-15 12:37:38 +00:00
2025-04-16 12:09:00 +00:00
def get_permission(permission_id):
    """
    Render a permission identifier as a string.

    @param permission_id: a PermissionConstants member or any other value
    @return: the formatted ``value`` of the member when a PermissionConstants
             member is given, otherwise the formatted argument itself
    """
    raw_id = permission_id.value if isinstance(permission_id, PermissionConstants) else permission_id
    return f"{raw_id}"
def get_workspace_permission(permission_id, workspace_id):
    """
    Render a permission identifier scoped to a workspace.

    @param permission_id: a PermissionConstants member or any other value
    @param workspace_id: workspace id appended to the permission
    @return: string of the form ``<permission>:/WORKSPACE/<workspace_id>``
    """
    raw_id = permission_id.value if isinstance(permission_id, PermissionConstants) else permission_id
    return f"{raw_id}:/WORKSPACE/{workspace_id}"
def get_workspace_permission_list(role_permission_mapping_dict, workspace_user_role_mapping_list):
    """
    Collect every workspace-scoped permission string for the given role assignments.

    @param role_permission_mapping_dict: role/permission mappings keyed by role_id
    @param workspace_user_role_mapping_list: workspace/user/role mapping records
    @return: flat list of workspace permission strings, in assignment order
    """
    workspace_permissions = []
    for assignment in workspace_user_role_mapping_list:
        # A role without any mapped permission simply contributes nothing.
        for role_permission in role_permission_mapping_dict.get(assignment.role_id, []):
            workspace_permissions.append(
                get_workspace_permission(role_permission.permission_id, assignment.workspace_id))
    return workspace_permissions
def get_workspace_resource_permission_list(
        workspace_user_resource_permission_list: List[WorkspaceUserResourcePermission],
        role_permission_mapping_dict,
        workspace_user_role_mapping_dict):
    """
    Expand every resource-permission record into permission strings.

    @param workspace_user_resource_permission_list: workspace user resource permission records
    @param role_permission_mapping_dict: role/permission mappings keyed by role_id
    @param workspace_user_role_mapping_dict: workspace/user/role mappings; looked up by
           workspace_id by the per-record helper
    @return: flat list with the permission strings of all records, in record order
    """
    flattened = []
    for resource_permission_record in workspace_user_resource_permission_list:
        # Each record expands to a list; append its items to the flat result.
        flattened.extend(
            get_workspace_resource_permission_list_by_workspace_user_permission(
                resource_permission_record,
                role_permission_mapping_dict,
                workspace_user_role_mapping_dict))
    return flattened
def get_workspace_resource_permission_list_by_workspace_user_permission(
        workspace_user_resource_permission: WorkspaceUserResourcePermission,
        role_permission_mapping_dict,
        workspace_user_role_mapping_dict):
    """
    Build the permission strings granted by one resource-permission record.

    Each string has the form
    ``<permission>:/WORKSPACE/<workspace_id>/<auth_target_type>/<target>``.

    @param workspace_user_resource_permission: workspace user resource permission record
    @param role_permission_mapping_dict: role/permission mappings keyed by role_id
    @param workspace_user_role_mapping_dict: workspace/user/role mappings, looked up by the
           record's workspace_id
    @return: list of permission strings; empty when the record's auth type is not handled
             or grants nothing
    """
    workspace_id = workspace_user_resource_permission.workspace_id
    # All permissions produced for this record share the same resource suffix.
    resource_path = (f"/WORKSPACE/{workspace_id}"
                     f"/{workspace_user_resource_permission.auth_target_type}"
                     f"/{workspace_user_resource_permission.target}")
    auth_type = workspace_user_resource_permission.auth_type
    granted = workspace_user_resource_permission.permission_list

    # Role-based: the resource inherits every permission of the roles held in the workspace.
    if auth_type == ResourceAuthType.ROLE and ResourcePermissionRole.ROLE in granted:
        # Both lookups default to an empty list so a workspace without role mappings or a
        # role without permission mappings contributes nothing instead of raising TypeError.
        return [
            f"{role_permission_mapping.permission_id}:{resource_path}"
            for workspace_user_role_mapping in workspace_user_role_mapping_dict.get(workspace_id, [])
            for role_permission_mapping in
            role_permission_mapping_dict.get(workspace_user_role_mapping.role_id, [])
        ]

    # Group-based: expand each recognised permission group into its permissions.
    if auth_type == ResourceAuthType.RESOURCE_PERMISSION_GROUP:
        return [
            f"{permission}:{resource_path}"
            for resource_permission in granted
            if resource_permission in ResourcePermissionGroup.values
            for permission in get_permission_list_by_resource_group(ResourcePermissionGroup[resource_permission])
        ]
    return []
2025-04-16 12:09:00 +00:00
def get_permission_list(user,
                        workspace_user_role_mapping_model,
                        workspace_model,
                        role_model,
                        role_permission_mapping_model):
    """
    Return the permission list of a user, reading it from the cache when present.

    On a cache miss the list is rebuilt as system permissions + workspace permissions +
    workspace resource permissions, stored in the cache and returned.

    @param user: user object; ``id`` is always read, ``role`` only when a model is missing
    @param workspace_user_role_mapping_model: workspace/user/role mapping model, or None
    @param workspace_model: workspace model, or None
    @param role_model: role model, or None
    @param role_permission_mapping_model: role/permission mapping model, or None
    @return: list of permission strings
    """
    user_id = user.id
    version = Cache_Version.PERMISSION_LIST.get_version()
    key = Cache_Version.PERMISSION_LIST.get_key(user_id=user_id)
    # Database-backed roles are only used when every optional model is available.
    models_available = all(model is not None for model in (workspace_user_role_mapping_model,
                                                           workspace_model,
                                                           role_model,
                                                           role_permission_mapping_model))
    cached_permissions = cache.get(key, version=version)
    if cached_permissions is not None:
        return cached_permissions

    if models_available:
        # Workspace / user / role mapping rows of this user.
        user_role_mappings = QuerySet(workspace_user_role_mapping_model).filter(user_id=user_id)
        mappings_by_workspace = group_by(user_role_mappings, lambda item: item.workspace_id)
        # Role / permission mapping rows for the roles the user holds.
        role_permissions = QuerySet(role_permission_mapping_model).filter(
            role_id__in=[mapping.role_id for mapping in user_role_mappings])
        permissions_by_role = group_by(role_permissions, lambda item: item.role_id)
        # NOTE(review): this query is scoped by workspace only, not by user - confirm that
        # resource permissions of other users in the same workspace are meant to be included.
        resource_permission_records = QuerySet(WorkspaceUserResourcePermission).filter(
            workspace_id__in=[mapping.workspace_id for mapping in user_role_mappings])
        # Resource permissions
        resource_permissions = get_workspace_resource_permission_list(resource_permission_records,
                                                                      permissions_by_role,
                                                                      mappings_by_workspace)
        workspace_permissions = get_workspace_permission_list(permissions_by_role, user_role_mappings)
        # System permissions
        system_permissions = [role_permission.permission_id for role_permission in role_permissions]
    else:
        # Without the optional models, fall back to the built-in defaults in the 'default' workspace.
        default_workspace_ids = ['default']
        resource_permission_records = QuerySet(WorkspaceUserResourcePermission).filter(
            workspace_id__in=default_workspace_ids)
        role_permissions = get_default_role_permission_mapping_list()
        permissions_by_role = group_by(role_permissions, lambda item: item.role_id)
        user_role_mappings = get_default_workspace_user_role_mapping_list([user.role])
        mappings_by_workspace = group_by(user_role_mappings, lambda item: item.workspace_id)
        # Resource permissions
        resource_permissions = get_workspace_resource_permission_list(resource_permission_records,
                                                                      permissions_by_role,
                                                                      mappings_by_workspace)
        workspace_permissions = get_workspace_permission_list(permissions_by_role, user_role_mappings)
        # System permissions: only the default mappings that belong to the user's own role.
        system_permissions = [role_permission.permission_id for role_permission in role_permissions
                              if role_permission.role_id in [user.role]]

    # Merge and cache
    permission_list = system_permissions + workspace_permissions + resource_permissions
    cache.set(key, permission_list, version=version)
    return permission_list
def get_role_list(user,
                  workspace_user_role_mapping_model,
                  workspace_model,
                  role_model,
                  role_permission_mapping_model):
    """
    Return the role list of the current user, reading it from the cache when present.

    On a cache miss the list is rebuilt, stored in the cache and returned, so the first
    call yields the same value as later cached calls.

    @param user: user object; ``id`` and ``role`` are read
    @param workspace_user_role_mapping_model: workspace/user/role mapping model, or None
    @param workspace_model: workspace model, or None
    @param role_model: role model, or None
    @param role_permission_mapping_model: role/permission mapping model, or None
    @return: list of role strings; workspace roles are formatted as
             ``<role_id>:/WORKSPACE/<workspace_id>`` and ``user.role`` is always the last item
    """
    version = Cache_Version.ROLE_LIST.get_version()
    key = Cache_Version.ROLE_LIST.get_key(user_id=user.id)
    role_list = cache.get(key, version=version)
    if role_list is not None:
        return role_list

    # Workspace roles can only be queried when every optional model is available.
    is_query_model = (workspace_user_role_mapping_model is not None
                      and workspace_model is not None
                      and role_model is not None
                      and role_permission_mapping_model is not None)
    if is_query_model:
        # Workspace / user / role mapping rows of this user.
        workspace_user_role_mapping_list = QuerySet(workspace_user_role_mapping_model).filter(user_id=user.id)
        role_list = [f"{workspace_user_role_mapping.role_id}:/WORKSPACE/{workspace_user_role_mapping.workspace_id}"
                     for workspace_user_role_mapping in workspace_user_role_mapping_list] + [user.role]
    else:
        role_list = [user.role]
    cache.set(key, role_list, version=version)
    # Return what was just cached; previously the freshly built list was not returned.
    return role_list
2025-04-16 12:09:00 +00:00
def get_auth(user):
    """
    Build the Auth object (roles and permissions) of a user.

    @param user: user object passed on to get_permission_list and get_role_list
    @return: ``Auth(role_list, permission_list)``
    """
    # Models are looked up by name and handed unchanged to both helpers.
    model_args = (
        DatabaseModelManage.get_model("workspace_user_role_mapping"),
        DatabaseModelManage.get_model("workspace_model"),
        DatabaseModelManage.get_model("role_model"),
        DatabaseModelManage.get_model("role_permission_mapping_model"),
    )
    permission_list = get_permission_list(user, *model_args)
    role_list = get_role_list(user, *model_args)
    return Auth(role_list, permission_list)
2025-04-14 12:11:23 +00:00
class UserToken(AuthBaseHandle):
    """Authentication handler for tokens whose details have the SYSTEM_USER type."""

    def support(self, request, token: str, get_token_details):
        """
        Tell whether this handler is responsible for the token.

        @param request: current request (not used here)
        @param token: raw token string (not used here)
        @param get_token_details: callable returning the decoded token details, or None
        @return: True when the details contain ``id`` and their ``type`` equals
                 ``AuthenticationType.SYSTEM_USER.value``, otherwise False
        """
        auth_details = get_token_details()
        if auth_details is None:
            return False
        return 'id' in auth_details and auth_details.get('type') == AuthenticationType.SYSTEM_USER.value

    def handle(self, request, token: str, get_token_details):
        """
        Authenticate the request and load the user with its Auth object.

        @param request: current request (not used here)
        @param token: raw token string, used to derive the cache key
        @param get_token_details: callable returning the decoded token details
        @return: tuple ``(user, auth)``
        @raise AppAuthenticationFailed: code 1002 when the token has no cache entry
        """
        version, get_key = Cache_Version.TOKEN.value
        # Derive the key once so the entry that is read is also the entry that is refreshed.
        cache_key = get_key(token)
        cache_token = cache.get(cache_key, version=version)
        if cache_token is None:
            raise AppAuthenticationFailed(1002, _('Login expired'))
        auth_details = get_token_details()
        # Extend the token's cache lifetime to two hours from now.
        cache.touch(cache_key, timeout=int(datetime.timedelta(hours=2).total_seconds()), version=version)
        user = QuerySet(User).get(id=auth_details['id'])
        auth = get_auth(user)
        return user, auth