feat: enhance captcha generation with access token validation and configurable max attempts
parent
db0ed2662b
commit
0406fbcaf7
|
|
@ -191,7 +191,8 @@ class CaptchaView(APIView):
|
||||||
responses=CaptchaAPI.get_response())
|
responses=CaptchaAPI.get_response())
|
||||||
def get(self, request: Request):
|
def get(self, request: Request):
|
||||||
username = request.query_params.get('username', None)
|
username = request.query_params.get('username', None)
|
||||||
return result.success(CaptchaSerializer().generate(username, 'chat'))
|
accessToken = request.query_params.get('accessToken', None)
|
||||||
|
return result.success(CaptchaSerializer().chat_generate(username, 'chat', accessToken))
|
||||||
|
|
||||||
|
|
||||||
class SpeechToText(APIView):
|
class SpeechToText(APIView):
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ from django.db.models import QuerySet
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from rest_framework import serializers
|
from rest_framework import serializers
|
||||||
|
|
||||||
|
from application.models import ApplicationAccessToken
|
||||||
from common.constants.authentication_type import AuthenticationType
|
from common.constants.authentication_type import AuthenticationType
|
||||||
from common.constants.cache_version import Cache_Version
|
from common.constants.cache_version import Cache_Version
|
||||||
from common.database_model_manage.database_model_manage import DatabaseModelManage
|
from common.database_model_manage.database_model_manage import DatabaseModelManage
|
||||||
|
|
@ -33,8 +34,7 @@ class LoginRequest(serializers.Serializer):
|
||||||
captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True,
|
captcha = serializers.CharField(required=False, max_length=64, label=_('captcha'), allow_null=True,
|
||||||
allow_blank=True)
|
allow_blank=True)
|
||||||
encryptedData = serializers.CharField(required=False, label=_('encryptedData'), allow_null=True,
|
encryptedData = serializers.CharField(required=False, label=_('encryptedData'), allow_null=True,
|
||||||
allow_blank=True)
|
allow_blank=True)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
system_version, system_get_key = Cache_Version.SYSTEM.value
|
system_version, system_get_key = Cache_Version.SYSTEM.value
|
||||||
|
|
@ -61,6 +61,20 @@ def record_login_fail(username: str, expire: int = 600):
|
||||||
|
|
||||||
class LoginSerializer(serializers.Serializer):
|
class LoginSerializer(serializers.Serializer):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_auth_setting():
|
||||||
|
"""获取认证设置"""
|
||||||
|
auth_setting_model = DatabaseModelManage.get_model('auth_setting')
|
||||||
|
auth_setting = {}
|
||||||
|
if auth_setting_model:
|
||||||
|
setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
|
||||||
|
if setting_obj:
|
||||||
|
try:
|
||||||
|
auth_setting = json.loads(setting_obj.param_value) or {}
|
||||||
|
except Exception:
|
||||||
|
auth_setting = {}
|
||||||
|
return auth_setting
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def login(instance):
|
def login(instance):
|
||||||
username = instance.get("username", "")
|
username = instance.get("username", "")
|
||||||
|
|
@ -73,16 +87,7 @@ class LoginSerializer(serializers.Serializer):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
record_login_fail(username)
|
record_login_fail(username)
|
||||||
raise e
|
raise e
|
||||||
auth_setting_model = DatabaseModelManage.get_model('auth_setting')
|
auth_setting = LoginSerializer.get_auth_setting()
|
||||||
# 默认配置
|
|
||||||
auth_setting = {}
|
|
||||||
if auth_setting_model:
|
|
||||||
setting_obj = auth_setting_model.objects.filter(param_key='auth_setting').first()
|
|
||||||
if setting_obj:
|
|
||||||
try:
|
|
||||||
auth_setting = json.loads(setting_obj.param_value) or {}
|
|
||||||
except Exception:
|
|
||||||
auth_setting = {}
|
|
||||||
|
|
||||||
max_attempts = auth_setting.get("max_attempts", 1)
|
max_attempts = auth_setting.get("max_attempts", 1)
|
||||||
password = instance.get("password")
|
password = instance.get("password")
|
||||||
|
|
@ -135,10 +140,53 @@ class CaptchaResponse(serializers.Serializer):
|
||||||
class CaptchaSerializer(serializers.Serializer):
|
class CaptchaSerializer(serializers.Serializer):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate(username: str, type: str = 'system'):
|
def generate(username: str, type: str = 'system'):
|
||||||
chars = get_random_chars()
|
auth_setting = LoginSerializer.get_auth_setting()
|
||||||
image = ImageCaptcha()
|
max_attempts = auth_setting.get("max_attempts", 1)
|
||||||
data = image.generate(chars)
|
need_captcha = False
|
||||||
captcha = base64.b64encode(data.getbuffer())
|
if max_attempts == -1:
|
||||||
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
|
need_captcha = False
|
||||||
timeout=300, version=Cache_Version.CAPTCHA.get_version())
|
elif max_attempts > 0:
|
||||||
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
|
fail_count = cache.get(system_get_key(f'{type}_{username}'), version=system_version) or 0
|
||||||
|
need_captcha = fail_count >= max_attempts
|
||||||
|
|
||||||
|
if need_captcha:
|
||||||
|
chars = get_random_chars()
|
||||||
|
image = ImageCaptcha()
|
||||||
|
data = image.generate(chars)
|
||||||
|
captcha = base64.b64encode(data.getbuffer())
|
||||||
|
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
|
||||||
|
timeout=300, version=Cache_Version.CAPTCHA.get_version())
|
||||||
|
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
|
||||||
|
return {'captcha': ''}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def chat_generate(username: str, type: str = 'chat', access_token: str = ''):
|
||||||
|
auth_setting = {}
|
||||||
|
application_access_token = ApplicationAccessToken.objects.filter(
|
||||||
|
access_token=access_token
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not application_access_token:
|
||||||
|
raise AppApiException(1005, _('Invalid access token'))
|
||||||
|
if application_access_token:
|
||||||
|
auth_setting = application_access_token.authentication_value
|
||||||
|
max_attempts = auth_setting.get("max_attempts", 1)
|
||||||
|
need_captcha = False
|
||||||
|
if max_attempts == -1:
|
||||||
|
need_captcha = False
|
||||||
|
elif max_attempts > 0:
|
||||||
|
fail_count = cache.get(system_get_key(f'{type}_{username}'), version=system_version) or 0
|
||||||
|
need_captcha = fail_count >= max_attempts
|
||||||
|
|
||||||
|
if need_captcha:
|
||||||
|
chars = get_random_chars()
|
||||||
|
image = ImageCaptcha()
|
||||||
|
data = image.generate(chars)
|
||||||
|
captcha = base64.b64encode(data.getbuffer())
|
||||||
|
cache.set(Cache_Version.CAPTCHA.get_key(captcha=f'{type}_{username}'), chars.lower(),
|
||||||
|
timeout=300, version=Cache_Version.CAPTCHA.get_version())
|
||||||
|
return {'captcha': 'data:image/png;base64,' + captcha.decode()}
|
||||||
|
return {'captcha': ''}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue