jumpserver/apps/authentication/api/mfa.py
fit2bot ec8dca90d6
refactor: 整合系统用户和管理用户 (#6236)
* perf: 整合系统用户和管理用户

* stash

stash

perf: 优化系统用户和资产的表结构

* perf: 添加信号

* perf: 添加算法

* perf: 去掉 asset user backends

* perf: 整理系统用户api

* perfF: 暂存一下

* stash

* perf: 暂存一下

* perf: 暂存

* xxx

* perf: ...

* stash it

* xxx

* xxx

* xxx

* xxx

* xxx

* stash it

* 修改Protocols

* perf: 修改创建authbook信号

* perf: 添加auth info

* .stash

* perf: 基本完成

* perf: 修复完成

* perf: 修复更改的id

* perf: 修复迁移过去数量不对的问题

* perf: 修改systemuser

* fix: 修复批量编辑近期的问题

* fix: 修复authbook加载的问题

* xxx

Co-authored-by: ibuler <ibuler@qq.com>
2021-07-08 14:23:18 +08:00

70 lines
2.4 KiB
Python

# -*- coding: utf-8 -*-
#
import time
from django.utils.translation import ugettext as _
from django.conf import settings
from rest_framework.permissions import AllowAny
from rest_framework.generics import CreateAPIView
from rest_framework.serializers import ValidationError
from rest_framework.response import Response
from common.permissions import IsValidUser, NeedMFAVerify
from ..serializers import OtpVerifySerializer
from .. import serializers
from .. import errors
from ..mixins import AuthMixin
# Names exported by `from <this module> import *`: the two API views defined below.
__all__ = ['MFAChallengeApi', 'UserOtpVerifyApi']
class MFAChallengeApi(AuthMixin, CreateAPIView):
    """Check an MFA code for the user tracked in the current session.

    The endpoint is open to anonymous callers (``AllowAny``); the user is
    looked up through ``AuthMixin.get_user_from_session``. The outcome of the
    check is stored in ``request.session['auth_mfa']`` (``'1'`` on success,
    ``''`` on failure).
    """
    permission_classes = (AllowAny,)
    serializer_class = serializers.MFAChallengeSerializer

    def perform_create(self, serializer):
        """Validate the submitted code and record the result in the session.

        Args:
            serializer: an already validated ``MFAChallengeSerializer``; its
                ``code`` field is passed to ``user.check_mfa``.

        Returns:
            ``None`` when the code is accepted, or a ``Response`` built from
            ``NeedMoreInfoError.as_data()`` when that error is raised while
            resolving or checking the user.

        Raises:
            ValidationError: when an ``errors.AuthFailedError`` is raised in
                the flow; the payload carries the error's ``error`` and
                ``msg`` attributes.
            errors.MFAFailedError: raised here for a rejected code.
                NOTE(review): presumably a subclass of ``AuthFailedError`` and
                therefore converted to ``ValidationError`` as well -- confirm
                against the ``errors`` module.
        """
        try:
            user = self.get_user_from_session()
            code = serializer.validated_data.get('code')
            if not user.check_mfa(code):
                # Clear any earlier success flag before reporting the failure.
                self.request.session['auth_mfa'] = ''
                raise errors.MFAFailedError(
                    username=user.username,
                    request=self.request,
                    ip=self.get_request_ip(),
                )
            self.request.session['auth_mfa'] = '1'
        except errors.AuthFailedError as e:
            data = {"error": e.error, "msg": e.msg}
            raise ValidationError(data)
        except errors.NeedMoreInfoError as e:
            return Response(e.as_data(), status=200)
        return None

    def create(self, request, *args, **kwargs):
        """Handle POST: validate the payload, run the MFA check, and reply.

        The stock ``CreateModelMixin.create`` ignores the return value of
        ``perform_create``, which would drop the ``NeedMoreInfoError``
        response built there and always answer ``{'msg': 'ok'}``. The
        validate/perform steps are therefore run here directly so that
        response reaches the client.

        Returns:
            The ``Response`` produced by ``perform_create`` if there is one,
            otherwise ``Response({'msg': 'ok'})``.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        response = self.perform_create(serializer)
        if response is not None:
            return response
        return Response({'msg': 'ok'})
class UserOtpVerifyApi(CreateAPIView):
    """Verify an MFA code for the requesting user and remember when it passed.

    POST checks the submitted code via ``request.user.check_mfa`` and, on
    success, stores the current Unix time in
    ``request.session['MFA_VERIFY_TIME']``. GET returns a fixed "verified"
    payload; whether a GET request is allowed through is decided in
    ``get_permissions``.
    """
    serializer_class = OtpVerifySerializer
    permission_classes = (IsValidUser,)

    def get_permissions(self):
        """Swap in ``NeedMFAVerify`` for GET when the MFA setting is enabled.

        For any other method, or when ``settings.SECURITY_VIEW_AUTH_NEED_MFA``
        is falsy, the class-level ``permission_classes`` are used unchanged.
        """
        is_get = self.request.method.lower() == 'get'
        if is_get and settings.SECURITY_VIEW_AUTH_NEED_MFA:
            self.permission_classes = [NeedMFAVerify]
        return super().get_permissions()

    def get(self, request, *args, **kwargs):
        """Return the constant "verified" payload for a permitted GET."""
        payload = {'code': 'valid', 'msg': 'verified'}
        return Response(payload)

    def create(self, request, *args, **kwargs):
        """Validate the posted code and stamp the session on success.

        Returns:
            ``Response({"ok": "1"})`` when the code is accepted, otherwise a
            400 response carrying a translated "Code is invalid" error.
        """
        otp_serializer = self.get_serializer(data=request.data)
        otp_serializer.is_valid(raise_exception=True)
        submitted_code = otp_serializer.validated_data["code"]

        # Guard clause: reject a bad code before touching the session.
        if not request.user.check_mfa(submitted_code):
            return Response({"error": _("Code is invalid")}, status=400)

        request.session["MFA_VERIFY_TIME"] = int(time.time())
        return Response({"ok": "1"})