jumpserver/apps/authentication/api/mfa.py
fit2bot e259d2a9e9
fix: fix rbac to dev (#7636)
* feat: 添加 RBAC 应用模块

* feat: 添加 RBAC Model、API

* feat: 添加 RBAC Model、API 2

* feat: 添加 RBAC Model、API 3

* feat: 添加 RBAC Model、API 4

* feat: RBAC

* feat: RBAC

* feat: RBAC

* feat: RBAC

* feat: RBAC

* feat: RBAC 整理权限位

* feat: RBAC 整理权限位2

* feat: RBAC 整理权限位2

* feat: RBAC 整理权限位

* feat: RBAC 添加默认角色

* feat: RBAC 添加迁移文件;迁移用户角色->用户角色绑定

* feat: RBAC 添加迁移文件;迁移用户角色->用户角色绑定

* feat: RBAC 修改用户模块API

* feat: RBAC 添加组织模块迁移文件 & 修改组织模块API

* feat: RBAC 添加组织模块迁移文件 & 修改组织模块API

* feat: RBAC 修改用户角色属性的使用

* feat: RBAC No.1

* xxx

* perf: 暂存

* perf: ...

* perf(rbac): 添加 perms 到 profile serializer 中

* stash

* perf: 使用init

* perf: 修改migrations

* perf: rbac

* stash

* stash

* pref: 修改rbac

* stash it

* stash: 先去修复其他bug

* perf: 修改 role 添加 users

* pref: 修改 RBAC Model

* feat: 添加权限的 tree api

* stash: 暂存一下

* stash: 暂存一下

* perf: 修改 model verbose name

* feat: 添加model各种 verbose name

* perf: 生成 migrations

* perf: 优化权限位

* perf: 添加迁移脚本

* feat: 添加组织角色迁移

* perf: 添加迁移脚本

* stash

* perf: 添加migrateion

* perf: 暂存一下

* perf: 修改rbac

* perf: stash it

* fix: 迁移冲突

* fix: 迁移冲突

* perf: 暂存一下

* perf: 修改 rbac 逻辑

* stash: 暂存一下

* perf: 修改内置角色

* perf: 解决 root 组织的问题

* perf: stash it

* perf: 优化 rbac

* perf: 优化 rolebinding 处理

* perf: 完成用户离开组织的问题

* perf: 暂存一下

* perf: 修改翻译

* perf: 去掉了 IsSuperUser

* perf: IsAppUser 去掉完成

* perf: 修改 connection token 的权限

* perf: 去掉导入的问题

* perf: perms define 格式,修改 app 用户 的全新啊

* perf: 修改 permission

* perf: 去掉一些 org admin

* perf: 去掉部分 org admin

* perf: 再去掉点 org admin role

* perf: 再去掉部分 org admin

* perf: user 角色搜索

* perf: 去掉很多 js

* perf: 添加权限位

* perf: 修改权限

* perf: 去掉一个 todo

* merge: with dev

* fix: 修复冲突

Co-authored-by: Bai <bugatti_it@163.com>
Co-authored-by: Michael Bai <baijiangjie@gmail.com>
Co-authored-by: ibuler <ibuler@qq.com>
2022-02-17 20:13:31 +08:00

118 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
#
import time
from django.utils.translation import ugettext as _
from django.conf import settings
from django.shortcuts import get_object_or_404
from rest_framework.permissions import AllowAny
from rest_framework.generics import CreateAPIView
from rest_framework.serializers import ValidationError
from rest_framework.response import Response
from common.permissions import IsValidUser, NeedMFAVerify
from common.utils import get_logger
from common.exceptions import UnexpectError
from users.models.user import User
from ..serializers import OtpVerifySerializer
from .. import serializers
from .. import errors
from ..mfa.otp import MFAOtp
from ..mixins import AuthMixin
# Module-level logger; get_logger is given this module's dotted name.
logger = get_logger(__name__)

# Names exported by ``from ... import *``.
__all__ = [
    'MFAChallengeVerifyApi',
    'UserOtpVerifyApi',
    'MFASendCodeApi',
]
# Formerly named MFASelectAPi
class MFASendCodeApi(AuthMixin, CreateAPIView):
    """
    Trigger the challenge for the MFA type the client selected.

    Translated from the original note: this is the action performed after
    an MFA type is chosen; the API is currently used by koko.
    """
    # No authentication required: the target user comes either from the
    # posted username or from the session (see perform_create).
    permission_classes = (AllowAny,)
    serializer_class = serializers.MFASelectTypeSerializer
    username = ''
    ip = ''

    def get_user_from_db(self, username):
        """Fetch the user by username, recording failed lookups.

        Translated original intent: avoid brute-force probing of usernames.

        ``check_mfa_is_block`` is called first with the username and the
        request IP (presumably it raises when the pair is blocked -- confirm
        against AuthMixin). Any exception raised by the lookup increments the
        MFA failure counter for that username/IP and is then re-raised
        unchanged.
        """
        ip = self.get_request_ip()
        self.check_mfa_is_block(username, ip)
        try:
            return get_object_or_404(User, username=username)
        except Exception:
            self.incr_mfa_failed_time(username, ip)
            # Bare raise re-raises the active exception as-is.
            raise

    def perform_create(self, serializer):
        """Resolve the user and send the challenge for the requested type.

        The user is taken from the session when no ``username`` was
        submitted, otherwise looked up in the database.

        Raises:
            ValidationError: the user has no active backend for the
                requested type, or that backend has no challenge to send.
            UnexpectError: sending the challenge failed; the original
                exception is attached as ``__cause__``.
        """
        username = serializer.validated_data.get('username', '')
        mfa_type = serializer.validated_data['type']

        if not username:
            user = self.get_user_from_session()
        else:
            user = self.get_user_from_db(username)

        mfa_backend = user.get_active_mfa_backend_by_type(mfa_type)
        if not mfa_backend or not mfa_backend.challenge_required:
            error = _('Current user not support mfa type: {}').format(mfa_type)
            raise ValidationError({'error': error})

        try:
            mfa_backend.send_challenge()
        except Exception as e:
            # Explicit chaining keeps the backend failure visible in logs.
            raise UnexpectError(str(e)) from e
class MFAChallengeVerifyApi(AuthMixin, CreateAPIView):
    """Check a submitted MFA code for the user obtained from the session."""
    permission_classes = (AllowAny,)
    serializer_class = serializers.MFAChallengeSerializer

    def perform_create(self, serializer):
        """Pass the validated code and MFA type to the mixin's MFA check."""
        user = self.get_user_from_session()
        payload = serializer.validated_data
        self._do_check_user_mfa(
            payload.get('code'), payload.get('type', ''), user
        )

    def create(self, request, *args, **kwargs):
        """Run the standard create flow and map auth errors to responses.

        Returns ``{'msg': 'ok'}`` on success. An ``AuthFailedError`` is
        converted into a ``ValidationError`` carrying its ``error`` and
        ``msg``; a ``NeedMoreInfoError`` is answered with its ``as_data()``
        payload and status 200.
        """
        try:
            super().create(request, *args, **kwargs)
        except errors.AuthFailedError as exc:
            raise ValidationError({"error": exc.error, "msg": exc.msg})
        except errors.NeedMoreInfoError as exc:
            return Response(exc.as_data(), status=200)
        return Response({'msg': 'ok'})
class UserOtpVerifyApi(CreateAPIView):
    """Verify an OTP code for the requesting user and record the time."""
    permission_classes = (IsValidUser,)
    serializer_class = OtpVerifySerializer

    def get(self, request, *args, **kwargs):
        """Return a fixed 'verified' payload; gating is in get_permissions."""
        return Response({'code': 'valid', 'msg': 'verified'})

    def create(self, request, *args, **kwargs):
        """Validate the posted code against the user's OTP backend.

        On success the current epoch time (whole seconds) is stored in the
        session under ``MFA_VERIFY_TIME``; on failure a 400 response with
        the backend's error text is returned.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        submitted = serializer.validated_data["code"]

        passed, reason = MFAOtp(request.user).check_code(submitted)
        if not passed:
            message = _("Code is invalid, {}").format(reason)
            return Response({"error": message}, status=400)

        request.session["MFA_VERIFY_TIME"] = int(time.time())
        return Response({"ok": "1"})

    def get_permissions(self):
        """Use NeedMFAVerify for GET when SECURITY_VIEW_AUTH_NEED_MFA is set."""
        is_get = self.request.method.lower() == 'get'
        # The setting is only read for GET requests (short-circuit).
        if is_get and settings.SECURITY_VIEW_AUTH_NEED_MFA:
            self.permission_classes = [NeedMFAVerify]
        return super().get_permissions()