feat: add authentication settings API endpoints and enhance login logic with access token validation
parent
ceb601d74a
commit
0969f70104
|
|
@ -190,7 +190,8 @@ class CaptchaView(APIView):
|
||||||
tags=[_("Chat")], # type: ignore
|
tags=[_("Chat")], # type: ignore
|
||||||
responses=CaptchaAPI.get_response())
|
responses=CaptchaAPI.get_response())
|
||||||
def get(self, request: Request):
|
def get(self, request: Request):
|
||||||
return result.success(CaptchaSerializer().generate())
|
username = request.query_params.get('username', None)
|
||||||
|
return result.success(CaptchaSerializer().generate(username, 'chat'))
|
||||||
|
|
||||||
|
|
||||||
class SpeechToText(APIView):
|
class SpeechToText(APIView):
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
"""
|
"""
|
||||||
import base64
|
import base64
|
||||||
import datetime
|
import datetime
|
||||||
|
import json
|
||||||
|
|
||||||
from captcha.image import ImageCaptcha
|
from captcha.image import ImageCaptcha
|
||||||
from django.core import signing
|
from django.core import signing
|
||||||
|
|
@ -18,6 +19,7 @@ from rest_framework import serializers
|
||||||
|
|
||||||
from common.constants.authentication_type import AuthenticationType
|
from common.constants.authentication_type import AuthenticationType
|
||||||
from common.constants.cache_version import Cache_Version
|
from common.constants.cache_version import Cache_Version
|
||||||
|
from common.database_model_manage.database_model_manage import DatabaseModelManage
|
||||||
from common.exception.app_exception import AppApiException
|
from common.exception.app_exception import AppApiException
|
||||||
from common.utils.common import password_encrypt, get_random_chars
|
from common.utils.common import password_encrypt, get_random_chars
|
||||||
from maxkb.const import CONFIG
|
from maxkb.const import CONFIG
|
||||||
|
|
@ -27,7 +29,11 @@ from users.models import User
|
||||||
class LoginRequest(serializers.Serializer):
|
class LoginRequest(serializers.Serializer):
|
||||||
username = serializers.CharField(required=True, max_length=64, help_text=_("Username"), label=_("Username"))
|
username = serializers.CharField(required=True, max_length=64, help_text=_("Username"), label=_("Username"))
|
||||||
password = serializers.CharField(required=True, max_length=128, label=_("Password"))
|
password = serializers.CharField(required=True, max_length=128, label=_("Password"))
|
||||||
captcha = serializers.CharField(required=True, max_length=64, label=_('captcha'))
|
captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True,
|
||||||
|
allow_blank=True)
|
||||||
|
|
||||||
|
|
||||||
|
system_version, system_get_key = Cache_Version.SYSTEM.value
|
||||||
|
|
||||||
|
|
||||||
class LoginResponse(serializers.Serializer):
|
class LoginResponse(serializers.Serializer):
|
||||||
|
|
@ -37,23 +43,70 @@ class LoginResponse(serializers.Serializer):
|
||||||
token = serializers.CharField(required=True, label=_("token"))
|
token = serializers.CharField(required=True, label=_("token"))
|
||||||
|
|
||||||
|
|
||||||
|
def record_login_fail(username: str, expire: int = 3600):
|
||||||
|
"""记录登录失败次数"""
|
||||||
|
if not username:
|
||||||
|
return
|
||||||
|
fail_key = system_get_key(f'system_{username}')
|
||||||
|
fail_count = cache.get(fail_key, version=system_version)
|
||||||
|
if fail_count is None:
|
||||||
|
cache.set(fail_key, 1, timeout=expire, version=system_version)
|
||||||
|
else:
|
||||||
|
cache.incr(fail_key, 1, version=system_version)
|
||||||
|
|
||||||
|
|
||||||
class LoginSerializer(serializers.Serializer):
|
class LoginSerializer(serializers.Serializer):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def login(instance):
|
def login(instance):
|
||||||
LoginRequest(data=instance).is_valid(raise_exception=True)
|
username = instance.get("username", "")
|
||||||
username = instance.get('username')
|
try:
|
||||||
password = instance.get('password')
|
LoginRequest(data=instance).is_valid(raise_exception=True)
|
||||||
captcha = instance.get('captcha')
|
except Exception as e:
|
||||||
captcha_cache = cache.get(Cache_Version.CAPTCHA.get_key(captcha=captcha.lower()),
|
record_login_fail(username)
|
||||||
version=Cache_Version.CAPTCHA.get_version())
|
raise e
|
||||||
if captcha_cache is None:
|
auth_setting_model = DatabaseModelManage.get_model('auth_setting')
|
||||||
raise AppApiException(1005, _("Captcha code error or expiration"))
|
# 默认配置
|
||||||
|
auth_setting = {}
|
||||||
|
if auth_setting_model:
|
||||||
|
setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
|
||||||
|
if setting_obj:
|
||||||
|
try:
|
||||||
|
auth_setting = json.loads(setting_obj.param_value) or {}
|
||||||
|
except Exception:
|
||||||
|
auth_setting = {}
|
||||||
|
|
||||||
|
max_attempts = auth_setting.get("max_attempts", 0)
|
||||||
|
password = instance.get("password")
|
||||||
|
captcha = instance.get("captcha", "")
|
||||||
|
|
||||||
|
# 判断是否需要验证码
|
||||||
|
need_captcha = True
|
||||||
|
if max_attempts == -1:
|
||||||
|
need_captcha = False
|
||||||
|
elif max_attempts > 0:
|
||||||
|
fail_count = cache.get(system_get_key(f'system_{username}'), version=system_version) or 0
|
||||||
|
need_captcha = fail_count >= max_attempts
|
||||||
|
|
||||||
|
if need_captcha:
|
||||||
|
if not captcha:
|
||||||
|
raise AppApiException(1005, _("Captcha is required"))
|
||||||
|
|
||||||
|
captcha_cache = cache.get(
|
||||||
|
Cache_Version.CAPTCHA.get_key(captcha=f"system_{username}"),
|
||||||
|
version=Cache_Version.CAPTCHA.get_version()
|
||||||
|
)
|
||||||
|
if captcha_cache is None or captcha.lower() != captcha_cache:
|
||||||
|
raise AppApiException(1005, _("Captcha code error or expiration"))
|
||||||
|
|
||||||
user = QuerySet(User).filter(username=username, password=password_encrypt(password)).first()
|
user = QuerySet(User).filter(username=username, password=password_encrypt(password)).first()
|
||||||
if user is None:
|
if user is None:
|
||||||
|
record_login_fail(username)
|
||||||
raise AppApiException(500, _('The username or password is incorrect'))
|
raise AppApiException(500, _('The username or password is incorrect'))
|
||||||
if not user.is_active:
|
if not user.is_active:
|
||||||
|
record_login_fail(username)
|
||||||
raise AppApiException(1005, _("The user has been disabled, please contact the administrator!"))
|
raise AppApiException(1005, _("The user has been disabled, please contact the administrator!"))
|
||||||
|
cache.delete(system_get_key(f'system_{username}'), version=system_version)
|
||||||
token = signing.dumps({'username': user.username,
|
token = signing.dumps({'username': user.username,
|
||||||
'id': str(user.id),
|
'id': str(user.id),
|
||||||
'email': user.email,
|
'email': user.email,
|
||||||
|
|
@ -73,11 +126,11 @@ class CaptchaResponse(serializers.Serializer):
|
||||||
|
|
||||||
class CaptchaSerializer(serializers.Serializer):
|
class CaptchaSerializer(serializers.Serializer):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate():
|
def generate(username: str, type: str = 'system'):
|
||||||
chars = get_random_chars()
|
chars = get_random_chars()
|
||||||
image = ImageCaptcha()
|
image = ImageCaptcha()
|
||||||
data = image.generate(chars)
|
data = image.generate(chars)
|
||||||
captcha = base64.b64encode(data.getbuffer())
|
captcha = base64.b64encode(data.getbuffer())
|
||||||
cache.set(Cache_Version.CAPTCHA.get_key(captcha=chars.lower()), chars,
|
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
|
||||||
timeout=60, version=Cache_Version.CAPTCHA.get_version())
|
timeout=300, version=Cache_Version.CAPTCHA.get_version())
|
||||||
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
|
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
|
||||||
|
|
|
||||||
|
|
@ -73,4 +73,5 @@ class CaptchaView(APIView):
|
||||||
tags=[_("User Management")], # type: ignore
|
tags=[_("User Management")], # type: ignore
|
||||||
responses=CaptchaAPI.get_response())
|
responses=CaptchaAPI.get_response())
|
||||||
def get(self, request: Request):
|
def get(self, request: Request):
|
||||||
return result.success(CaptchaSerializer().generate())
|
username = request.query_params.get('username', None)
|
||||||
|
return result.success(CaptchaSerializer().generate(username))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue