diff --git a/seahub/api2/endpoints/admin/devices.py b/seahub/api2/endpoints/admin/devices.py index ba3e285b11..a053460082 100644 --- a/seahub/api2/endpoints/admin/devices.py +++ b/seahub/api2/endpoints/admin/devices.py @@ -11,7 +11,6 @@ from pysearpc import SearpcError from seahub.utils.devices import do_unlink_device from seahub.utils.timeutils import datetime_to_isoformat_timestr -from seahub.views import is_registered_user from seahub.api2.authentication import TokenAuthentication from seahub.api2.throttling import UserRateThrottle diff --git a/seahub/api2/endpoints/admin/users.py b/seahub/api2/endpoints/admin/users.py new file mode 100644 index 0000000000..d3d5c403db --- /dev/null +++ b/seahub/api2/endpoints/admin/users.py @@ -0,0 +1,364 @@ +# Copyright (c) 2012-2016 Seafile Ltd. +import logging + +from rest_framework import status +from rest_framework.authentication import SessionAuthentication +from rest_framework.permissions import IsAdminUser +from rest_framework.response import Response +from rest_framework.views import APIView +from django.core.urlresolvers import reverse + +from seaserv import seafile_api, ccnet_api + +from seahub.api2.authentication import TokenAuthentication +from seahub.api2.throttling import UserRateThrottle +from seahub.api2.utils import api_error, to_python_boolean +from seahub.api2.endpoints.utils import generate_links_header_for_paginator + +from seahub.base.accounts import User +from seahub.base.templatetags.seahub_tags import email2nickname, \ + email2contact_email +from seahub.profile.models import Profile, DetailedProfile +from seahub.utils import is_valid_username, is_org_context, \ + is_pro_version +from seahub.utils.timeutils import timestamp_to_isoformat_timestr +from seahub.utils.file_size import get_file_size_unit +from seahub.role_permissions.utils import get_available_roles + + +logger = logging.getLogger(__name__) +json_content_type = 'application/json; charset=utf-8' + +def update_user_info(request): + + email = request.data.get("email") + user = User.objects.get(email=email) + + # update basic user info + password = request.data.get("password") + if password: + user.set_password(password) + + is_staff = request.data.get("is_staff") + if is_staff: + is_staff = to_python_boolean(is_staff) + user.is_staff = is_staff + + is_active = request.data.get("is_active") + if is_active: + is_active = to_python_boolean(is_active) + user.is_active = is_active + + # update user + user.save() + + # update additional user info + if is_pro_version(): + role = request.data.get("role") + if role: + User.objects.update_role(email, role) + + name = request.data.get("name") + if name: + profile = Profile.objects.get_profile_by_user(email) + if profile is None: + profile = Profile(user=email) + profile.nickname = name + profile.save() + + department = request.data.get("department") + if department: + d_profile = DetailedProfile.objects.get_detailed_profile_by_user(email) + if d_profile is None: + d_profile = DetailedProfile(user=email) + + d_profile.department = department + d_profile.save() + + quota_total_mb = request.data.get("quota_total") + if quota_total_mb: + quota_total = int(quota_total_mb) * get_file_size_unit('MB') + if is_org_context(request): + org_id = request.user.org.org_id + seafile_api.set_org_user_quota(org_id, email, quota_total) + else: + seafile_api.set_user_quota(email, quota_total) + +def get_user_info(email): + + user = User.objects.get(email=email) + d_profile = DetailedProfile.objects.get_detailed_profile_by_user(email) + + info = {} + info['email'] = email + info['name'] = email2nickname(email) + info['contact_email'] = email2contact_email(email) + + info['is_staff'] = user.is_staff + info['is_active'] = user.is_active + info['create_time'] = user.ctime + + info['department'] = d_profile.department if d_profile else '' + + info['quota_total'] = seafile_api.get_user_quota(email) + info['quota_usage'] = seafile_api.get_user_self_usage(email) + + info['create_time'] = timestamp_to_isoformat_timestr(user.ctime) + + if is_pro_version(): + info['role'] = user.role + + return info + + +class AdminUsers(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser, ) + throttle_classes = (UserRateThrottle, ) + + def get(self, request): + + try: + page = int(request.GET.get('page', '1')) + per_page = int(request.GET.get('per_page', '25')) + except ValueError: + page = 1 + per_page = 25 + + start = (page - 1) * per_page + end = page * per_page + 1 + users = ccnet_api.get_emailusers('DB', start, end) + total_count = ccnet_api.count_emailusers('DB') + \ + ccnet_api.count_inactive_emailusers('DB') + + data = [] + for user in users: + user_info = get_user_info(user.email) + data.append(user_info) + + result = {'data': data, 'total_count': total_count} + resp = Response(result) + + ## generate `Links` header for paginator + base_url = reverse('api-v2.1-admin-users') + links_header = generate_links_header_for_paginator(base_url, + page, per_page, total_count) + resp['Links'] = links_header + + return resp + + def post(self, request): + + email = request.data.get('email', None) + if not email or not is_valid_username(email): + error_msg = 'email invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # basic user info check + is_staff = request.data.get("is_staff", None) + if is_staff: + try: + is_staff = to_python_boolean(is_staff) + except ValueError: + error_msg = 'is_staff invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + is_active = request.data.get("is_active", None) + if is_active: + try: + is_active = to_python_boolean(is_active) + except ValueError: + error_msg = 'is_active invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # additional user info check + role = request.data.get("role", None) + if role: + available_roles = get_available_roles() + if role.lower() not in available_roles: + error_msg = 'role must be in %s.' % str(available_roles) + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + name = request.data.get("name", None) + if name: + if len(name) > 64: + error_msg = 'Name is too long (maximum is 64 characters).' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if "/" in name: + error_msg = "Name should not include '/'." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + department = request.data.get("department", None) + if department: + if len(department) > 512: + error_msg = "Department is too long (maximum is 512 characters)." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + quota_total_mb = request.data.get("quota_total", None) + if quota_total_mb: + try: + quota_total_mb = int(quota_total_mb) + except ValueError: + error_msg = "Must be an integer that is greater than or equal to 0." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if quota_total_mb < 0: + error_msg = "Space quota is too low (minimum value is 0)." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if is_org_context(request): + org_id = request.user.org.org_id + org_quota_mb = seafile_api.get_org_quota(org_id) / \ + get_file_size_unit('MB') + + if quota_total_mb > org_quota_mb: + error_msg = 'Failed to set quota: maximum quota is %d MB' % org_quota_mb + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + try: + User.objects.get(email=email) + user_exist = True + except User.DoesNotExist: + user_exist = False + + if user_exist: + error_msg = "User %s already exists." % email + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + password = request.data.get('password', None) + if not password: + error_msg = 'password required.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # create user + try: + User.objects.create_user(email) + update_user_info(request) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + user_info = get_user_info(email) + + return Response(user_info) + + +class AdminUser(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser, ) + throttle_classes = (UserRateThrottle, ) + + def get(self, request, email): + + try: + User.objects.get(email=email) + except User.DoesNotExist: + error_msg = 'User %s not found.' % email + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + user_info = get_user_info(email) + return Response(user_info) + + def put(self, request, email): + + # basic user info check + is_staff = request.data.get("is_staff", None) + if is_staff: + try: + is_staff = to_python_boolean(is_staff) + except ValueError: + error_msg = 'is_staff invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + is_active = request.data.get("is_active", None) + if is_active: + try: + is_active = to_python_boolean(is_active) + except ValueError: + error_msg = 'is_active invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # additional user info check + role = request.data.get("role", None) + if role: + available_roles = get_available_roles() + if role.lower() not in available_roles: + error_msg = 'role must be in %s.' % str(available_roles) + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + name = request.data.get("name", None) + if name: + if len(name) > 64: + error_msg = 'Name is too long (maximum is 64 characters).' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if "/" in name: + error_msg = "Name should not include '/'." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + department = request.data.get("department", None) + if department: + if len(department) > 512: + error_msg = "Department is too long (maximum is 512 characters)." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + quota_total_mb = request.data.get("quota_total", None) + if quota_total_mb: + try: + quota_total_mb = int(quota_total_mb) + except ValueError: + error_msg = "Must be an integer that is greater than or equal to 0." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if quota_total_mb < 0: + error_msg = "Space quota is too low (minimum value is 0)." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if is_org_context(request): + org_id = request.user.org.org_id + org_quota_mb = seafile_api.get_org_quota(org_id) / \ + get_file_size_unit('MB') + + if quota_total_mb > org_quota_mb: + error_msg = 'Failed to set quota: maximum quota is %d MB' % org_quota_mb + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # query user info + try: + User.objects.get(email=email) + except User.DoesNotExist: + error_msg = 'User %s not found.' % email + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + try: + update_user_info(request) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + user_info = get_user_info(email) + + return Response(user_info) + + def delete(self, request, email): + + try: + User.objects.get(email=email) + except User.DoesNotExist: + error_msg = 'User %s not found.' % email + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # delete user + try: + User.objects.get(email=email).delete() + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + return Response({'success': True}) diff --git a/seahub/urls.py b/seahub/urls.py index c67eddf880..e21e8d418e 100644 --- a/seahub/urls.py +++ b/seahub/urls.py @@ -55,6 +55,7 @@ from seahub.api2.endpoints.admin.perm_audit import PermAudit from seahub.api2.endpoints.admin.sysinfo import SysInfo from seahub.api2.endpoints.admin.devices import AdminDevices from seahub.api2.endpoints.admin.device_errors import AdminDeviceErrors +from seahub.api2.endpoints.admin.users import AdminUsers, AdminUser from seahub.api2.endpoints.admin.libraries import AdminLibraries, AdminLibrary from seahub.api2.endpoints.admin.library_dirents import AdminLibraryDirents, AdminLibraryDirent from seahub.api2.endpoints.admin.system_library import AdminSystemLibrary, \ @@ -253,6 +254,10 @@ urlpatterns = patterns( ## admin::sysinfo url(r'^api/v2.1/admin/sysinfo/$', SysInfo.as_view(), name='api-v2.1-sysinfo'), + ## admin::users + url(r'^api/v2.1/admin/users/$', AdminUsers.as_view(), name='api-v2.1-admin-users'), + url(r'^api/v2.1/admin/users/(?P[^/]+)/$', AdminUser.as_view(), name='api-v2.1-admin-user'), + ## admin::devices url(r'^api/v2.1/admin/devices/$', AdminDevices.as_view(), name='api-v2.1-admin-devices'), url(r'^api/v2.1/admin/device-errors/$', AdminDeviceErrors.as_view(), name='api-v2.1-admin-device-errors'), diff --git a/tests/api/endpoints/admin/test_users.py b/tests/api/endpoints/admin/test_users.py new file mode 100644 index 0000000000..726ce812b7 --- /dev/null +++ b/tests/api/endpoints/admin/test_users.py @@ -0,0 +1,290 @@ +# -*- coding: utf-8 -*- +import json + +from seaserv import ccnet_api, seafile_api +from tests.common.utils import randstring +from django.core.urlresolvers import reverse +from seahub.constants import DEFAULT_USER, GUEST_USER +from seahub.test_utils import BaseTestCase +from seahub.base.templatetags.seahub_tags import email2nickname +from seahub.profile.models import DetailedProfile +from seahub.utils.file_size import get_file_size_unit + +try: + from seahub.settings import LOCAL_PRO_DEV_ENV +except ImportError: + LOCAL_PRO_DEV_ENV = False + +class AdminUsersTest(BaseTestCase): + + def setUp(self): + self.url = reverse('api-v2.1-admin-users') + self.tmp_email = '%s@email.com' % randstring(10) + + def tearDown(self): + self.remove_user(self.tmp_email) + + def test_get_users(self): + self.login_as(self.admin) + + resp = self.client.get(self.url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert json_resp['total_count'] > 0 + assert len(json_resp['data']) == json_resp['total_count'] + + assert json_resp['data'][0].has_key('email') + assert json_resp['data'][0].has_key('name') + assert json_resp['data'][0].has_key('contact_email') + assert json_resp['data'][0].has_key('is_staff') + assert json_resp['data'][0].has_key('is_active') + assert json_resp['data'][0].has_key('create_time') + assert json_resp['data'][0].has_key('department') + assert json_resp['data'][0].has_key('quota_total') + assert json_resp['data'][0].has_key('quota_usage') + assert json_resp['data'][0].has_key('create_time') + + def test_get_with_invalid_user_permission(self): + self.login_as(self.user) + + resp = self.client.get(self.url) + self.assertEqual(403, resp.status_code) + + def test_create_user(self): + self.login_as(self.admin) + + data = { + "email": self.tmp_email, + "password": 'password', + } + + resp = self.client.post(self.url, json.dumps(data), + 'application/json') + + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['email'] == self.tmp_email + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.email == self.tmp_email + + self.remove_user(self.tmp_email) + + def test_create_with_invalid_user_permission(self): + self.login_as(self.user) + + self.tmp_email = '%s@email.com' % randstring(10) + + # copy folders + data = { + "email": self.tmp_email, + "password": 'password', + } + + resp = self.client.post(self.url, json.dumps(data), + 'application/json') + + self.assertEqual(403, resp.status_code) + +class AdminUserTest(BaseTestCase): + + def setUp(self): + self.user_name = self.user.username + self.tmp_email = '%s@email.com' % randstring(10) + self.password = randstring(10) + self.url = reverse('api-v2.1-admin-user', args=[self.tmp_email]) + + ccnet_api.add_emailuser(self.tmp_email, self.password, 0, 0) + + def tearDown(self): + self.remove_user(self.tmp_email) + + def get_user_info(self): + self.login_as(self.admin) + resp = self.client.get(self.url) + json_resp = json.loads(resp.content) + return json_resp + + def test_get_user(self): + self.login_as(self.admin) + + resp = self.client.get(self.url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert json_resp['email'] == self.tmp_email + + assert json_resp.has_key('email') + assert json_resp.has_key('name') + assert json_resp.has_key('contact_email') + assert json_resp.has_key('is_staff') + assert json_resp.has_key('is_active') + assert json_resp.has_key('create_time') + assert json_resp.has_key('department') + assert json_resp.has_key('quota_total') + assert json_resp.has_key('quota_usage') + + def test_get_with_invalid_user_permission(self): + self.login_as(self.user) + + resp = self.client.get(self.url) + self.assertEqual(403, resp.status_code) + + def test_update_is_active(self): + + self.login_as(self.admin) + + # acitve user + data = {"email": self.tmp_email, "is_active": 'true'} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['is_active'] == True + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.is_active == True + + # inacitve user + data = {"email": self.tmp_email, "is_active": 'False'} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['is_active'] == False + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.is_active == False + + def test_update_is_staff(self): + + self.login_as(self.admin) + + # make user staff + data = {"email": self.tmp_email, "is_staff": 'true'} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['is_staff'] == True + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.is_staff == True + + # make user not staff + data = {"email": self.tmp_email, "is_staff": 'False'} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['is_staff'] == False + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.is_staff == False + + def test_update_role(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.admin) + + # change user to guest user + data = {"email": self.tmp_email, "role": GUEST_USER} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['role'] == GUEST_USER + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.role == GUEST_USER + + # change user to default user + data = {"email": self.tmp_email, "role": DEFAULT_USER} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['role'] == DEFAULT_USER + + ccnet_email = ccnet_api.get_emailuser(self.tmp_email) + assert ccnet_email.role == DEFAULT_USER + + def test_update_password(self): + + self.login_as(self.admin) + + # change user password + password = randstring(10) + data = {"email": self.tmp_email, "password": password} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + self.assertEqual(200, resp.status_code) + + assert ccnet_api.validate_emailuser(self.tmp_email, password) == 0 + + def test_update_name(self): + + self.login_as(self.admin) + + # change user name + tmp_name = randstring(10) + data = {"email": self.tmp_email, "name": tmp_name} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['name'] == tmp_name + + assert email2nickname(self.tmp_email) == tmp_name + + def test_update_department(self): + + self.login_as(self.admin) + + # change user name + department = randstring(10) + data = {"email": self.tmp_email, "department": department} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + assert json_resp['department'] == department + + d_profile = DetailedProfile.objects.get_detailed_profile_by_user(self.tmp_email) + assert d_profile.department == department + + def test_update_quota_total(self): + + self.login_as(self.admin) + + # change user name + quota_total = 1232 + data = {"email": self.tmp_email, "quota_total": quota_total} + resp = self.client.put(self.url, json.dumps(data), + 'application/json') + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + + quota_total_mb = quota_total * get_file_size_unit('mb') + assert json_resp['quota_total'] == quota_total_mb + assert seafile_api.get_user_quota(self.tmp_email) == quota_total_mb + + def test_delete_user(self): + self.login_as(self.admin) + + resp = self.client.delete(self.url) + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['success'] == True + + assert ccnet_api.get_emailuser(self.tmp_email) is None + + def test_delete_with_invalid_user_permission(self): + self.login_as(self.user) + resp = self.client.delete(self.url) + self.assertEqual(403, resp.status_code)