1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-05 00:43:53 +00:00

update admin org user api

This commit is contained in:
lian
2017-07-12 18:09:03 +08:00
parent f8235ec7b2
commit f62a206ed8
2 changed files with 218 additions and 105 deletions

View File

@@ -8,18 +8,22 @@ from rest_framework.views import APIView
from rest_framework import status
from constance import config
from seaserv import ccnet_api
import seaserv
from seaserv import ccnet_api, seafile_api
from seahub.utils import clear_token, is_valid_email
from seahub.utils.licenseparse import user_number_over_limit
from seahub.utils.file_size import get_file_size_unit
from seahub.base.accounts import User
from seahub.base.templatetags.seahub_tags import email2nickname
from seahub.base.templatetags.seahub_tags import email2nickname, \
email2contact_email
from seahub.profile.models import Profile
from seahub.options.models import UserOptions
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error
from seahub.api2.permissions import IsProVersion
from seahub.api2.endpoints.utils import is_org_user
try:
from seahub.settings import ORG_MEMBER_QUOTA_ENABLED
@@ -28,17 +32,63 @@ except ImportError:
logger = logging.getLogger(__name__)
def get_org_user_info(org_id, user_obj):
def get_org_user_info(org_id, email):
user_info = {}
email = user_obj.username
user_obj = User.objects.get(email=email)
user_info['org_id'] = org_id
user_info['active'] = user_obj.is_active
user_info['email'] = email
user_info['name'] = email2nickname(email)
user_info['contact_email'] = Profile.objects.get_contact_email_by_user(email)
user_info['contact_email'] = email2contact_email(email)
org_user_quota = seafile_api.get_org_user_quota(org_id, email)
user_info['quota_total'] = org_user_quota / get_file_size_unit('MB')
org_user_quota_usage = seafile_api.get_org_user_quota_usage(org_id, email)
user_info['quota_usage'] = org_user_quota_usage / get_file_size_unit('MB')
return user_info
def check_org_user(func):
"""
Decorator for check if org user valid
"""
def _decorated(view, request, org_id, email):
# argument check
org_id = int(org_id)
if org_id == 0:
error_msg = 'org_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
try:
org = ccnet_api.get_org_by_id(org_id)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
# resource check
if not org:
error_msg = 'Organization %d not found.' % org_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
User.objects.get(email=email)
except User.DoesNotExist:
error_msg = 'User %s not found.' % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not is_org_user(email, org_id):
error_msg = 'User %s is not member of organization %s.' \
% (email, org.org_name)
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
return func(view, request, org_id, email)
return _decorated
class AdminOrgUsers(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
@@ -98,8 +148,7 @@ class AdminOrgUsers(APIView):
# create user
try:
user = User.objects.create_user(email,
password, is_staff=False, is_active=True)
User.objects.create_user(email, password, is_staff=False, is_active=True)
except User.DoesNotExist as e:
logger.error(e)
error_msg = 'Fail to add user %s.' % email
@@ -121,7 +170,7 @@ class AdminOrgUsers(APIView):
if config.FORCE_PASSWORD_CHANGE:
UserOptions.objects.set_force_passwd_change(email)
user_info = get_org_user_info(org_id, user)
user_info = get_org_user_info(org_id, email)
return Response(user_info)
@@ -131,53 +180,48 @@ class AdminOrgUser(APIView):
throttle_classes = (UserRateThrottle,)
permission_classes = (IsAdminUser, IsProVersion)
def put(self, request, org_id, email):
""" update active of a org user
@check_org_user
def get(self, request, org_id, email):
""" get base info of a org user
Permission checking:
1. only admin can perform this action.
"""
# argument check
org_id = int(org_id)
if org_id == 0:
error_msg = 'org_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
user_info = get_org_user_info(org_id, email)
return Response(user_info)
try:
org = ccnet_api.get_org_by_id(org_id)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
@check_org_user
def put(self, request, org_id, email):
""" update base info of a org user
if not org:
error_msg = 'Organization %d not found.' % org_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
error_msg = 'User %s not found.' % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
Permission checking:
1. only admin can perform this action.
"""
# update active
active = request.data.get('active', None)
if not active:
error_msg = 'active invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
if active.lower() not in ('true', 'false'):
if active:
active = active.lower()
if active not in ('true', 'false'):
error_msg = "active invalid, should be 'true' or 'false'."
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
active_user = True if active.lower() == 'true' else False
try:
if active_user:
user = User.objects.get(email=email)
if active == 'true':
user.is_active = True
else:
user.is_active = False
# clear web api and repo sync token
# when inactive an user
try:
clear_token(email)
except Exception as e:
logger.error(e)
try:
# update user status
result_code = user.save()
except Exception as e:
@@ -186,51 +230,73 @@ class AdminOrgUser(APIView):
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
if result_code == -1:
error_msg = 'Fail to add user %s.' % email
error_msg = 'Fail to update user %s.' % email
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# clear web api and repo sync token
# when inactive an user
# update name
name = request.data.get('name', None)
if name:
profile = Profile.objects.get_profile_by_user(email)
if profile is None:
profile = Profile(user=email)
profile.nickname = name
profile.save()
# update contact_email
contact_email = request.data.get('contact_email', None)
if contact_email:
profile = Profile.objects.get_profile_by_user(email)
if profile is None:
profile = Profile(user=email)
profile.contact_email = contact_email
profile.save()
# update user quota
user_quota_mb = request.data.get("quota_total", None)
if user_quota_mb:
try:
if not active_user:
clear_token(email)
user_quota_mb = int(user_quota_mb)
except Exception as e:
logger.error(e)
error_msg = "quota_total invalid."
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
user_info = get_org_user_info(org_id, user)
user_quota = int(user_quota_mb) * get_file_size_unit('MB')
# TODO
# seafile_api.get_org_quota(org_id)
org_quota = seaserv.seafserv_threaded_rpc.get_org_quota(org_id)
# -1 means org has unlimited quota
if org_quota > 0:
org_quota_mb = org_quota / get_file_size_unit('MB')
if user_quota_mb > org_quota_mb:
error_msg = 'Failed to set quota: maximum quota is %d MB' % org_quota_mb
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# TODO
# seafile_api.set_org_user_quota(org_id, email, user_quota)
seaserv.seafserv_threaded_rpc.set_org_user_quota(org_id, email, user_quota)
user_info = get_org_user_info(org_id, email)
return Response(user_info)
@check_org_user
def delete(self, request, org_id, email):
""" Delete an user from org
Permission checking:
1. only admin can perform this action.
"""
# argument check
org_id = int(org_id)
if org_id == 0:
error_msg = 'org_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
org = ccnet_api.get_org_by_id(org_id)
if not org:
error_msg = 'Organization %d not found.' % org_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
error_msg = 'User %s not found.' % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if org.creator == email:
error_msg = 'Failed to delete: %s is an organization creator.' % email
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
try:
ccnet_api.remove_org_user(org_id, email)
user.delete()
User.objects.get(email=email).delete()
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'

View File

@@ -141,6 +141,13 @@ class OrgUserTest(BaseTestCase):
self.org_users_url = reverse('api-v2.1-admin-org-users',
args=[self.org_id])
email = '%s@%s.com' % (randstring(6), randstring(6))
self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
self.org_user = email
def tearDown(self):
self.remove_group()
self.remove_repo()
@@ -154,30 +161,20 @@ class OrgUserTest(BaseTestCase):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
resp = self.client.delete(url)
self.assertEqual(200, resp.status_code)
assert ccnet_api.org_user_exists(self.org_id, email) == 0
assert ccnet_api.org_user_exists(self.org_id, self.org_user) == 0
def test_can_not_delete_if_not_admin(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
self.login_as(self.user)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
resp = self.client.delete(url)
self.assertEqual(403, resp.status_code)
@@ -208,41 +205,91 @@ class OrgUserTest(BaseTestCase):
self.assertEqual(404, resp.status_code)
def test_can_update(self):
def test_update_active(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
tmp_user = self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
assert tmp_user.is_active
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
status = 'false'
data = 'active=%s' % status
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
# test inactive user
data = 'active=false'
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['active'] is False
# test active user
data = 'active=true'
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['active'] is True
self.remove_user(self.org_user)
def test_update_name(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
name = randstring(6)
data = 'name=%s' % name
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['name'] == name
self.remove_user(self.org_user)
def test_update_contact_email(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
contact_email = '%s@%s.com' % (randstring(6), randstring(6))
data = 'contact_email=%s' % contact_email
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['contact_email'] == contact_email
self.remove_user(self.org_user)
def test_update_quota_total(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
quota_total = 1234
data = 'quota_total=%s' % quota_total
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['quota_total'] == int(quota_total)
self.remove_user(self.org_user)
def test_update_with_invalid_args(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
tmp_user = self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
assert tmp_user.is_active
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, self.org_user])
status = 'fals'
data = 'active=%s' % status
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
self.assertEqual(400, resp.status_code)
self.remove_user(self.org_user)