1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-20 19:08:21 +00:00

check if user num exceed limit before add user

This commit is contained in:
lian
2016-11-05 12:09:58 +08:00
parent d00cba5769
commit 34a350d623
11 changed files with 343 additions and 23 deletions

View File

@@ -1,6 +1,5 @@
# Copyright (c) 2012-2016 Seafile Ltd.
import logging
import os
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAdminUser
@@ -16,7 +15,6 @@ from seahub.utils.licenseparse import parse_license
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
import seahub.settings
try:
from seahub.settings import MULTI_TENANCY
except ImportError:
@@ -31,11 +29,6 @@ class SysInfo(APIView):
throttle_classes = (UserRateThrottle,)
permission_classes = (IsAdminUser,)
def _get_license_dict(self):
license_file = os.path.join(seahub.settings.PROJECT_ROOT, '../../seafile-license.txt')
license_dict = parse_license(license_file)
return license_dict
def get(self, request, format=None):
# count repos
try:
@@ -96,14 +89,14 @@ class SysInfo(APIView):
is_pro = is_pro_version()
if is_pro:
license_dict = self._get_license_dict()
license_dict = parse_license()
else:
license_dict = {}
if license_dict:
with_license = True
try:
max_users = int(license_dict.get('MaxUsers', ''))
max_users = int(license_dict.get('MaxUsers', 0))
except ValueError as e:
logger.error(e)
max_users = 0

View File

@@ -22,6 +22,7 @@ from seahub.role_permissions.utils import get_enabled_role_permissions_by_role
from seahub.utils import is_valid_username, is_user_password_strong, \
clear_token, get_system_admins
from seahub.utils.mail import send_html_email_with_dj_template, MAIL_PRIORITY
from seahub.utils.licenseparse import user_number_over_limit
try:
from seahub.settings import CLOUD_MODE
@@ -567,6 +568,9 @@ class RegistrationForm(forms.Form):
return False if prog.match(email) is None else True
def clean_email(self):
if user_number_over_limit():
raise forms.ValidationError(_("The number of users exceeds the limit."))
email = self.cleaned_data['email']
if not self.allow_register(email):
raise forms.ValidationError(_("Enter a valid email address."))

View File

@@ -4,11 +4,12 @@ from django.conf import settings
from django import forms
from django.utils.translation import ugettext_lazy as _
from seaserv import seafserv_threaded_rpc, is_valid_filename
from seaserv import is_valid_filename
from pysearpc import SearpcError
from seahub.base.accounts import User
from seahub.constants import DEFAULT_USER, GUEST_USER
from seahub.utils.licenseparse import user_number_over_limit
class AddUserForm(forms.Form):
"""
@@ -21,6 +22,9 @@ class AddUserForm(forms.Form):
password2 = forms.CharField(widget=forms.PasswordInput())
def clean_email(self):
if user_number_over_limit():
raise forms.ValidationError(_("The number of users exceeds the limit."))
email = self.cleaned_data['email']
try:
user = User.objects.get(email=email)

View File

@@ -1,12 +1,21 @@
# Copyright (c) 2012-2016 Seafile Ltd.
import os
import logging
from django.conf import settings
from seahub.utils import is_pro_version
from seaserv import ccnet_api
logger = logging.getLogger(__name__)
def parse_license(file_path):
def get_license_path():
return os.path.join(settings.PROJECT_ROOT, '../../seafile-license.txt')
def parse_license():
"""Parse license file and return dict.
Arguments:
- `file_path`:
- `license_path`:
Returns:
e.g.
@@ -23,8 +32,9 @@ def parse_license(file_path):
"""
ret = {}
lines = []
license_path = get_license_path()
try:
with open(file_path) as f:
with open(license_path) as f:
lines = f.readlines()
except Exception as e:
logger.warn(e)
@@ -36,3 +46,24 @@ def parse_license(file_path):
ret[k.strip()] = v.strip().strip('"')
return ret
def user_number_over_limit(new_users = 0):
if is_pro_version():
try:
# get license user limit
license_dict = parse_license()
max_users = int(license_dict.get('MaxUsers', 0))
# get active user number
active_db_users = ccnet_api.count_emailusers('DB')
active_ldap_users = ccnet_api.count_emailusers('LDAP')
active_users = active_db_users + active_ldap_users
return active_users + new_users >= max_users
except Exception as e:
logger = logging.getLogger(__name__)
logger.error(e)
return False
else:
return False

View File

@@ -43,7 +43,7 @@ from seahub.utils import IS_EMAIL_CONFIGURED, string2list, is_valid_username, \
get_virus_record, FILE_AUDIT_ENABLED, get_max_upload_file_size
from seahub.utils.file_size import get_file_size_unit
from seahub.utils.rpc import mute_seafile_api
from seahub.utils.licenseparse import parse_license
from seahub.utils.licenseparse import parse_license, user_number_over_limit
from seahub.utils.sysinfo import get_platform_name
from seahub.utils.mail import send_html_email_with_dj_template
from seahub.utils.ms_excel import write_xls
@@ -1782,6 +1782,8 @@ def batch_add_user(request):
if request.method != 'POST':
raise Http404
next = request.META.get('HTTP_REFERER', reverse(sys_user_admin))
form = BatchAddUserForm(request.POST, request.FILES)
if form.is_valid():
content = request.FILES['file'].read()
@@ -1790,8 +1792,16 @@ def batch_add_user(request):
content = content.decode(encoding, 'replace').encode('utf-8')
filestream = StringIO.StringIO(content)
reader = csv.reader(filestream)
reader = csv.reader(filestream)
new_users_count = len(list(reader))
if user_number_over_limit(new_users = new_users_count):
messages.error(request, _(u'The number of users exceeds the limit.'))
return HttpResponseRedirect(next)
# return to the top of the file
filestream.seek(0)
reader = csv.reader(filestream)
for row in reader:
if not row:
continue
@@ -1825,7 +1835,6 @@ def batch_add_user(request):
else:
messages.error(request, _(u'Please select a csv file first.'))
next = request.META.get('HTTP_REFERER', reverse(sys_user_admin))
return HttpResponseRedirect(next)
@login_required
@@ -1937,8 +1946,7 @@ def sys_check_license(request):
content_type = 'application/json; charset=utf-8'
result = {}
license_file = os.path.join(settings.PROJECT_ROOT, '../../seafile-license.txt')
license_dict = parse_license(license_file)
license_dict = parse_license()
if license_dict:
try:
expiration = license_dict['Expiration']

View File

@@ -28,13 +28,13 @@ class SysinfoTest(BaseTestCase):
assert json_resp['license_maxusers'] == 0
@patch('seahub.api2.endpoints.admin.sysinfo.is_pro_version')
@patch('seahub.api2.endpoints.admin.sysinfo.SysInfo._get_license_dict')
def test_get_sysinfo_in_pro_edition(self, mock_get_license_dict, mock_is_pro_version):
@patch('seahub.api2.endpoints.admin.sysinfo.parse_license')
def test_get_sysinfo_in_pro_edition(self, mock_parse_license, mock_is_pro_version):
test_user = 'Test user'
mock_is_pro_version.return_value = True
mock_get_license_dict.return_value = {
mock_parse_license.return_value = {
'Hash': '2981bd12cf0c83c81aaa453ce249ffdd2e492ed2220f3c89c57f06518de36c487c873be960577a0534f3de4ac2bb52d3918016aaa07d60dccbce92673bc23604f4d8ff547f88287c398f74f16e114a8a3b978cce66961fd0facd283da7b050b5fc6205934420e1b4a65daf1c6dcdb2dc78e38a3799eeb5533779595912f1723129037f093f925d8ab94478c8aded304c62d003c07a6e98e706fdf81b6f73c3a806f523bbff1a92f8eb8ea325e09b2b80acfc4b99dd0f5b339d5ed832da00bad3394b9d40a09cce6066b6dc2c9b2ec47338de41867f5c2380c96f7708a5e9cdf244fbdfa1cc174751b90e74e620f53778593b84ec3b15175c3e432c20dcb4cfde',
'Name': test_user,
'Mode': 'life-time',

View File

@@ -0,0 +1,79 @@
from mock import patch
from django.test import TestCase
from django.utils.html import escape
from tests.common.utils import randstring
from seahub.forms import AddUserForm
from seahub.constants import DEFAULT_USER
from tests.common.common import USERNAME
class TestAddUserForm(TestCase):
@patch('seahub.forms.user_number_over_limit')
def test_add_user_form_is_valid(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
'role': DEFAULT_USER,
'password1':'password',
'password2':'password',
}
f = AddUserForm(data = user_info)
self.assertTrue(f.is_valid())
@patch('seahub.forms.user_number_over_limit')
def test_add_user_form_email_invalid_for_exceed_limit(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = True
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
'role': DEFAULT_USER,
'password1':'password',
'password2':'password',
}
f = AddUserForm(data = user_info)
assert 'The number of users exceeds the limit.' in str(f['email'].errors)
@patch('seahub.forms.user_number_over_limit')
def test_add_user_form_email_invalid_for_user_exist(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
# invalid email
'email': USERNAME,
'role': DEFAULT_USER,
'password1':'password',
'password2':'password',
}
f = AddUserForm(data = user_info)
assert 'A user with this email already exists.' in str(f['email'].errors)
@patch('seahub.forms.user_number_over_limit')
def test_add_user_form_password_invalid(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
'role': DEFAULT_USER,
# invalid password
'password1':'password1',
'password2':'password2',
}
f = AddUserForm(data = user_info)
# to escape `'`
assert escape("The two passwords didn't match.") in str(f.errors)

View File

@@ -1,7 +1,12 @@
from mock import patch
from django.core.urlresolvers import reverse
from django.test import TestCase
from django.utils.html import escape
from tests.common.common import BASE_URL, USERNAME, PASSWORD
from tests.common.utils import randstring
from seahub.base.accounts import RegistrationForm
from tests.common.common import USERNAME
LOGIN_URL = reverse('auth_login')
class LoginTest(TestCase):
@@ -23,3 +28,107 @@ class LoginTest(TestCase):
assert resp.context['form'].errors['__all__'] == [
u'Please enter a correct email/username and password. Note that both fields are case-sensitive.'
]
class TestRegistrationForm(TestCase):
@patch('seahub.base.accounts.user_number_over_limit')
def test_registration_form_is_valid(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
'userid': randstring(40),
'password1':'password',
'password2':'password',
}
f = RegistrationForm(data = user_info)
self.assertTrue(f.is_valid())
@patch('seahub.base.accounts.user_number_over_limit')
def test_registration_form_email_invalid(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
# invalid email without `@`
'email':'%s%s.com' % (randstring(10), randstring(10)) ,
'userid': randstring(40),
'password1':'password',
'password2':'password',
}
f = RegistrationForm(data = user_info)
assert 'Enter a valid email address.' in str(f['email'].errors)
@patch('seahub.base.accounts.user_number_over_limit')
def test_registration_form_email_invalid_for_exceed_limit(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = True
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
'userid': randstring(40),
'password1':'password',
'password2':'password',
}
f = RegistrationForm(data = user_info)
assert 'The number of users exceeds the limit.' in str(f['email'].errors)
@patch('seahub.base.accounts.user_number_over_limit')
def test_registration_form_email_invalid_for_user_exist(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
# invalid email
'email': USERNAME,
'userid': randstring(40),
'password1':'password',
'password2':'password',
}
f = RegistrationForm(data = user_info)
assert 'User %s already exists.' % USERNAME in str(f['email'].errors)
@patch('seahub.base.accounts.user_number_over_limit')
def test_registration_form_userid_invalid(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
# invalid userid length < 40
'userid': randstring(10),
'password1':'password',
'password2':'password',
}
f = RegistrationForm(data = user_info)
assert 'Invalid user id.' in str(f['userid'].errors)
@patch('seahub.base.accounts.user_number_over_limit')
def test_registration_form_password_invalid(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = False
user_info = {
'email':'%s@%s.com' % (randstring(10), randstring(10)) ,
'userid': randstring(40),
# invalid password
'password1':'password1',
'password2':'password2',
}
f = RegistrationForm(data = user_info)
# to escape `'`
assert escape("The two password fields didn't match.") in str(f['password2'].errors)

View File

@@ -0,0 +1,11 @@
#Seafile server licence
Name = "Test"
Licencetype = "User"
LicenceKEY = "1474598078"
ProductID = "Seafile server"
Expiration = "2017-7-20"
MaxUsers = "10"
Mode = "subscription"
Hash = "hash value"

View File

@@ -0,0 +1,61 @@
import os
from mock import patch
from seahub.utils.licenseparse import user_number_over_limit, \
parse_license
@patch('seahub.utils.licenseparse.get_license_path')
def test_parse_license(mock_get_license_path):
license_file = os.path.join(os.getcwd(), 'tests/seahub/utils/seafile-license.txt')
mock_get_license_path.return_value = license_file
license_dict = parse_license()
assert license_dict['Hash'] == 'hash value'
@patch('seahub.utils.licenseparse.parse_license')
@patch('seahub.utils.licenseparse.is_pro_version')
def test_not_user_number_over_limit(mock_is_pro_version, mock_parse_license):
# max user is 1000
license_dict = {'Expiration': '2017-7-20',
'Hash': 'hash value',
'LicenceKEY': '1474598078',
'Licencetype': 'User',
'MaxUsers': '1000',
'Mode': 'subscription',
'Name': 'Test',
'ProductID': 'Seafile server'}
mock_is_pro_version.return_value = True
mock_parse_license.return_value = license_dict
assert not user_number_over_limit()
@patch('seahub.utils.licenseparse.parse_license')
@patch('seahub.utils.licenseparse.is_pro_version')
def test_user_number_over_limit(mock_is_pro_version, mock_parse_license):
# max user is 1
license_dict = {'Expiration': '2017-7-20',
'Hash': 'hash value',
'LicenceKEY': '1474598078',
'Licencetype': 'User',
'MaxUsers': '1',
'Mode': 'subscription',
'Name': 'Test',
'ProductID': 'Seafile server'}
mock_is_pro_version.return_value = True
mock_parse_license.return_value = license_dict
assert user_number_over_limit()
@patch('seahub.utils.licenseparse.is_pro_version')
def test_user_number_over_limit_if_not_pro(mock_is_pro_version):
mock_is_pro_version.return_value = False
assert not user_number_over_limit()

View File

@@ -191,6 +191,26 @@ class BatchAddUserTest(BaseTestCase):
for e in self.new_users:
assert User.objects.get(e) is not None
@patch('seahub.views.sysadmin.user_number_over_limit')
def test_can_not_batch_add_if_user_over_limit(self, mock_user_number_over_limit):
mock_user_number_over_limit.return_value = True
for e in self.new_users:
try:
r = User.objects.get(e)
except User.DoesNotExist:
r = None
assert r is None
with open(self.csv_file) as f:
resp = self.client.post(reverse('batch_add_user'), {
'file': f
})
self.assertEqual(302, resp.status_code)
assert 'users exceeds the limit' in parse_cookie(resp.cookies)['messages']
def test_can_send_email(self):
self.assertEqual(0, len(Email.objects.all()))