mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-08 18:30:53 +00:00
[sys] Add force 2FA option
This commit is contained in:
@@ -9,6 +9,7 @@ from seahub.api2.base import APIView
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.utils import json_response, api_error
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.options.models import UserOptions
|
||||
from seahub.two_factor.models import devices_for_user
|
||||
|
||||
|
||||
@@ -17,6 +18,23 @@ class TwoFactorAuthView(APIView):
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
permission_classes = (IsAdminUser,)
|
||||
|
||||
def put(self, request, email):
|
||||
"""Set/unset force 2FA for the user `email`.
|
||||
"""
|
||||
try:
|
||||
user = User.objects.get(email=email)
|
||||
except User.DoesNotExist:
|
||||
error_msg = "User %s not found" % email
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
force_2fa = request.data.get('force_2fa', None)
|
||||
if force_2fa == '1':
|
||||
UserOptions.objects.set_force_2fa(email)
|
||||
if force_2fa == '0':
|
||||
UserOptions.objects.unset_force_2fa(email)
|
||||
|
||||
return Response({'success': True}, status=status.HTTP_200_OK)
|
||||
|
||||
def delete(self, request, email):
|
||||
if not email:
|
||||
error_msg = "email can not be empty"
|
||||
|
@@ -21,6 +21,9 @@ VAL_SUB_LIB_DISABLED = "0"
|
||||
KEY_FORCE_PASSWD_CHANGE = "force_passwd_change"
|
||||
VAL_FORCE_PASSWD_CHANGE = "1"
|
||||
|
||||
KEY_FORCE_2FA = "force_2fa"
|
||||
VAL_FORCE_2FA = "1"
|
||||
|
||||
KEY_USER_LOGGED_IN = "user_logged_in"
|
||||
VAL_USER_LOGGED_IN = "1"
|
||||
|
||||
@@ -203,6 +206,17 @@ class UserOptionsManager(models.Manager):
|
||||
def unset_force_passwd_change(self, username):
|
||||
return self.unset_user_option(username, KEY_FORCE_PASSWD_CHANGE)
|
||||
|
||||
def set_force_2fa(self, username):
|
||||
return self.set_user_option(username, KEY_FORCE_2FA, VAL_FORCE_2FA)
|
||||
|
||||
def unset_force_2fa(self, username):
|
||||
return self.unset_user_option(username, KEY_FORCE_2FA)
|
||||
|
||||
def is_force_2fa(self, username):
|
||||
r = super(UserOptionsManager, self).filter(email=username,
|
||||
option_key=KEY_FORCE_2FA)
|
||||
return True if len(r) > 0 else False
|
||||
|
||||
def set_user_logged_in(self, username):
|
||||
return self.set_user_option(username, KEY_USER_LOGGED_IN,
|
||||
VAL_USER_LOGGED_IN)
|
||||
|
@@ -123,6 +123,7 @@ MIDDLEWARE_CLASSES = (
|
||||
'seahub.base.middleware.UserPermissionMiddleware',
|
||||
'termsandconditions.middleware.TermsAndConditionsRedirectMiddleware',
|
||||
'seahub.two_factor.middleware.OTPMiddleware',
|
||||
'seahub.two_factor.middleware.ForceTwoFactorAuthMiddleware',
|
||||
'seahub.trusted_ip.middleware.LimitIpMiddleware',
|
||||
)
|
||||
|
||||
|
@@ -1,9 +1,15 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||
import re
|
||||
|
||||
from constance import config
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import HttpResponseRedirect
|
||||
|
||||
from . import DEVICE_ID_SESSION_KEY
|
||||
from .models import Device
|
||||
from seahub.options.models import UserOptions
|
||||
from seahub.settings import SITE_ROOT
|
||||
|
||||
|
||||
class IsVerified(object):
|
||||
@@ -53,3 +59,36 @@ class OTPMiddleware(object):
|
||||
user.otp_device = device
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class ForceTwoFactorAuthMiddleware(object):
|
||||
def filter_request(self, request):
|
||||
path = request.path
|
||||
black_list = (r'^%s$' % SITE_ROOT, r'sys/.+', r'repo/.+', r'lib/', )
|
||||
|
||||
for patt in black_list:
|
||||
if re.search(patt, path) is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
def process_request(self, request):
|
||||
if not config.ENABLE_TWO_FACTOR_AUTH:
|
||||
return None
|
||||
|
||||
user = getattr(request, 'user', None)
|
||||
if user is None:
|
||||
return None
|
||||
|
||||
if user.is_anonymous():
|
||||
return None
|
||||
|
||||
if not self.filter_request(request):
|
||||
return None
|
||||
|
||||
if not UserOptions.objects.is_force_2fa(user.username):
|
||||
return None
|
||||
|
||||
if user.otp_device is not None:
|
||||
return None
|
||||
|
||||
return HttpResponseRedirect(reverse('two_factor:setup'))
|
||||
|
@@ -2,6 +2,7 @@ import os
|
||||
import pytest
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
from seahub.options.models import (UserOptions, KEY_FORCE_2FA, VAL_FORCE_2FA)
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.two_factor.models import TOTPDevice, devices_for_user
|
||||
|
||||
@@ -37,3 +38,27 @@ class TwoFactorAuthViewTest(BaseTestCase):
|
||||
device.delete()
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_force_2fa(self):
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 0
|
||||
|
||||
resp = self.client.put(
|
||||
reverse('two-factor-auth-view', args=[self.user.username]),
|
||||
'force_2fa=1',
|
||||
'application/x-www-form-urlencoded',
|
||||
)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 1
|
||||
|
||||
resp = self.client.put(
|
||||
reverse('two-factor-auth-view', args=[self.user.username]),
|
||||
'force_2fa=0',
|
||||
'application/x-www-form-urlencoded',
|
||||
)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 0
|
||||
|
@@ -1,7 +1,8 @@
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.options.models import (UserOptions, KEY_USER_GUIDE,
|
||||
VAL_USER_GUIDE_ON, VAL_USER_GUIDE_OFF,
|
||||
KEY_DEFAULT_REPO)
|
||||
KEY_DEFAULT_REPO,
|
||||
KEY_FORCE_2FA, VAL_FORCE_2FA)
|
||||
|
||||
class UserOptionsManagerTest(BaseTestCase):
|
||||
def test_is_user_guide_enabled(self):
|
||||
@@ -51,3 +52,20 @@ class UserOptionsManagerTest(BaseTestCase):
|
||||
assert len(UserOptions.objects.filter(email=self.user.email, option_key=KEY_DEFAULT_REPO)) == 2
|
||||
assert UserOptions.objects.get_default_repo(self.user.email) is not None
|
||||
assert len(UserOptions.objects.filter(email=self.user.email, option_key=KEY_DEFAULT_REPO)) == 1
|
||||
|
||||
def test_force_2fa(self):
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 0
|
||||
assert UserOptions.objects.is_force_2fa(self.user.email) is False
|
||||
|
||||
UserOptions.objects.set_force_2fa(self.user.email)
|
||||
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 1
|
||||
assert UserOptions.objects.is_force_2fa(self.user.email) is True
|
||||
|
||||
UserOptions.objects.unset_force_2fa(self.user.email)
|
||||
|
||||
assert len(UserOptions.objects.filter(email=self.user.email,
|
||||
option_key=KEY_FORCE_2FA)) == 0
|
||||
assert UserOptions.objects.is_force_2fa(self.user.email) is False
|
||||
|
Reference in New Issue
Block a user