From 0b61259b9d4bf5c49406f3fe94e9e606c32bd75b Mon Sep 17 00:00:00 2001 From: zhengxie Date: Tue, 14 Jul 2015 11:01:48 +0800 Subject: [PATCH] Add permission check and quota check --- seahub/api2/endpoints/dir_shared_items.py | 65 +++++-- seahub/api2/urls.py | 2 +- seahub/test_utils.py | 2 +- tests/api/endpoints/test_dir_shared_items.py | 184 +++++++++++++++++++ tests/api/test_shares.py | 175 ------------------ 5 files changed, 240 insertions(+), 188 deletions(-) create mode 100644 tests/api/endpoints/test_dir_shared_items.py diff --git a/seahub/api2/endpoints/dir_shared_items.py b/seahub/api2/endpoints/dir_shared_items.py index 6f2e5f2675..28e5d85bd9 100644 --- a/seahub/api2/endpoints/dir_shared_items.py +++ b/seahub/api2/endpoints/dir_shared_items.py @@ -13,17 +13,24 @@ import seaserv from seaserv import seafile_api from seahub.api2.authentication import TokenAuthentication +from seahub.api2.permissions import IsRepoAccessible from seahub.api2.utils import api_error from seahub.base.templatetags.seahub_tags import email2nickname -from seahub.utils import is_org_context, is_valid_username from seahub.share.signals import share_repo_to_user_successful +from seahub.share.views import check_user_share_quota +from seahub.utils import (is_org_context, is_valid_username, + send_perm_audit_msg) + logger = logging.getLogger(__name__) json_content_type = 'application/json; charset=utf-8' class DirSharedItemsEndpoint(APIView): + """Support uniform interface(list, share, unshare, modify) for sharing + library/folder to users/groups. + """ authentication_classes = (TokenAuthentication, SessionAuthentication) - permission_classes = (IsAuthenticated,) + permission_classes = (IsAuthenticated, IsRepoAccessible) throttle_classes = (UserRateThrottle, ) def list_user_shared_items(self, request, repo_id, path): @@ -64,9 +71,6 @@ class DirSharedItemsEndpoint(APIView): }) return ret - # def add_user_shared_item(self, request, repo_id, path): - # pass - def handle_shared_to_args(self, request): share_type = request.GET.get('share_type', None) shared_to_user = False @@ -120,6 +124,8 @@ class DirSharedItemsEndpoint(APIView): return sub_repo def get(self, request, repo_id, format=None): + """List shared items(shared to users/groups) for a folder/library. + """ repo = seafile_api.get_repo(repo_id) if not repo: return api_error(status.HTTP_400_BAD_REQUEST, 'Repo not found.') @@ -184,6 +190,9 @@ class DirSharedItemsEndpoint(APIView): seafile_api.set_share_permission(shared_repo.id, username, shared_to, permission) + send_perm_audit_msg('modify-repo-perm', username, shared_to, + shared_repo.id, path, permission) + if shared_to_group: gid = request.GET.get('group_id') try: @@ -202,6 +211,9 @@ class DirSharedItemsEndpoint(APIView): seafile_api.set_group_repo_permission(gid, shared_repo.id, permission) + send_perm_audit_msg('modify-repo-perm', username, gid, + shared_repo.id, path, permission) + return HttpResponse(json.dumps({'success': True}), status=200, content_type=json_content_type) @@ -211,8 +223,6 @@ class DirSharedItemsEndpoint(APIView): if not repo: return api_error(status.HTTP_400_BAD_REQUEST, 'Repo not found.') - # TODO: perm check, quota check - path = request.GET.get('p', '/') if seafile_api.get_dir_id_by_path(repo.id, path) is None: return api_error(status.HTTP_400_BAD_REQUEST, 'Directory not found.') @@ -239,12 +249,20 @@ class DirSharedItemsEndpoint(APIView): if share_type == 'user': share_to_users = request.DATA.getlist('username') for to_user in share_to_users: + if not check_user_share_quota(username, shared_repo, users=[to_user]): + return api_error(status.HTTP_403_FORBIDDEN, + 'Failed to share: No enough quota.') + try: if is_org_context(request): org_id = request.user.org.org_id - # org_share_repo(org_id, shared_repo.id, username, to_user, permission) + seaserv.seafserv_threaded_rpc.org_add_share( + org_id, shared_repo.id, username, to_user, + permission) else: - seafile_api.share_repo(shared_repo.repo_id, username, to_user, permission) + seafile_api.share_repo(shared_repo.id, username, + to_user, permission) + # send a signal when sharing repo successful share_repo_to_user_successful.send(sender=None, from_user=username, @@ -258,6 +276,9 @@ class DirSharedItemsEndpoint(APIView): }, "permission": permission }) + + send_perm_audit_msg('add-repo-perm', username, to_user, + shared_repo.id, path, permission) except SearpcError as e: logger.error(e) failed.append(to_user) @@ -274,6 +295,10 @@ class DirSharedItemsEndpoint(APIView): if not group: return api_error(status.HTTP_400_BAD_REQUEST, 'Group not found: %s' % gid) + if not check_user_share_quota(username, shared_repo, groups=[group]): + return api_error(status.HTTP_403_FORBIDDEN, + 'Failed to share: No enough quota.') + try: if is_org_context(request): org_id = request.user.org.org_id @@ -283,7 +308,6 @@ class DirSharedItemsEndpoint(APIView): else: seafile_api.set_group_repo(shared_repo.repo_id, gid, username, permission) - # todo: perm audit msg success.append({ "share_type": "group", @@ -293,6 +317,9 @@ class DirSharedItemsEndpoint(APIView): }, "permission": permission }) + + send_perm_audit_msg('add-repo-perm', username, gid, + shared_repo.id, path, permission) except SearpcError as e: logger.error(e) failed.append(group.group_name) @@ -335,10 +362,16 @@ class DirSharedItemsEndpoint(APIView): if is_org_context(request): org_id = request.user.org.org_id - # org_remove_share(org_id, repo_id, from_email, shared_to) + seaserv.seafserv_threaded_rpc.org_remove_share( + org_id, shared_repo.id, username, shared_to) else: seaserv.remove_share(shared_repo.id, username, shared_to) + permission = seafile_api.check_permission_by_path(repo.id, path, + shared_to) + send_perm_audit_msg('delete-repo-perm', username, shared_to, + shared_repo.id, path, permission) + if shared_to_group: group_id = request.GET.get('group_id') try: @@ -346,11 +379,21 @@ class DirSharedItemsEndpoint(APIView): except ValueError: return api_error(status.HTTP_400_BAD_REQUEST, 'Bad group id') + # hacky way to get group repo permission + permission = '' + for e in seafile_api.list_repo_shared_group(username, shared_repo.id): + if e.group_id == group_id: + permission = e.perm + break + if is_org_context(request): org_id = request.user.org.org_id seaserv.del_org_group_repo(shared_repo.id, org_id, group_id) else: seafile_api.unset_group_repo(shared_repo.id, group_id, username) + send_perm_audit_msg('delete-repo-perm', username, group_id, + shared_repo.id, path, permission) + return HttpResponse(json.dumps({'success': True}), status=200, content_type=json_content_type) diff --git a/seahub/api2/urls.py b/seahub/api2/urls.py index 2a16410225..60d0e55860 100644 --- a/seahub/api2/urls.py +++ b/seahub/api2/urls.py @@ -41,7 +41,7 @@ urlpatterns = patterns('', url(r'^repos/(?P[-0-9-a-f]{36})/dir/$', DirView.as_view(), name='DirView'), url(r'^repos/(?P[-0-9-a-f]{36})/dir/sub_repo/$', DirSubRepoView.as_view()), url(r'^repos/(?P[-0-9-a-f]{36})/dir/share/$', DirShareView.as_view()), - url(r'^repos/(?P[-0-9-a-f]{36})/dir/shared_items/$', DirSharedItemsEndpoint.as_view()), + url(r'^repos/(?P[-0-9-a-f]{36})/dir/shared_items/$', DirSharedItemsEndpoint.as_view(), name="api2-dir-shared-items"), url(r'^repos/(?P[-0-9-a-f]{36})/dir/download/$', DirDownloadView.as_view()), url(r'^repos/(?P[-0-9-a-f]{36})/thumbnail/$', ThumbnailView.as_view(), name='api2-thumbnail'), url(r'^starredfiles/', StarredFileView.as_view(), name='starredfiles'), diff --git a/seahub/test_utils.py b/seahub/test_utils.py index 28b56ccb97..e8221e12c2 100644 --- a/seahub/test_utils.py +++ b/seahub/test_utils.py @@ -61,7 +61,7 @@ class Fixtures(Exam): def create_repo(self, **kwargs): repo_id = seafile_api.create_repo('test-repo', '', - 'test@test.com', None) + self.user.username, None) return repo_id def remove_repo(self, repo_id=None): diff --git a/tests/api/endpoints/test_dir_shared_items.py b/tests/api/endpoints/test_dir_shared_items.py new file mode 100644 index 0000000000..41d396385e --- /dev/null +++ b/tests/api/endpoints/test_dir_shared_items.py @@ -0,0 +1,184 @@ +import json + +from seaserv import seafile_api + +from seahub.test_utils import BaseTestCase + +class DirSharedItemsTest(BaseTestCase): + def tearDown(self): + self.remove_repo() + + def _add_shared_items(self): + sub_repo_id = seafile_api.create_virtual_repo(self.repo.id, + self.folder, + self.repo.name, '', + self.user.username) + # A user shares a folder to admin with permission 'rw'. + seafile_api.share_repo(sub_repo_id, self.user.username, + self.admin.username, 'rw') + # A user shares a folder to group with permission 'rw'. + seafile_api.set_group_repo(sub_repo_id, self.group.id, + self.user.username, 'rw') + + def test_can_list_all(self): + self._add_shared_items() + self.login_as(self.user) + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % ( + self.repo.id, + self.folder)) + + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert len(json_resp) == 2 + + def test_list_without_repo_permission(self): + self._add_shared_items() + self.login_as(self.admin) + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % ( + self.repo.id, + self.folder)) + + self.assertEqual(403, resp.status_code) + + def test_can_list_without_share_type_arg(self): + self._add_shared_items() + self.login_as(self.user) + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s' % ( + self.repo.id, + self.folder)) + + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert len(json_resp) == 2 + + def test_can_share_folder_to_users(self): + self.login_as(self.user) + + resp = self.client.put( + '/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id, + self.folder), + "share_type=user&username=a@a.com&username=b@b.com", + 'application/x-www-form-urlencoded', + ) + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert len(json_resp['success']) == 2 + assert json_resp['success'][0]['permission'] == 'r' + + def test_can_share_root_to_groups(self): + self.login_as(self.user) + + grp1 = self.group + grp2 = self.create_group(group_name="test-grp2", + username=self.user.username) + + resp = self.client.put( + '/api2/repos/%s/dir/shared_items/?p=/' % (self.repo.id), + "share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id), + 'application/x-www-form-urlencoded', + ) + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert len(json_resp['success']) == 2 + assert json_resp['success'][0]['permission'] == 'rw' + + def test_can_share_folder_to_groups(self): + self.login_as(self.user) + + grp1 = self.group + grp2 = self.create_group(group_name="test-grp2", + username=self.user.username) + + resp = self.client.put( + '/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id, + self.folder), + "share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id), + 'application/x-www-form-urlencoded', + ) + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert len(json_resp['success']) == 2 + assert json_resp['success'][0]['permission'] == 'rw' + + def test_can_modify_user_shared_repo(self): + self._add_shared_items() + self.login_as(self.user) + + resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % ( + self.repo.id, + self.folder, + self.admin.username), { + 'permission': 'r' + } + ) + json_resp = json.loads(resp.content) + assert json_resp['success'] is True + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % ( + self.repo.id, + self.folder)) + json_resp = json.loads(resp.content) + assert json_resp[0]['permission'] == 'r' + + def test_can_modify_group_shared_repo(self): + self._add_shared_items() + self.login_as(self.user) + + resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % ( + self.repo.id, + self.folder, + self.group.id), { + 'permission': 'r' + } + ) + json_resp = json.loads(resp.content) + assert json_resp['success'] is True + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % ( + self.repo.id, + self.folder)) + json_resp = json.loads(resp.content) + assert json_resp[0]['permission'] == 'r' + + def test_can_unshare_repo_to_user(self): + self._add_shared_items() + self.login_as(self.user) + + resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % ( + self.repo.id, + self.folder, + self.admin.username + )) + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['success'] is True + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % ( + self.repo.id, + self.folder)) + + json_resp = json.loads(resp.content) + assert len(json_resp) == 0 + + def test_can_unshare_repo_to_group(self): + self._add_shared_items() + self.login_as(self.user) + + resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % ( + self.repo.id, + self.folder, + self.group.id + )) + self.assertEqual(200, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['success'] is True + + resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % ( + self.repo.id, + self.folder)) + + json_resp = json.loads(resp.content) + assert len(json_resp) == 0 diff --git a/tests/api/test_shares.py b/tests/api/test_shares.py index 09c452b384..438462bafa 100644 --- a/tests/api/test_shares.py +++ b/tests/api/test_shares.py @@ -1,9 +1,4 @@ #coding: UTF-8 -import json - -from seaserv import seafile_api - -from seahub.test_utils import BaseTestCase from tests.common.utils import urljoin from tests.api.apitestbase import ApiTestBase from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \ @@ -32,173 +27,3 @@ class SharesApiTest(ApiTestBase): self.assertIsNotNone(fileshare['token']) self.assertIsNotNone(fileshare['view_cnt']) self.assertIsNotNone(fileshare['path']) - - -class DirSharedItemsTest(BaseTestCase): - def tearDown(self): - self.remove_repo() - - def _add_shared_items(self): - sub_repo_id = seafile_api.create_virtual_repo(self.repo.id, - self.folder, - self.repo.name, '', - self.user.username) - # A user shares a folder to admin with permission 'rw'. - seafile_api.share_repo(sub_repo_id, self.user.username, - self.admin.username, 'rw') - # A user shares a folder to group with permission 'rw'. - seafile_api.set_group_repo(sub_repo_id, self.group.id, - self.user.username, 'rw') - - def test_can_list_all(self): - self._add_shared_items() - self.login_as(self.user) - - resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % ( - self.repo.id, - self.folder)) - - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert len(json_resp) == 2 - - def test_can_list_without_share_type_arg(self): - self._add_shared_items() - self.login_as(self.user) - - resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s' % ( - self.repo.id, - self.folder)) - - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert len(json_resp) == 2 - - def test_can_share_folder_to_users(self): - self.login_as(self.user) - - resp = self.client.put( - '/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id, - self.folder), - "share_type=user&username=a@a.com&username=b@b.com", - 'application/x-www-form-urlencoded', - ) - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert len(json_resp['success']) == 2 - assert json_resp['success'][0]['permission'] == 'r' - - def test_can_share_root_to_groups(self): - self.login_as(self.user) - - grp1 = self.group - grp2 = self.create_group(group_name="test-grp2", - username=self.user.username) - - resp = self.client.put( - '/api2/repos/%s/dir/shared_items/?p=/' % (self.repo.id), - "share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id), - 'application/x-www-form-urlencoded', - ) - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert len(json_resp['success']) == 2 - assert json_resp['success'][0]['permission'] == 'rw' - - def test_can_share_folder_to_groups(self): - self.login_as(self.user) - - grp1 = self.group - grp2 = self.create_group(group_name="test-grp2", - username=self.user.username) - - resp = self.client.put( - '/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id, - self.folder), - "share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id), - 'application/x-www-form-urlencoded', - ) - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert len(json_resp['success']) == 2 - assert json_resp['success'][0]['permission'] == 'rw' - - def test_can_modify_user_shared_repo(self): - self._add_shared_items() - self.login_as(self.user) - - resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % ( - self.repo.id, - self.folder, - self.admin.username), { - 'permission': 'r' - } - ) - json_resp = json.loads(resp.content) - assert json_resp['success'] is True - - resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % ( - self.repo.id, - self.folder)) - json_resp = json.loads(resp.content) - assert json_resp[0]['permission'] == 'r' - - def test_can_modify_group_shared_repo(self): - self._add_shared_items() - self.login_as(self.user) - - resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % ( - self.repo.id, - self.folder, - self.group.id), { - 'permission': 'r' - } - ) - json_resp = json.loads(resp.content) - assert json_resp['success'] is True - - resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % ( - self.repo.id, - self.folder)) - json_resp = json.loads(resp.content) - assert json_resp[0]['permission'] == 'r' - - def test_can_unshare_repo_to_user(self): - self._add_shared_items() - self.login_as(self.user) - - resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % ( - self.repo.id, - self.folder, - self.admin.username - )) - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert json_resp['success'] is True - - resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % ( - self.repo.id, - self.folder)) - - json_resp = json.loads(resp.content) - assert len(json_resp) == 0 - - def test_can_unshare_repo_to_group(self): - self._add_shared_items() - self.login_as(self.user) - - resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % ( - self.repo.id, - self.folder, - self.group.id - )) - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert json_resp['success'] is True - - resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % ( - self.repo.id, - self.folder)) - - json_resp = json.loads(resp.content) - assert len(json_resp) == 0