diff --git a/seahub/api2/endpoints/repos_batch.py b/seahub/api2/endpoints/repos_batch.py index dae4fed4c7..a17fbedd33 100644 --- a/seahub/api2/endpoints/repos_batch.py +++ b/seahub/api2/endpoints/repos_batch.py @@ -29,7 +29,7 @@ from seahub.utils import is_org_context, send_perm_audit_msg, \ normalize_dir_path, get_folder_permission_recursively, \ normalize_file_path, check_filename_with_rename from seahub.utils.repo import get_repo_owner, get_available_repo_perms, \ - parse_repo_perm, get_locked_files_by_dir + parse_repo_perm, get_locked_files_by_dir, get_sub_folder_permission_by_dir from seahub.views import check_folder_permission from seahub.settings import MAX_PATH @@ -1254,6 +1254,15 @@ class ReposAsyncBatchMoveItemView(APIView): error_msg = _(u'File %s is locked.') % dirent return api_error(status.HTTP_403_FORBIDDEN, error_msg) + # check sub folder permission + folder_permission_dict = get_sub_folder_permission_by_dir(request, + src_repo_id, src_parent_dir) + for dirent in src_dirents: + if dirent in folder_permission_dict.keys() and \ + folder_permission_dict[dirent] != 'rw': + error_msg = _(u"Can't move folder %s, please check its permission.") % dirent + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + # move file result = {} formated_src_dirents = [dirent.strip('/') for dirent in src_dirents] @@ -1464,6 +1473,15 @@ class ReposSyncBatchMoveItemView(APIView): error_msg = _(u'File %s is locked.') % dirent return api_error(status.HTTP_403_FORBIDDEN, error_msg) + # check sub folder permission + folder_permission_dict = get_sub_folder_permission_by_dir(request, + src_repo_id, src_parent_dir) + for dirent in src_dirents: + if dirent in folder_permission_dict.keys() and \ + folder_permission_dict[dirent] != 'rw': + error_msg = _(u"Can't move folder %s, please check its permission.") % dirent + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + # move file result = {} formated_src_dirents = [dirent.strip('/') for dirent in src_dirents] @@ -1483,3 +1501,87 @@ class ReposSyncBatchMoveItemView(APIView): result = {} result['success'] = True return Response(result) + + +class ReposBatchDeleteItemView(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAuthenticated, ) + throttle_classes = (UserRateThrottle, ) + + def delete(self, request): + """ Multi delete files/folders. + Permission checking: + 1. User must has `rw` permission for parent folder. + Parameter: + { + "repo_id":"7460f7ac-a0ff-4585-8906-bb5a57d2e118", + "parent_dir":"/a/b/c/", + "dirents":["1.md", "2.md"], + } + """ + + # argument check + repo_id = request.data.get('repo_id', None) + if not repo_id: + error_msg = 'repo_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + parent_dir = request.data.get('parent_dir', None) + if not parent_dir: + error_msg = 'parent_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + dirents = request.data.get('dirents', None) + if not dirents: + error_msg = 'dirents invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check + if not seafile_api.get_repo(repo_id): + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(repo_id, parent_dir): + error_msg = 'Folder %s not found.' % parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check + # User must has `rw` permission for parent dir. + if check_folder_permission(request, repo_id, parent_dir) != PERMISSION_READ_WRITE: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # check locked files + username = request.user.username + locked_files = get_locked_files_by_dir(request, repo_id, parent_dir) + for dirent in dirents: + # file is locked and lock owner is not current user + if dirent in locked_files.keys() and \ + locked_files[dirent] != username: + error_msg = _(u'File %s is locked.') % dirent + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # check sub folder permission + folder_permission_dict = get_sub_folder_permission_by_dir(request, repo_id, parent_dir) + for dirent in dirents: + if dirent in folder_permission_dict.keys() and \ + folder_permission_dict[dirent] != 'rw': + error_msg = _(u"Can't delete folder %s, please check its permission.") % dirent + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # delete file + result = {} + formated_dirents = [dirent.strip('/') for dirent in dirents] + multi_dirents_str = "\t".join(formated_dirents) + + try: + seafile_api.del_file(repo_id, parent_dir, multi_dirents_str, username) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + result = {} + result['success'] = True + return Response(result) diff --git a/seahub/urls.py b/seahub/urls.py index 6fbf20a91d..5a9e8b957a 100644 --- a/seahub/urls.py +++ b/seahub/urls.py @@ -46,7 +46,8 @@ from seahub.api2.endpoints.repos_batch import ReposBatchView, \ ReposBatchCopyDirView, ReposBatchCreateDirView, \ ReposBatchCopyItemView, ReposBatchMoveItemView, \ ReposAsyncBatchCopyItemView, ReposAsyncBatchMoveItemView, \ - ReposSyncBatchCopyItemView, ReposSyncBatchMoveItemView + ReposSyncBatchCopyItemView, ReposSyncBatchMoveItemView, \ + ReposBatchDeleteItemView from seahub.api2.endpoints.repos import RepoView, ReposView from seahub.api2.endpoints.file import FileView from seahub.api2.endpoints.file_history import FileHistoryView, NewFileHistoryView @@ -314,11 +315,14 @@ urlpatterns = [ url(r'^api/v2.1/revision-tags/tag-names/$', TagNamesView.as_view(), name='api-v2.1-revision-tags-tag-names'), ## user::repos-batch-operate + # for icourt url(r'^api/v2.1/repos/batch/$', ReposBatchView.as_view(), name='api-v2.1-repos-batch'), url(r'^api/v2.1/repos/batch-copy-dir/$', ReposBatchCopyDirView.as_view(), name='api-v2.1-repos-batch-copy-dir'), url(r'^api/v2.1/repos/batch-create-dir/$', ReposBatchCreateDirView.as_view(), name='api-v2.1-repos-batch-create-dir'), url(r'^api/v2.1/repos/batch-copy-item/$', ReposBatchCopyItemView.as_view(), name='api-v2.1-repos-batch-copy-item'), url(r'^api/v2.1/repos/batch-move-item/$', ReposBatchMoveItemView.as_view(), name='api-v2.1-repos-batch-move-item'), + + url(r'^api/v2.1/repos/batch-delete-item/$', ReposBatchDeleteItemView.as_view(), name='api-v2.1-repos-batch-delete-item'), url(r'^api/v2.1/repos/async-batch-copy-item/$', ReposAsyncBatchCopyItemView.as_view(), name='api-v2.1-repos-async-batch-copy-item'), url(r'^api/v2.1/repos/async-batch-move-item/$', ReposAsyncBatchMoveItemView.as_view(), name='api-v2.1-repos-async-batch-move-item'), url(r'^api/v2.1/repos/sync-batch-copy-item/$', ReposSyncBatchCopyItemView.as_view(), name='api-v2.1-repos-sync-batch-copy-item'), diff --git a/seahub/utils/repo.py b/seahub/utils/repo.py index 161528c6a2..91520e7b3f 100644 --- a/seahub/utils/repo.py +++ b/seahub/utils/repo.py @@ -1,5 +1,6 @@ # Copyright (c) 2012-2016 Seafile Ltd. # -*- coding: utf-8 -*- +import stat import logging from collections import namedtuple @@ -208,6 +209,30 @@ def get_locked_files_by_dir(request, repo_id, folder_path): return locked_files +def get_sub_folder_permission_by_dir(request, repo_id, parent_dir): + """ Get sub folder permission in a folder + + Returns: + A dict contains folder name and permission. + + folder_permission_dict = { + 'folder_name_1': 'r'; + 'folder_name_2': 'rw'; + ... + } + """ + username = request.user.username + dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir) + dirents = seafile_api.list_dir_with_perm(repo_id, + parent_dir, dir_id, username, -1, -1) + + folder_permission_dict = {} + for dirent in dirents: + if stat.S_ISDIR(dirent.mode): + folder_permission_dict[dirent.obj_name] = dirent.permission + + return folder_permission_dict + def get_shared_groups_by_repo(repo_id, org_id=None): if not org_id: group_ids = seafile_api.get_shared_group_ids_by_repo( diff --git a/tests/api/endpoints/test_repos_batch.py b/tests/api/endpoints/test_repos_batch.py index 10036efab2..f0232fc00c 100644 --- a/tests/api/endpoints/test_repos_batch.py +++ b/tests/api/endpoints/test_repos_batch.py @@ -1025,6 +1025,38 @@ class ReposAsyncBatchMoveItemView(BaseTestCase): json_resp = json.loads(resp.content) assert json_resp['error_msg'] == 'File %s is locked.' % admin_file_name + def test_move_with_r_permission_sub_folder(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + # share admin's tmp repo to user with 'rw' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'rw') + + # admin set 'r' sub folder permission + admin_folder_name = randstring(6) + seafile_api.post_dir(admin_repo_id, '/', admin_folder_name, self.admin_name) + seafile_api.add_folder_user_perm(admin_repo_id, '/' + + admin_folder_name, 'r', self.user_name) + + # user move r permission folder + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[admin_folder_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['error_msg'] == "Can't move folder %s, please check its permission." % admin_folder_name + + class ReposSyncBatchCopyItemView(BaseTestCase): def create_new_repo(self, username): @@ -1577,7 +1609,7 @@ class ReposSyncBatchMoveItemView(BaseTestCase): self.login_as(self.user) - # share admin's tmp repo to user with 'r' permission + # share admin's tmp repo to user with 'rw' permission admin_repo_id = self.create_new_repo(self.admin_name) seafile_api.share_repo(admin_repo_id, self.admin_name, self.user_name, 'rw') @@ -1600,3 +1632,220 @@ class ReposSyncBatchMoveItemView(BaseTestCase): self.assertEqual(403, resp.status_code) json_resp = json.loads(resp.content) assert json_resp['error_msg'] == 'File %s is locked.' % admin_file_name + + def test_move_with_r_permission_sub_folder(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + # share admin's tmp repo to user with 'rw' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'rw') + + # admin set 'r' sub folder permission + admin_folder_name = randstring(6) + seafile_api.post_dir(admin_repo_id, '/', admin_folder_name, self.admin_name) + seafile_api.add_folder_user_perm(admin_repo_id, '/' + + admin_folder_name, 'r', self.user_name) + + # user move r permission folder + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[admin_folder_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['error_msg'] == "Can't move folder %s, please check its permission." % admin_folder_name + +class ReposBatchDeleteItemView(BaseTestCase): + + def create_new_repo(self, username): + new_repo_id = seafile_api.create_repo(name=randstring(10), + desc='', username=username, passwd=None) + + return new_repo_id + + def setUp(self): + self.user_name = self.user.username + self.admin_name = self.admin.username + + self.repo_id = self.repo.id + + self.file_path = self.file + self.file_name = os.path.basename(self.file_path) + + self.folder_path = self.folder + self.folder_name = os.path.basename(self.folder) + + self.url = reverse('api-v2.1-repos-batch-delete-item') + + def tearDown(self): + self.remove_repo(self.repo_id) + + def test_can_delete(self): + + # items in parent folder + assert seafile_api.get_dir_id_by_path(self.repo_id, self.folder_path) != None + assert seafile_api.get_file_id_by_path(self.repo_id, self.file_path) != None + + self.login_as(self.user) + + data = { + "repo_id": self.repo_id, + "parent_dir": '/', + "dirents":[self.folder_name, self.file_name], + } + + resp = self.client.delete(self.url, json.dumps(data), + 'application/json') + self.assertEqual(200, resp.status_code) + + # items NOT in parent folder + assert seafile_api.get_dir_id_by_path(self.repo_id, self.folder_path) is None + assert seafile_api.get_file_id_by_path(self.repo_id, self.file_path) is None + + def test_delete_with_invalid_parameter(self): + + self.login_as(self.user) + + data = { + "parent_dir": '/', + "dirents":[self.folder_name, self.file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "repo_id": self.repo_id, + "dirents":[self.folder_name, self.file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "repo_id": self.repo_id, + "parent_dir": '/', + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + def test_delete_with_repo_not_exist(self): + + self.login_as(self.user) + + invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb' + data = { + "repo_id": invalid_repo_id, + "parent_dir": '/', + "dirents":[self.folder_name, self.file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + def test_delete_with_folder_not_exist(self): + + self.login_as(self.user) + + data = { + "repo_id": self.repo_id, + "parent_dir": 'invalid_folder', + "dirents":[self.folder_name, self.file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + def test_delete_with_invalid_repo_permission(self): + + tmp_repo_id = self.create_new_repo(self.admin_name) + + self.login_as(self.user) + + data = { + "repo_id": tmp_repo_id, + "parent_dir": '/', + "dirents":[self.folder_name, self.file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + def test_delete_with_invalid_parent_folder_permission(self): + + self.login_as(self.user) + + # share admin's tmp repo to user with 'r' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'r') + data = { + "repo_id": admin_repo_id, + "parent_dir": '/', + "dirents":[self.folder_name, self.file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + def test_delete_with_locked_file(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + # share admin's tmp repo to user with 'r' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'rw') + + # admin lock file + admin_file_name = randstring(6) + seafile_api.post_empty_file(admin_repo_id, '/', admin_file_name, + self.admin_name) + seafile_api.lock_file(admin_repo_id, admin_file_name, self.admin_name, 0) + + # user move locked file + data = { + "repo_id": admin_repo_id, + "parent_dir": '/', + "dirents":[admin_file_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['error_msg'] == 'File %s is locked.' % admin_file_name + + def test_delete_with_r_permission_sub_folder(self): + + if not LOCAL_PRO_DEV_ENV: + return + + self.login_as(self.user) + + # share admin's tmp repo to user with 'r' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'rw') + + # admin set 'r' sub folder permission + admin_folder_name = randstring(6) + seafile_api.post_dir(admin_repo_id, '/', admin_folder_name, self.admin_name) + seafile_api.add_folder_user_perm(admin_repo_id, '/' + + admin_folder_name, 'r', self.user_name) + + # user move locked file + data = { + "repo_id": admin_repo_id, + "parent_dir": '/', + "dirents":[admin_folder_name], + } + resp = self.client.delete(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + json_resp = json.loads(resp.content) + assert json_resp['error_msg'] == "Can't delete folder %s, please check its permission." % admin_folder_name