diff --git a/seahub/api2/urls.py b/seahub/api2/urls.py index b920234f05..a8e84a88fb 100644 --- a/seahub/api2/urls.py +++ b/seahub/api2/urls.py @@ -28,6 +28,8 @@ urlpatterns = patterns('', url(r'^repos/public/$', PubRepos.as_view(), name="api2-pub-repos"), url(r'^repos/(?P[-0-9a-f]{36})/$', Repo.as_view(), name="api2-repo"), url(r'^repos/(?P[-0-9a-f]{36})/history/$', RepoHistory.as_view()), + url(r'^repos/(?P[-0-9a-f]{36})/user-folder-perm/$', RepoUserFolderPerm.as_view(), name="api2-repo-user-folder-perm"), + url(r'^repos/(?P[-0-9a-f]{36})/group-folder-perm/$', RepoGroupFolderPerm.as_view(), name="api2-repo-group-folder-perm"), url(r'^repos/(?P[-0-9a-f]{36})/history-limit/$', RepoHistoryLimit.as_view(), name="api2-repo-history-limit"), url(r'^repos/(?P[-0-9a-f]{36})/download-info/$', DownloadRepo.as_view()), url(r'^repos/(?P[-0-9a-f]{36})/owner/$', RepoOwner.as_view(), name="api2-repo-owner"), diff --git a/seahub/api2/utils.py b/seahub/api2/utils.py index b0a0b87e64..bfe8d107f6 100644 --- a/seahub/api2/utils.py +++ b/seahub/api2/utils.py @@ -14,6 +14,7 @@ from django.core.paginator import EmptyPage, InvalidPage from django.http import HttpResponse from rest_framework.response import Response from rest_framework import status, serializers +import seaserv from seaserv import seafile_api, get_commits, server_repo_size, \ get_personal_groups_by_user, is_group_user, get_group, seafserv_threaded_rpc from pysearpc import SearpcError @@ -28,7 +29,7 @@ from seahub.group.views import is_group_staff from seahub.message.models import UserMessage, UserMsgAttachment from seahub.notifications.models import UserNotification from seahub.utils import api_convert_desc_link, get_file_type_and_ext, \ - gen_file_get_url + gen_file_get_url, is_org_context from seahub.utils.paginator import Paginator from seahub.utils.file_types import IMAGE from seahub.api2.models import Token, TokenV2, DESKTOP_PLATFORMS @@ -558,3 +559,123 @@ def to_python_boolean(string): def is_seafile_pro(): return any(['seahub_extra' in app for app in settings.INSTALLED_APPS]) + +def api_repo_setting_permission_check(func): + """Decorator for initial repo setting permission check + """ + def _decorated(view, request, repo_id, *args, **kwargs): + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # check permission + if is_org_context(request): + repo_owner = seafile_api.get_org_repo_owner(repo_id) + else: + repo_owner = seafile_api.get_repo_owner(repo_id) + + username = request.user.username + if repo.is_virtual or username != repo_owner: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + return func(view, request, repo_id, *args, **kwargs) + + return _decorated + +def api_repo_user_folder_perm_check(func): + """Check repo setting permission and args used by user-folder-perm + """ + def _decorated(view, request, repo_id, *args, **kwargs): + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # check permission + if is_org_context(request): + repo_owner = seafile_api.get_org_repo_owner(repo_id) + else: + repo_owner = seafile_api.get_repo_owner(repo_id) + + username = request.user.username + if repo.is_virtual or username != repo_owner: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # check arguments + user = request.data.get('user', None) + path = request.data.get('path', None) + perm = request.data.get('perm', None) + + try: + User.objects.get(email=user) + except User.DoesNotExist: + error_msg = 'User %s not found.' % user + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if path: + path = path.rstrip('/') if path != '/' else path + + if seafile_api.get_dir_id_by_path(repo_id, path) is None: + error_msg = 'Folder %s not found.' % path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if request.method in ('POST', 'PUT') and perm not in ('r', 'rw'): + error_msg = 'perm invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + return func(view, request, repo_id, *args, **kwargs) + + return _decorated + +def api_repo_group_folder_perm_check(func): + """Check repo setting permission and args used by group-folder-perm + """ + def _decorated(view, request, repo_id, *args, **kwargs): + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # check permission + if is_org_context(request): + repo_owner = seafile_api.get_org_repo_owner(repo_id) + else: + repo_owner = seafile_api.get_repo_owner(repo_id) + + username = request.user.username + if repo.is_virtual or username != repo_owner: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # check arguments + group_id = request.data.get('group_id', None) + path = request.data.get('path', None) + perm = request.data.get('perm', None) + + try: + group_id = int(group_id) + except ValueError: + error_msg = 'group_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if not seaserv.get_group(group_id): + error_msg = 'Group %s not found.' % group_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if path: + path = path.rstrip('/') if path != '/' else path + + if seafile_api.get_dir_id_by_path(repo_id, path) is None: + error_msg = 'Folder %s not found.' % path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if request.method in ('POST', 'PUT') and perm not in ('r', 'rw'): + error_msg = 'perm invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + return func(view, request, repo_id, *args, **kwargs) + + return _decorated diff --git a/seahub/api2/views.py b/seahub/api2/views.py index 7302ad2b37..8c7cda8e77 100644 --- a/seahub/api2/views.py +++ b/seahub/api2/views.py @@ -39,7 +39,9 @@ from .utils import is_repo_writable, is_repo_accessible, \ get_groups, get_group_and_contacts, prepare_events, \ get_person_msgs, api_group_check, get_email, get_timestamp, \ get_group_message_json, get_group_msgs, get_group_msgs_json, get_diff_details, \ - json_response, to_python_boolean, is_seafile_pro + json_response, to_python_boolean, is_seafile_pro, \ + api_repo_user_folder_perm_check, api_repo_setting_permission_check, \ + api_repo_group_folder_perm_check from seahub.avatar.settings import AVATAR_DEFAULT_SIZE from seahub.avatar.templatetags.avatar_tags import api_avatar_url, avatar @@ -94,7 +96,7 @@ if HAS_OFFICE_CONVERTER: import seahub.settings as settings from seahub.settings import THUMBNAIL_EXTENSION, THUMBNAIL_ROOT, \ ENABLE_GLOBAL_ADDRESSBOOK, FILE_LOCK_EXPIRATION_DAYS, \ - ENABLE_THUMBNAIL, ENABLE_SUB_LIBRARY + ENABLE_THUMBNAIL, ENABLE_SUB_LIBRARY, ENABLE_FOLDER_PERM try: from seahub.settings import CLOUD_MODE except ImportError: @@ -4374,3 +4376,252 @@ class RepoUploadSharedLink(APIView): link.delete() result = {'success': True} return Response(result) + +def get_repo_user_folder_perm_result(repo_id, path, user): + result = {} + permission = seafile_api.get_folder_user_perm(repo_id, path, user) + if permission: + result['repo_id'] = repo_id + result['user_email'] = user + result['user_name'] = email2nickname(user) + result['folder_path'] = path + result['folder_name'] = path if path == '/' else os.path.basename(path.rstrip('/')) + result['permission'] = permission + + return result + +class RepoUserFolderPerm(APIView): + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAuthenticated,) + throttle_classes = (UserRateThrottle,) + + @api_repo_setting_permission_check + def get(self, request, repo_id, format=None): + + if not is_pro_version(): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + results = [] + folder_perms = seafile_api.list_folder_user_perm_by_repo(repo_id) + for perm in folder_perms: + result = {} + result['repo_id'] = perm.repo_id + result['user_email'] = perm.user + result['user_name'] = email2nickname(perm.user) + result['folder_path'] = perm.path + result['folder_name'] = os.path.basename(perm.path.rstrip('/')) + result['permission'] = perm.permission + + results.append(result) + + return Response(results) + + @api_repo_user_folder_perm_check + def post(self, request, repo_id, format=None): + + if not (is_pro_version() and ENABLE_FOLDER_PERM): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + user = request.data.get('user') + path = request.data.get('path') + perm = request.data.get('perm') + path = path.rstrip('/') if path != '/' else path + + permission = seafile_api.get_folder_user_perm(repo_id, path, user) + if permission: + error_msg = 'Permission already exists.' + return api_error(status.HTTP_409_CONFLICT, error_msg) + + username = request.user.username + try: + seafile_api.add_folder_user_perm(repo_id, path, perm, user) + send_perm_audit_msg('add-repo-perm', username, user, repo_id, path, perm) + result = get_repo_user_folder_perm_result(repo_id, path, user) + return Response(result) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + @api_repo_user_folder_perm_check + def put(self, request, repo_id, format=None): + + if not (is_pro_version() and ENABLE_FOLDER_PERM): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + user = request.data.get('user') + path = request.data.get('path') + perm = request.data.get('perm') + path = path.rstrip('/') if path != '/' else path + + permission = seafile_api.get_folder_user_perm(repo_id, path, user) + if not permission: + error_msg = 'Folder permission not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + username = request.user.username + try: + seafile_api.set_folder_user_perm(repo_id, path, perm, user) + send_perm_audit_msg('modify-repo-perm', username, user, repo_id, path, perm) + result = get_repo_user_folder_perm_result(repo_id, path, user) + return Response(result) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + @api_repo_user_folder_perm_check + def delete(self, request, repo_id, format=None): + + if not (is_pro_version() and ENABLE_FOLDER_PERM): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + user = request.data.get('user') + path = request.data.get('path') + path = path.rstrip('/') if path != '/' else path + + permission = seafile_api.get_folder_user_perm(repo_id, path, user) + if not permission: + return Response('success') + + username = request.user.username + try: + seafile_api.rm_folder_user_perm(repo_id, path, user) + send_perm_audit_msg('delete-repo-perm', username, + user, repo_id, path, permission) + return Response('success') + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + +def get_repo_group_folder_perm_result(repo_id, path, group_id): + result = {} + group = seaserv.get_group(group_id) + permission = seafile_api.get_folder_group_perm(repo_id, path, group_id) + if permission: + result['repo_id'] = repo_id + result['group_id'] = group_id + result['group_name'] = group.group_name + result['folder_path'] = path + result['folder_name'] = os.path.basename(path.rstrip('/')) + result['permission'] = permission + + return result + +class RepoGroupFolderPerm(APIView): + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAuthenticated,) + throttle_classes = (UserRateThrottle,) + + @api_repo_setting_permission_check + def get(self, request, repo_id, format=None): + + if not is_pro_version(): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + results = [] + group_folder_perms = seafile_api.list_folder_group_perm_by_repo(repo_id) + for perm in group_folder_perms: + result = {} + group = seaserv.get_group(perm.group_id) + result['repo_id'] = perm.repo_id + result['group_id'] = perm.group_id + result['group_name'] = group.group_name + result['folder_path'] = perm.path + result['folder_name'] = os.path.basename(perm.path.rstrip('/')) + result['permission'] = perm.permission + + results.append(result) + + return Response(results) + + @api_repo_group_folder_perm_check + def post(self, request, repo_id, format=None): + + if not (is_pro_version() and ENABLE_FOLDER_PERM): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + group_id = request.data.get('group_id') + path = request.data.get('path') + perm = request.data.get('perm') + group_id = int(group_id) + path = path.rstrip('/') if path != '/' else path + + permission = seafile_api.get_folder_group_perm(repo_id, path, group_id) + if permission: + error_msg = 'Permission already exists.' + return api_error(status.HTTP_409_CONFLICT, error_msg) + + username = request.user.username + try: + seafile_api.add_folder_group_perm(repo_id, path, perm, group_id) + send_perm_audit_msg('add-repo-perm', username, group_id, repo_id, path, perm) + result = get_repo_group_folder_perm_result(repo_id, path, group_id) + return Response(result) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + @api_repo_group_folder_perm_check + def put(self, request, repo_id, format=None): + + if not (is_pro_version() and ENABLE_FOLDER_PERM): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + group_id = request.data.get('group_id') + path = request.data.get('path') + perm = request.data.get('perm') + group_id = int(group_id) + path = path.rstrip('/') if path != '/' else path + + permission = seafile_api.get_folder_group_perm(repo_id, path, group_id) + if not permission: + error_msg = 'Folder permission not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + username = request.user.username + try: + seafile_api.set_folder_group_perm(repo_id, path, perm, group_id) + send_perm_audit_msg('modify-repo-perm', username, group_id, repo_id, path, perm) + result = get_repo_group_folder_perm_result(repo_id, path, group_id) + return Response(result) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + @api_repo_group_folder_perm_check + def delete(self, request, repo_id, format=None): + + if not (is_pro_version() and ENABLE_FOLDER_PERM): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + group_id = request.data.get('group_id') + path = request.data.get('path') + group_id = int(group_id) + path = path.rstrip('/') if path != '/' else path + + permission = seafile_api.get_folder_group_perm(repo_id, path, group_id) + if not permission: + return Response('success') + + username = request.user.username + try: + seafile_api.rm_folder_group_perm(repo_id, path, group_id) + send_perm_audit_msg('delete-repo-perm', username, group_id, + repo_id, path, permission) + return Response('success') + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) diff --git a/tests/api/test_repo_group_folder_perm.py b/tests/api/test_repo_group_folder_perm.py new file mode 100644 index 0000000000..406cd2c519 --- /dev/null +++ b/tests/api/test_repo_group_folder_perm.py @@ -0,0 +1,265 @@ +"""seahub/api2/views.py::Repo api tests. +""" +import json +from random import randint +from tests.common.utils import randstring +from django.core.urlresolvers import reverse +from seaserv import seafile_api +from seahub.test_utils import BaseTestCase +try: + from seahub.settings import LOCAL_PRO_DEV_ENV +except ImportError: + LOCAL_PRO_DEV_ENV = False + +class RepoGroupFolderPermTest(BaseTestCase): + + def setUp(self): + self.user_repo_id = self.repo.id + self.user_folder_path = self.folder + self.perm_r = 'r' + self.perm_rw = 'rw' + self.group_id = self.group.id + + def tearDown(self): + self.remove_repo() + self.remove_group() + + def test_can_get_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_group_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.group_id) + + self.login_as(self.user) + + resp = self.client.get(reverse("api2-repo-group-folder-perm", args=[self.user_repo_id])) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + assert json_resp[0]['group_id'] == self.group_id + assert json_resp[0]['repo_id'] == self.user_repo_id + assert json_resp[0]['permission'] == self.perm_r + assert json_resp[0]['folder_path'] == self.user_folder_path + + def test_can_not_get_if_not_repo_owner(self): + self.login_as(self.admin) + resp = self.client.get(reverse("api2-repo-group-folder-perm", args=[self.user_repo_id])) + self.assertEqual(403, resp.status_code) + + def test_can_modify_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_group_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.group_id) + + self.login_as(self.user) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s&perm=%s' % (self.group_id, + self.user_folder_path, self.perm_rw) + + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(200, resp.status_code) + + resp = self.client.get(reverse("api2-repo-group-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert json_resp[0]['permission'] == self.perm_rw + + def test_can_not_modify_if_not_repo_owner(self): + self.login_as(self.admin) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s&perm=%s' % (self.group_id, + self.user_folder_path, self.perm_rw) + + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_can_not_modify_if_folder_perm_not_exist(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s&perm=%s' % (self.group_id, + self.user_folder_path, self.perm_rw) + + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + def test_can_add_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = { + "group_id": self.group_id, + "path": self.user_folder_path, + "perm": self.perm_rw + } + + resp = self.client.post(url, data) + self.assertEqual(200, resp.status_code) + + resp = self.client.get(reverse("api2-repo-group-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert json_resp[0]['group_id'] == self.group_id + assert json_resp[0]['repo_id'] == self.user_repo_id + assert json_resp[0]['permission'] == self.perm_rw + assert json_resp[0]['folder_path'] == self.user_folder_path + + def test_can_not_add_if_not_repo_owner(self): + self.login_as(self.admin) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = { + "group_id": self.group_id, + "path": self.user_folder_path, + "perm": self.perm_rw + } + + resp = self.client.post(url, data) + self.assertEqual(403, resp.status_code) + + def test_can_not_add_if_folder_perm_already_exist(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_group_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.group_id) + + self.login_as(self.user) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = { + "group_id": self.group_id, + "path": self.user_folder_path, + "perm": self.perm_rw + } + + resp = self.client.post(url, data) + self.assertEqual(409, resp.status_code) + + def test_can_delete_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_group_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.group_id) + + self.login_as(self.user) + + resp = self.client.get(reverse("api2-repo-group-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert len(json_resp) == 1 + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s' % (self.group_id, self.user_folder_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(200, resp.status_code) + + resp = self.client.get(reverse("api2-repo-group-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert len(json_resp) == 0 + + def test_can_not_delete_if_not_repo_owner(self): + self.login_as(self.admin) + + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s' % (self.group_id, self.user_folder_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_invalid_path(self): + self.login_as(self.user) + + invalid_path = randstring(6) + + # test delete + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s' % (self.group_id, invalid_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + # test modify + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s&perm=%s' % (self.group_id, invalid_path, self.perm_rw) + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + # test add + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = { + "group_id": self.group_id, + "path": invalid_path, + "perm": self.perm_rw + } + resp = self.client.post(url, data) + self.assertEqual(404, resp.status_code) + + def test_invalid_group(self): + self.login_as(self.user) + + invalid_group_id = randint(0, 9) + while invalid_group_id == self.group_id: + invalid_group_id = randint(0, 9) + + # test delete + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s' % (invalid_group_id, self.user_folder_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + # test modify + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s&perm=%s' % (invalid_group_id, self.user_folder_path, self.perm_rw) + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + # test add + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = { + "group_id": invalid_group_id, + "path": self.user_folder_path, + "perm": self.perm_rw + } + resp = self.client.post(url, data) + self.assertEqual(404, resp.status_code) + + def test_invalid_perm(self): + self.login_as(self.user) + + invalid_perm = randstring(1) + while invalid_perm == 'r': + invalid_perm = randstring(1) + + # test modify + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = 'group_id=%s&path=%s&perm=%s' % (self.group_id, self.user_folder_path, invalid_perm) + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(400, resp.status_code) + + invalid_perm = randstring(2) + while invalid_perm == 'rw': + invalid_perm = randstring(2) + + # test add + url = reverse("api2-repo-group-folder-perm", args=[self.user_repo_id]) + data = { + "group_id": self.group_id, + "path": self.user_folder_path, + "perm": invalid_perm + } + resp = self.client.post(url, data) + self.assertEqual(400, resp.status_code) diff --git a/tests/api/test_repo_user_folder_perm.py b/tests/api/test_repo_user_folder_perm.py new file mode 100644 index 0000000000..52ca94b65c --- /dev/null +++ b/tests/api/test_repo_user_folder_perm.py @@ -0,0 +1,261 @@ +"""seahub/api2/views.py::Repo api tests. +""" +import json +from tests.common.utils import randstring +from django.core.urlresolvers import reverse +from seaserv import seafile_api +from seahub.test_utils import BaseTestCase +try: + from seahub.settings import LOCAL_PRO_DEV_ENV +except ImportError: + LOCAL_PRO_DEV_ENV = False + +class RepoUserFolderPermTest(BaseTestCase): + + def setUp(self): + self.user_repo_id = self.repo.id + self.user_folder_path = self.folder + self.perm_r = 'r' + self.perm_rw = 'rw' + self.admin_email = self.admin.email + + def tearDown(self): + self.remove_repo() + + def test_can_get_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_user_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.admin_email) + + self.login_as(self.user) + + resp = self.client.get(reverse("api2-repo-user-folder-perm", args=[self.user_repo_id])) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + assert json_resp[0]['user_email'] == self.admin_email + assert json_resp[0]['repo_id'] == self.user_repo_id + assert json_resp[0]['permission'] == self.perm_r + assert json_resp[0]['folder_path'] == self.user_folder_path + + def test_can_not_get_if_not_repo_owner(self): + self.login_as(self.admin) + resp = self.client.get(reverse("api2-repo-user-folder-perm", args=[self.user_repo_id])) + self.assertEqual(403, resp.status_code) + + def test_can_modify_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_user_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.admin_email) + + self.login_as(self.user) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s&perm=%s' % (self.admin_email, + self.user_folder_path, self.perm_rw) + + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(200, resp.status_code) + + resp = self.client.get(reverse("api2-repo-user-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert json_resp[0]['permission'] == self.perm_rw + + def test_can_not_modify_if_not_repo_owner(self): + self.login_as(self.admin) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s&perm=%s' % (self.admin_email, + self.user_folder_path, self.perm_rw) + + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_can_not_modify_if_folder_perm_not_exist(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s&perm=%s' % (self.admin_email, + self.user_folder_path, self.perm_rw) + + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + def test_can_add_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = { + "user": self.admin_email, + "path": self.user_folder_path, + "perm": self.perm_rw + } + + resp = self.client.post(url, data) + self.assertEqual(200, resp.status_code) + + resp = self.client.get(reverse("api2-repo-user-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert json_resp[0]['user_email'] == self.admin_email + assert json_resp[0]['repo_id'] == self.user_repo_id + assert json_resp[0]['permission'] == self.perm_rw + assert json_resp[0]['folder_path'] == self.user_folder_path + + def test_can_not_add_if_not_repo_owner(self): + self.login_as(self.admin) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = { + "user": self.admin_email, + "path": self.user_folder_path, + "perm": self.perm_rw + } + + resp = self.client.post(url, data) + self.assertEqual(403, resp.status_code) + + def test_can_not_add_if_folder_perm_already_exist(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_user_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.admin_email) + + self.login_as(self.user) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = { + "user": self.admin_email, + "path": self.user_folder_path, + "perm": self.perm_r + } + + resp = self.client.post(url, data) + self.assertEqual(409, resp.status_code) + + def test_can_delete_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + seafile_api.add_folder_user_perm(self.user_repo_id, + self.user_folder_path, self.perm_r, self.admin_email) + + self.login_as(self.user) + + resp = self.client.get(reverse("api2-repo-user-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert len(json_resp) == 1 + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s' % (self.admin_email, self.user_folder_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(200, resp.status_code) + + resp = self.client.get(reverse("api2-repo-user-folder-perm", args=[self.user_repo_id])) + json_resp = json.loads(resp.content) + assert len(json_resp) == 0 + + def test_can_not_delete_if_not_repo_owner(self): + self.login_as(self.admin) + + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s' % (self.admin_email, self.user_folder_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_invalid_path(self): + self.login_as(self.user) + + invalid_path = randstring(6) + + # test add + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = { + "user": self.admin_email, + "path": invalid_path, + "perm": self.perm_rw + } + resp = self.client.post(url, data) + self.assertEqual(404, resp.status_code) + + # test modify + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s&perm=%s' % (self.admin_email, invalid_path, self.perm_rw) + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + # test delete + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s' % (self.admin_email, invalid_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + def test_invalid_user(self): + self.login_as(self.user) + + invalid_user = randstring(6) + '@' + randstring(6) + '.com' + + # test add + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = { + "user": invalid_user, + "path": self.user_folder_path, + "perm": self.perm_rw + } + resp = self.client.post(url, data) + self.assertEqual(404, resp.status_code) + + # test modify + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s&perm=%s' % (invalid_user, self.user_folder_path, self.perm_rw) + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + # test delete + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s' % (invalid_user, self.user_folder_path) + resp = self.client.delete(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(404, resp.status_code) + + def test_invalid_perm(self): + self.login_as(self.user) + + invalid_perm = randstring(1) + while invalid_perm == 'r': + invalid_perm = randstring(1) + + # test add + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = { + "user": self.admin_email, + "path": self.user_folder_path, + "perm": invalid_perm + } + resp = self.client.post(url, data) + self.assertEqual(400, resp.status_code) + + invalid_perm = randstring(2) + while invalid_perm == 'rw': + invalid_perm = randstring(2) + + # test modify + url = reverse("api2-repo-user-folder-perm", args=[self.user_repo_id]) + data = 'user=%s&path=%s&perm=%s' % (self.admin_email, self.user_folder_path, invalid_perm) + resp = self.client.put(url, data, 'application/x-www-form-urlencoded') + self.assertEqual(400, resp.status_code)