diff --git a/seahub/api2/endpoints/repos_batch.py b/seahub/api2/endpoints/repos_batch.py index ad4a256a10..c899078597 100644 --- a/seahub/api2/endpoints/repos_batch.py +++ b/seahub/api2/endpoints/repos_batch.py @@ -33,8 +33,7 @@ from seahub.utils.repo import get_repo_owner, get_available_repo_perms, \ from seahub.views import check_folder_permission from seahub.settings import MAX_PATH -from seahub.constants import PERMISSION_READ, PERMISSION_READ_WRITE, \ - PERMISSION_ADMIN +from seahub.constants import PERMISSION_READ_WRITE, PERMISSION_READ logger = logging.getLogger(__name__) @@ -1064,3 +1063,204 @@ class ReposBatchMoveItemView(APIView): result['success'].append(common_dict) return Response(result) + + +class ReposAsyncBatchCopyItemView(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAuthenticated, ) + throttle_classes = (UserRateThrottle, ) + + def post(self, request): + """ Asynchronous multi copy files/folders. + Permission checking: + 1. User must has `r/rw` permission for src folder. + 2. User must has `rw` permission for dst folder. + + Parameter: + { + "src_repo_id":"7460f7ac-a0ff-4585-8906-bb5a57d2e118", + "src_parent_dir":"/a/b/c/", + "src_dirents":["1.md", "2.md"], + + "dst_repo_id":"a3fa768d-0f00-4343-8b8d-07b4077881db", + "dst_parent_dir":"/x/y/", + } + """ + + # argument check + src_repo_id = request.data.get('src_repo_id', None) + if not src_repo_id: + error_msg = 'src_repo_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + src_parent_dir = request.data.get('src_parent_dir', None) + if not src_parent_dir: + error_msg = 'src_parent_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + src_dirents = request.data.get('src_dirents', None) + if not src_dirents: + error_msg = 'src_dirents invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + dst_repo_id = request.data.get('dst_repo_id', None) + if not dst_repo_id: + error_msg = 'dst_repo_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + dst_parent_dir = request.data.get('dst_parent_dir', None) + if not dst_parent_dir: + error_msg = 'dst_parent_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check + if not seafile_api.get_repo(src_repo_id): + error_msg = 'Library %s not found.' % src_repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(src_repo_id, src_parent_dir): + error_msg = 'Folder %s not found.' % src_parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_repo(dst_repo_id): + error_msg = 'Library %s not found.' % dst_repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(dst_repo_id, dst_parent_dir): + error_msg = 'Folder %s not found.' % dst_parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check + # 1. User must has `r/rw` permission for src parent dir. + src_parent_permission = check_folder_permission(request, src_repo_id, src_parent_dir) + if src_parent_permission not in (PERMISSION_READ_WRITE, + PERMISSION_READ): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # 2. User must has `rw` permission for dst parent dir. + dst_parent_permission = check_folder_permission(request, dst_repo_id, dst_parent_dir) + if dst_parent_permission != PERMISSION_READ_WRITE: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + result = {} + username = request.user.username + + formated_src_dirents = [dirent.strip('/') for dirent in src_dirents] + src_multi = "\t".join(formated_src_dirents) + dst_multi = "\t".join(formated_src_dirents) + + try: + res = seafile_api.copy_file(src_repo_id, src_parent_dir, src_multi, + dst_repo_id, dst_parent_dir, dst_multi, + username=username, need_progress=1, + synchronous=0) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + result = {} + result['task_id'] = res.task_id if res.background else '' + return Response(result) + + +class ReposAsyncBatchMoveItemView(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAuthenticated, ) + throttle_classes = (UserRateThrottle, ) + + def post(self, request): + """ Asynchronous multi move files/folders. + Permission checking: + 1. User must has `rw` permission for src folder. + 2. User must has `rw` permission for dst folder. + + Parameter: + { + "src_repo_id":"7460f7ac-a0ff-4585-8906-bb5a57d2e118", + "src_parent_dir":"/a/b/c/", + "src_dirents":["1.md", "2.md"], + + "dst_repo_id":"a3fa768d-0f00-4343-8b8d-07b4077881db", + "dst_parent_dir":"/x/y/", + } + """ + + # argument check + src_repo_id = request.data.get('src_repo_id', None) + if not src_repo_id: + error_msg = 'src_repo_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + src_parent_dir = request.data.get('src_parent_dir', None) + if not src_parent_dir: + error_msg = 'src_parent_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + src_dirents = request.data.get('src_dirents', None) + if not src_dirents: + error_msg = 'src_dirents invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + dst_repo_id = request.data.get('dst_repo_id', None) + if not dst_repo_id: + error_msg = 'dst_repo_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + dst_parent_dir = request.data.get('dst_parent_dir', None) + if not dst_parent_dir: + error_msg = 'dst_parent_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check + if not seafile_api.get_repo(src_repo_id): + error_msg = 'Library %s not found.' % src_repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(src_repo_id, src_parent_dir): + error_msg = 'Folder %s not found.' % src_parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_repo(dst_repo_id): + error_msg = 'Library %s not found.' % dst_repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(dst_repo_id, dst_parent_dir): + error_msg = 'Folder %s not found.' % dst_parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check + # 1. User must has `rw` permission for src parent dir. + if check_folder_permission(request, src_repo_id, src_parent_dir) != PERMISSION_READ_WRITE: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # 2. User must has `rw` permission for dst parent dir. + if check_folder_permission(request, dst_repo_id, dst_parent_dir) != PERMISSION_READ_WRITE: + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + result = {} + username = request.user.username + + formated_src_dirents = [dirent.strip('/') for dirent in src_dirents] + src_multi = "\t".join(formated_src_dirents) + dst_multi = "\t".join(formated_src_dirents) + + try: + res = seafile_api.move_file(src_repo_id, src_parent_dir, src_multi, + dst_repo_id, dst_parent_dir, dst_multi, + replace=False, username=username, + need_progress=1, synchronous=0) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + result = {} + result['task_id'] = res.task_id if res.background else '' + return Response(result) diff --git a/seahub/urls.py b/seahub/urls.py index e9fd491d5a..858325730d 100644 --- a/seahub/urls.py +++ b/seahub/urls.py @@ -44,7 +44,8 @@ from seahub.api2.endpoints.upload_links import UploadLinks, UploadLink, \ UploadLinkUpload from seahub.api2.endpoints.repos_batch import ReposBatchView, \ ReposBatchCopyDirView, ReposBatchCreateDirView, \ - ReposBatchCopyItemView, ReposBatchMoveItemView + ReposBatchCopyItemView, ReposBatchMoveItemView, \ + ReposAsyncBatchCopyItemView, ReposAsyncBatchMoveItemView from seahub.api2.endpoints.repos import RepoView, ReposView from seahub.api2.endpoints.file import FileView from seahub.api2.endpoints.file_history import FileHistoryView, NewFileHistoryView @@ -313,6 +314,8 @@ urlpatterns = [ url(r'^api/v2.1/repos/batch-create-dir/$', ReposBatchCreateDirView.as_view(), name='api-v2.1-repos-batch-create-dir'), url(r'^api/v2.1/repos/batch-copy-item/$', ReposBatchCopyItemView.as_view(), name='api-v2.1-repos-batch-copy-item'), url(r'^api/v2.1/repos/batch-move-item/$', ReposBatchMoveItemView.as_view(), name='api-v2.1-repos-batch-move-item'), + url(r'^api/v2.1/repos/async-batch-copy-item/$', ReposAsyncBatchCopyItemView.as_view(), name='api-v2.1-repos-async-batch-copy-item'), + url(r'^api/v2.1/repos/async-batch-move-item/$', ReposAsyncBatchMoveItemView.as_view(), name='api-v2.1-repos-async-batch-move-item'), ## user::deleted repos url(r'^api/v2.1/deleted-repos/$', DeletedRepos.as_view(), name='api2-v2.1-deleted-repos'), diff --git a/tests/api/endpoints/test_repos_batch.py b/tests/api/endpoints/test_repos_batch.py index dc37fa9246..91248400c7 100644 --- a/tests/api/endpoints/test_repos_batch.py +++ b/tests/api/endpoints/test_repos_batch.py @@ -421,3 +421,564 @@ class ReposBatchCreateDirViewTest(BaseTestCase): path_2) is not None assert seafile_api.get_dir_id_by_path(self.repo_id, path_3) is not None + + +class ReposAsyncBatchCopyItemView(BaseTestCase): + + def create_new_repo(self, username): + new_repo_id = seafile_api.create_repo(name=randstring(10), + desc='', username=username, passwd=None) + + return new_repo_id + + def setUp(self): + self.user_name = self.user.username + self.admin_name = self.admin.username + + self.src_repo_id = self.repo.id + self.dst_repo_id = self.create_new_repo(self.user_name) + + self.file_path = self.file + self.file_name = os.path.basename(self.file_path) + + self.folder_path = self.folder + self.folder_name = os.path.basename(self.folder) + + self.url = reverse('api-v2.1-repos-async-batch-copy-item') + + def tearDown(self): + self.remove_repo(self.src_repo_id) + self.remove_repo(self.dst_repo_id) + + def test_can_copy(self): + + self.login_as(self.user) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + + resp = self.client.post(self.url, json.dumps(data), + 'application/json') + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + task_id = json_resp['task_id'] + assert len(task_id) == 36 + +# progress_url = reverse('api-v2.1-query-copy-move-progress') + '?task_id=%s' % task_id +# count = 1 +# while True: +# count += 1 +# resp = self.client.get(progress_url) +# json_resp = json.loads(resp.content) +# if json_resp['done'] == 1 or count == 10: +# break +# +# # items remain in src folder +# assert seafile_api.get_dir_id_by_path(self.src_repo_id, self.folder_path) is not None +# assert seafile_api.get_file_id_by_path(self.src_repo_id, self.file_path) is not None +# +# # items in dst folder +# assert seafile_api.get_file_id_by_path(self.dst_repo_id, self.file_path) is not None +# assert seafile_api.get_dir_id_by_path(self.dst_repo_id, self.folder_path) is not None + + def test_copy_with_invalid_parameter(self): + + self.login_as(self.user) + + data = { + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + def test_copy_with_repo_not_exist(self): + + self.login_as(self.user) + + invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb' + data = { + "src_repo_id": invalid_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb' + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": invalid_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + def test_copy_with_folder_not_exist(self): + + self.login_as(self.user) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": 'invalid_folder', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": 'invalid_folder', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + def test_copy_with_invalid_repo_permission(self): + + tmp_repo_id = self.create_new_repo(self.admin_name) + + self.login_as(self.user) + + data = { + "src_repo_id": tmp_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": tmp_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + def test_copy_with_invalid_src_folder_permission(self): + + self.login_as(self.user) + + # share admin's tmp repo to user with 'cloud-edit' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'cloud-edit') + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.src_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + # share admin's tmp repo to user with 'preivew' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'preview') + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.src_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + def test_copy_with_invalid_dst_folder_permission(self): + + self.login_as(self.user) + + # share admin's tmp repo to user with 'r' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'r') + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": admin_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + self.remove_repo(admin_repo_id) + + # share admin's tmp repo to user with 'cloud-edit' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'cloud-edit') + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": admin_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + # share admin's tmp repo to user with 'preview' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'preivew') + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": admin_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + +class ReposAsyncBatchMoveItemView(BaseTestCase): + + def create_new_repo(self, username): + new_repo_id = seafile_api.create_repo(name=randstring(10), + desc='', username=username, passwd=None) + + return new_repo_id + + def setUp(self): + self.user_name = self.user.username + self.admin_name = self.admin.username + + self.src_repo_id = self.repo.id + self.dst_repo_id = self.create_new_repo(self.user_name) + + self.file_path = self.file + self.file_name = os.path.basename(self.file_path) + + self.folder_path = self.folder + self.folder_name = os.path.basename(self.folder) + + self.url = reverse('api-v2.1-repos-async-batch-move-item') + + def tearDown(self): + self.remove_repo(self.src_repo_id) + self.remove_repo(self.dst_repo_id) + + def test_can_move(self): + + self.login_as(self.user) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + + resp = self.client.post(self.url, json.dumps(data), + 'application/json') + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + task_id = json_resp['task_id'] + assert len(task_id) == 36 + +# progress_url = reverse('api-v2.1-query-copy-move-progress') + '?task_id=%s' % task_id +# count = 1 +# while True: +# count += 1 +# resp = self.client.get(progress_url) +# json_resp = json.loads(resp.content) +# if json_resp['done'] == 1 or count == 10: +# break +# +# # items NOT in src folder +# assert seafile_api.get_dir_id_by_path(self.src_repo_id, self.folder_path) is None +# assert seafile_api.get_file_id_by_path(self.src_repo_id, self.file_path) is None +# +# # items in dst folder +# assert seafile_api.get_file_id_by_path(self.dst_repo_id, self.file_path) is not None +# assert seafile_api.get_dir_id_by_path(self.dst_repo_id, self.folder_path) is not None + + def test_move_with_invalid_parameter(self): + + self.login_as(self.user) + + data = { + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(400, resp.status_code) + + def test_move_with_repo_not_exist(self): + + self.login_as(self.user) + + invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb' + data = { + "src_repo_id": invalid_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb' + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": invalid_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + def test_move_with_folder_not_exist(self): + + self.login_as(self.user) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": 'invalid_folder', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": 'invalid_folder', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(404, resp.status_code) + + def test_move_with_invalid_repo_permission(self): + + tmp_repo_id = self.create_new_repo(self.admin_name) + + self.login_as(self.user) + + data = { + "src_repo_id": tmp_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.dst_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": tmp_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + def test_move_with_invalid_src_folder_permission(self): + + self.login_as(self.user) + + # share admin's tmp repo to user with 'r' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'r') + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.src_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + # share admin's tmp repo to user with 'cloud-edit' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'cloud-edit') + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.src_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + # share admin's tmp repo to user with 'preivew' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'preview') + data = { + "src_repo_id": admin_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": self.src_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + self.remove_repo(admin_repo_id) + + def test_move_with_invalid_dst_folder_permission(self): + + self.login_as(self.user) + + # share admin's tmp repo to user with 'r' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'r') + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": admin_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + # share admin's tmp repo to user with 'cloud-edit' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'cloud-edit') + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": admin_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code) + + # share admin's tmp repo to user with 'preview' permission + admin_repo_id = self.create_new_repo(self.admin_name) + seafile_api.share_repo(admin_repo_id, self.admin_name, + self.user_name, 'preview') + data = { + "src_repo_id": self.src_repo_id, + "src_parent_dir": '/', + "src_dirents":[self.folder_name, self.file_name], + "dst_repo_id": admin_repo_id, + "dst_parent_dir": '/', + } + resp = self.client.post(self.url, json.dumps(data), 'application/json') + self.assertEqual(403, resp.status_code)