diff --git a/seahub/api2/views.py b/seahub/api2/views.py index add96ac036..855ba9d00c 100644 --- a/seahub/api2/views.py +++ b/seahub/api2/views.py @@ -66,7 +66,8 @@ from seahub.utils import gen_file_get_url, gen_token, gen_file_upload_url, \ gen_file_share_link, gen_dir_share_link, is_org_context, gen_shared_link, \ get_org_user_events, calculate_repos_last_modify, send_perm_audit_msg, \ gen_shared_upload_link, convert_cmmt_desc_link, is_valid_dirent_name, \ - is_org_repo_creation_allowed, is_windows_operating_system + is_org_repo_creation_allowed, is_windows_operating_system, \ + get_no_duplicate_obj_name from seahub.utils.devices import do_unlink_device from seahub.utils.repo import get_sub_repo_abbrev_origin_path from seahub.utils.star import star_file, unstar_file @@ -1671,11 +1672,8 @@ class OpDeleteView(APIView): return api_error(status.HTTP_404_NOT_FOUND, 'File or directory not found.') - multi_files = '' - for file_name in file_names.split(':'): - multi_files += file_name + '\t' - try: + multi_files = "\t".join(file_names.split(':')) seafile_api.del_file(repo_id, parent_dir, multi_files, username) except SearpcError as e: @@ -1694,20 +1692,41 @@ class OpMoveView(APIView): def post(self, request, repo_id, format=None): - repo = get_repo(repo_id) - if not repo: - return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.') - username = request.user.username parent_dir = request.GET.get('p', '/') dst_repo = request.POST.get('dst_repo', None) dst_dir = request.POST.get('dst_dir', None) - file_names = request.POST.get("file_names", None) + obj_names = request.POST.get("file_names", None) - if not parent_dir or not file_names or not dst_repo or not dst_dir: + # argument check + if not parent_dir or not obj_names or not dst_repo or not dst_dir: return api_error(status.HTTP_400_BAD_REQUEST, 'Missing argument.') + if repo_id == dst_repo and parent_dir == dst_dir: + return api_error(status.HTTP_400_BAD_REQUEST, + 'The destination directory is the same as the source.') + + # src resource check + repo = get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(repo_id, parent_dir): + error_msg = 'Folder %s not found.' % parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # dst resource check + if not get_repo(dst_repo): + error_msg = 'Library %s not found.' % dst_repo + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(dst_repo, dst_dir): + error_msg = 'Folder %s not found.' % dst_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check if check_folder_permission(request, repo_id, parent_dir) != 'rw': return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to move file in this folder.') @@ -1716,31 +1735,47 @@ class OpMoveView(APIView): return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to move file to destination folder.') - if repo_id == dst_repo and parent_dir == dst_dir: + # check if all file/dir existes + obj_names = obj_names.strip(':').split(':') + dirents = seafile_api.list_dir_by_path(repo_id, parent_dir) + exist_obj_names = [dirent.obj_name for dirent in dirents] + if not set(obj_names).issubset(exist_obj_names): return api_error(status.HTTP_400_BAD_REQUEST, - 'The destination directory is the same as the source.') + 'file_names invalid.') + + # make new name + dst_dirents = seafile_api.list_dir_by_path(dst_repo, dst_dir) + dst_obj_names = [dirent.obj_name for dirent in dst_dirents] + + new_obj_names = [] + for obj_name in obj_names: + new_obj_name = get_no_duplicate_obj_name(obj_name, dst_obj_names) + new_obj_names.append(new_obj_name) + + # move file + try: + src_multi_objs = "\t".join(obj_names) + dst_multi_objs = "\t".join(new_obj_names) + + seafile_api.move_file(repo_id, parent_dir, src_multi_objs, + dst_repo, dst_dir, dst_multi_objs, replace=False, + username=username, need_progress=0, synchronous=1) + except SearpcError as e: + logger.error(e) + return api_error(HTTP_520_OPERATION_FAILED, + "Failed to move file.") obj_info_list = [] - for file_name in file_names.split(':'): - new_filename = check_filename_with_rename(dst_repo, dst_dir, file_name) - try: - seafile_api.move_file(repo_id, parent_dir, file_name, - dst_repo, dst_dir, new_filename, - replace=False, username=username, - need_progress=0, synchronous=1) - except SearpcError as e: - logger.error(e) - return api_error(HTTP_520_OPERATION_FAILED, - "Failed to move file.") - + for new_obj_name in new_obj_names: obj_info = {} obj_info['repo_id'] = dst_repo obj_info['parent_dir'] = dst_dir - obj_info['obj_name'] = new_filename + obj_info['obj_name'] = new_obj_name obj_info_list.append(obj_info) return reloaddir_if_necessary(request, repo, parent_dir, obj_info_list) + class OpCopyView(APIView): """ Copy files. @@ -1750,20 +1785,41 @@ class OpCopyView(APIView): def post(self, request, repo_id, format=None): - repo = get_repo(repo_id) - if not repo: - return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.') - username = request.user.username parent_dir = request.GET.get('p', '/') dst_repo = request.POST.get('dst_repo', None) dst_dir = request.POST.get('dst_dir', None) - file_names = request.POST.get("file_names", None) + obj_names = request.POST.get("file_names", None) - if not parent_dir or not file_names or not dst_repo or not dst_dir: + # argument check + if not parent_dir or not obj_names or not dst_repo or not dst_dir: return api_error(status.HTTP_400_BAD_REQUEST, 'Missing argument.') + if repo_id == dst_repo and parent_dir == dst_dir: + return api_error(status.HTTP_400_BAD_REQUEST, + 'The destination directory is the same as the source.') + + # src resource check + repo = get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(repo_id, parent_dir): + error_msg = 'Folder %s not found.' % parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # dst resource check + if not get_repo(dst_repo): + error_msg = 'Library %s not found.' % dst_repo + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not seafile_api.get_dir_id_by_path(dst_repo, dst_dir): + error_msg = 'Folder %s not found.' % dst_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check if check_folder_permission(request, repo_id, parent_dir) is None: return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to copy file of this folder.') @@ -1772,29 +1828,41 @@ class OpCopyView(APIView): return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to copy file to destination folder.') - if not get_repo(dst_repo): - return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.') + # check if all file/dir existes + obj_names = obj_names.strip(':').split(':') + dirents = seafile_api.list_dir_by_path(repo_id, parent_dir) + exist_obj_names = [dirent.obj_name for dirent in dirents] + if not set(obj_names).issubset(exist_obj_names): + return api_error(status.HTTP_400_BAD_REQUEST, + 'file_names invalid.') - if seafile_api.get_dir_id_by_path(repo_id, parent_dir) is None or \ - seafile_api.get_dir_id_by_path(dst_repo, dst_dir) is None: - return api_error(status.HTTP_400_BAD_REQUEST, 'Path does not exist.') + # make new name + dst_dirents = seafile_api.list_dir_by_path(dst_repo, dst_dir) + dst_obj_names = [dirent.obj_name for dirent in dst_dirents] + + new_obj_names = [] + for obj_name in obj_names: + new_obj_name = get_no_duplicate_obj_name(obj_name, dst_obj_names) + new_obj_names.append(new_obj_name) + + # copy file + try: + src_multi_objs = "\t".join(obj_names) + dst_multi_objs = "\t".join(new_obj_names) + + seafile_api.copy_file(repo_id, parent_dir, src_multi_objs, + dst_repo, dst_dir, dst_multi_objs, username, 0, synchronous=1) + except SearpcError as e: + logger.error(e) + return api_error(HTTP_520_OPERATION_FAILED, + "Failed to copy file.") obj_info_list = [] - for file_name in file_names.split(':'): - new_filename = check_filename_with_rename(dst_repo, dst_dir, file_name) - try: - seafile_api.copy_file(repo_id, parent_dir, file_name, - dst_repo, dst_dir, new_filename, - username, 0, synchronous=1) - except SearpcError as e: - logger.error(e) - return api_error(HTTP_520_OPERATION_FAILED, - "Failed to copy file.") - + for new_obj_name in new_obj_names: obj_info = {} obj_info['repo_id'] = dst_repo obj_info['parent_dir'] = dst_dir - obj_info['obj_name'] = new_filename + obj_info['obj_name'] = new_obj_name obj_info_list.append(obj_info) return reloaddir_if_necessary(request, repo, parent_dir, obj_info_list) diff --git a/seahub/utils/__init__.py b/seahub/utils/__init__.py index 79feffcd18..c6e936f893 100644 --- a/seahub/utils/__init__.py +++ b/seahub/utils/__init__.py @@ -267,39 +267,44 @@ def is_ldap_user(user): """ return user.source == 'LDAP' or user.source == 'LDAPImport' -def check_filename_with_rename(repo_id, parent_dir, filename): +def get_no_duplicate_obj_name(obj_name, exist_obj_names): + + def no_duplicate(obj_name): + for exist_obj_name in exist_obj_names: + if exist_obj_name == obj_name: + return False + return True + + def make_new_name(obj_name, i): + base, ext = os.path.splitext(obj_name) + if ext: + new_base = "%s (%d)" % (base, i) + return new_base + ext + else: + return "%s (%d)" % (obj_name, i) + + if no_duplicate(obj_name): + return obj_name + else: + i = 1 + while True: + new_name = make_new_name(obj_name, i) + if no_duplicate(new_name): + return new_name + else: + i += 1 + +def check_filename_with_rename(repo_id, parent_dir, obj_name): cmmts = seafile_api.get_commit_list(repo_id, 0, 1) latest_commit = cmmts[0] if cmmts else None if not latest_commit: return '' # TODO: what if parrent_dir does not exist? - dirents = seafile_api.list_dir_by_commit_and_path(repo_id, latest_commit.id, - parent_dir.encode('utf-8')) + dirents = seafile_api.list_dir_by_commit_and_path(repo_id, + latest_commit.id, parent_dir.encode('utf-8')) - def no_duplicate(name): - for dirent in dirents: - if dirent.obj_name == name: - return False - return True - - def make_new_name(filename, i): - base, ext = os.path.splitext(filename) - if ext: - new_base = "%s (%d)" % (base, i) - return new_base + ext - else: - return "%s (%d)" % (filename, i) - - if no_duplicate(filename): - return filename - else: - i = 1 - while True: - new_name = make_new_name (filename, i) - if no_duplicate(new_name): - return new_name - else: - i += 1 + exist_obj_names = [dirent.obj_name for dirent in dirents] + return get_no_duplicate_obj_name(obj_name, exist_obj_names) def get_user_repos(username, org_id=None): """ diff --git a/tests/api/test_file_ops.py b/tests/api/test_file_ops.py index 4edbf13ed4..df24512333 100644 --- a/tests/api/test_file_ops.py +++ b/tests/api/test_file_ops.py @@ -56,145 +56,123 @@ class FileOpsApiTest(BaseTestCase): def tearDown(self): self.remove_repo() - def test_can_move(self): - self.login_as(self.user) - - file_name = self.create_new_file() - - # check old file name exists in src repo - assert file_name in self.get_dirent_name_list(self.repo_id) - - dst_repo_id = self.create_new_repo() - data = { - 'file_names': file_name, - 'dst_repo': dst_repo_id, - 'dst_dir': '/', - } - - ### copy for first time ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) - resp = self.client.post(self.copy_url, data) - json_resp = json.loads(resp.content) - - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name - assert json_resp[0]['repo_id'] == dst_repo_id - assert json_resp[0]['parent_dir'] == '/' - - # check old file still existes - assert file_name in self.get_dirent_name_list(self.repo_id) - # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) - - ### copy for second time ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) - resp = self.client.post(self.copy_url, data) - json_resp = json.loads(resp.content) - - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name - assert json_resp[0]['repo_id'] == dst_repo_id - assert json_resp[0]['parent_dir'] == '/' - - # check old file still exists in src repo - assert file_name in self.get_dirent_name_list(self.repo_id) - # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) - - ### copy for third time ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) - resp = self.client.post(self.copy_url, data) - json_resp = json.loads(resp.content) - - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name - assert json_resp[0]['repo_id'] == dst_repo_id - assert json_resp[0]['parent_dir'] == '/' - - # check old file still exists in src repo - assert file_name in self.get_dirent_name_list(self.repo_id) - # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) - - ### then move ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) - resp = self.client.post(self.move_url, data) - json_resp = json.loads(resp.content) - - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name - assert json_resp[0]['repo_id'] == dst_repo_id - assert json_resp[0]['parent_dir'] == '/' - - # check old file NOT exists in src repo - assert file_name not in self.get_dirent_name_list(self.repo_id) - # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) - - self.remove_repo(dst_repo_id) - def test_can_copy(self): self.login_as(self.user) - file_name = self.create_new_file() + file_name_1 = self.create_new_file() + file_name_2 = self.create_new_file() # check old file name exists in src repo - assert file_name in self.get_dirent_name_list(self.repo_id) + assert file_name_1 in self.get_dirent_name_list(self.repo_id) + assert file_name_2 in self.get_dirent_name_list(self.repo_id) dst_repo_id = self.create_new_repo() + renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1) + renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2) + data = { - 'file_names': file_name, + 'file_names': file_name_1 + ':' + file_name_2, 'dst_repo': dst_repo_id, 'dst_dir': '/', } ### copy for first time ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) resp = self.client.post(self.copy_url, data) json_resp = json.loads(resp.content) - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name + assert json_resp[0]['repo_id'] == dst_repo_id assert json_resp[0]['parent_dir'] == '/' + assert json_resp[0]['obj_name'] == renamed_name_1 + assert json_resp[1]['obj_name'] == renamed_name_2 # check old file still existes - assert file_name in self.get_dirent_name_list(self.repo_id) + assert file_name_1 in self.get_dirent_name_list(self.repo_id) + assert file_name_2 in self.get_dirent_name_list(self.repo_id) # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id) ### copy for second time ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) + renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1) + renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2) + resp = self.client.post(self.copy_url, data) json_resp = json.loads(resp.content) - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name + assert json_resp[0]['repo_id'] == dst_repo_id assert json_resp[0]['parent_dir'] == '/' + assert json_resp[0]['obj_name'] == renamed_name_1 + assert json_resp[1]['obj_name'] == renamed_name_2 - # check old file still exists in src repo - assert file_name in self.get_dirent_name_list(self.repo_id) + # check old file still existes + assert file_name_1 in self.get_dirent_name_list(self.repo_id) + assert file_name_2 in self.get_dirent_name_list(self.repo_id) # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id) ### copy for third time ### - renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name) + renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1) + renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2) + resp = self.client.post(self.copy_url, data) json_resp = json.loads(resp.content) - self.assertEqual(200, resp.status_code) - assert json_resp[0]['obj_name'] == renamed_name + assert json_resp[0]['repo_id'] == dst_repo_id assert json_resp[0]['parent_dir'] == '/' + assert json_resp[0]['obj_name'] == renamed_name_1 + assert json_resp[1]['obj_name'] == renamed_name_2 - # check old file still exists in src repo - assert file_name in self.get_dirent_name_list(self.repo_id) + # check old file still existes + assert file_name_1 in self.get_dirent_name_list(self.repo_id) + assert file_name_2 in self.get_dirent_name_list(self.repo_id) # check old file has been copyd to dst repo with a new name - assert renamed_name in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id) self.remove_repo(dst_repo_id) + def test_can_move(self): + self.login_as(self.user) + + file_name_1 = self.create_new_file() + file_name_2 = self.create_new_file() + + # check old file name exists in src repo + assert file_name_1 in self.get_dirent_name_list(self.repo_id) + assert file_name_2 in self.get_dirent_name_list(self.repo_id) + + dst_repo_id = self.create_new_repo() + renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1) + renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2) + + data = { + 'file_names': file_name_1 + ':' + file_name_2, + 'dst_repo': dst_repo_id, + 'dst_dir': '/', + } + + # move files + resp = self.client.post(self.move_url, data) + json_resp = json.loads(resp.content) + self.assertEqual(200, resp.status_code) + + assert json_resp[0]['repo_id'] == dst_repo_id + assert json_resp[0]['parent_dir'] == '/' + assert json_resp[0]['obj_name'] == renamed_name_1 + assert json_resp[1]['obj_name'] == renamed_name_2 + + # check old file NOT existes + assert file_name_1 not in self.get_dirent_name_list(self.repo_id) + assert file_name_2 not in self.get_dirent_name_list(self.repo_id) + # check old file has been copyd to dst repo with a new name + assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id) + assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id) + self.remove_repo(dst_repo_id) + def test_can_delete(self): self.login_as(self.user)