mirror of
https://github.com/haiwen/seahub.git
synced 2025-08-17 14:37:58 +00:00
Merge pull request #1675 from haiwen/multi-copy-move
update multi copy/move dirents api
This commit is contained in:
commit
0100d6da8b
@ -66,7 +66,8 @@ from seahub.utils import gen_file_get_url, gen_token, gen_file_upload_url, \
|
|||||||
gen_file_share_link, gen_dir_share_link, is_org_context, gen_shared_link, \
|
gen_file_share_link, gen_dir_share_link, is_org_context, gen_shared_link, \
|
||||||
get_org_user_events, calculate_repos_last_modify, send_perm_audit_msg, \
|
get_org_user_events, calculate_repos_last_modify, send_perm_audit_msg, \
|
||||||
gen_shared_upload_link, convert_cmmt_desc_link, is_valid_dirent_name, \
|
gen_shared_upload_link, convert_cmmt_desc_link, is_valid_dirent_name, \
|
||||||
is_org_repo_creation_allowed, is_windows_operating_system
|
is_org_repo_creation_allowed, is_windows_operating_system, \
|
||||||
|
get_no_duplicate_obj_name
|
||||||
from seahub.utils.devices import do_unlink_device
|
from seahub.utils.devices import do_unlink_device
|
||||||
from seahub.utils.repo import get_sub_repo_abbrev_origin_path
|
from seahub.utils.repo import get_sub_repo_abbrev_origin_path
|
||||||
from seahub.utils.star import star_file, unstar_file
|
from seahub.utils.star import star_file, unstar_file
|
||||||
@ -1671,11 +1672,8 @@ class OpDeleteView(APIView):
|
|||||||
return api_error(status.HTTP_404_NOT_FOUND,
|
return api_error(status.HTTP_404_NOT_FOUND,
|
||||||
'File or directory not found.')
|
'File or directory not found.')
|
||||||
|
|
||||||
multi_files = ''
|
|
||||||
for file_name in file_names.split(':'):
|
|
||||||
multi_files += file_name + '\t'
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
multi_files = "\t".join(file_names.split(':'))
|
||||||
seafile_api.del_file(repo_id, parent_dir,
|
seafile_api.del_file(repo_id, parent_dir,
|
||||||
multi_files, username)
|
multi_files, username)
|
||||||
except SearpcError as e:
|
except SearpcError as e:
|
||||||
@ -1694,20 +1692,41 @@ class OpMoveView(APIView):
|
|||||||
|
|
||||||
def post(self, request, repo_id, format=None):
|
def post(self, request, repo_id, format=None):
|
||||||
|
|
||||||
repo = get_repo(repo_id)
|
|
||||||
if not repo:
|
|
||||||
return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.')
|
|
||||||
|
|
||||||
username = request.user.username
|
username = request.user.username
|
||||||
parent_dir = request.GET.get('p', '/')
|
parent_dir = request.GET.get('p', '/')
|
||||||
dst_repo = request.POST.get('dst_repo', None)
|
dst_repo = request.POST.get('dst_repo', None)
|
||||||
dst_dir = request.POST.get('dst_dir', None)
|
dst_dir = request.POST.get('dst_dir', None)
|
||||||
file_names = request.POST.get("file_names", None)
|
obj_names = request.POST.get("file_names", None)
|
||||||
|
|
||||||
if not parent_dir or not file_names or not dst_repo or not dst_dir:
|
# argument check
|
||||||
|
if not parent_dir or not obj_names or not dst_repo or not dst_dir:
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST,
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
'Missing argument.')
|
'Missing argument.')
|
||||||
|
|
||||||
|
if repo_id == dst_repo and parent_dir == dst_dir:
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
|
'The destination directory is the same as the source.')
|
||||||
|
|
||||||
|
# src resource check
|
||||||
|
repo = get_repo(repo_id)
|
||||||
|
if not repo:
|
||||||
|
error_msg = 'Library %s not found.' % repo_id
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
if not seafile_api.get_dir_id_by_path(repo_id, parent_dir):
|
||||||
|
error_msg = 'Folder %s not found.' % parent_dir
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
# dst resource check
|
||||||
|
if not get_repo(dst_repo):
|
||||||
|
error_msg = 'Library %s not found.' % dst_repo
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
if not seafile_api.get_dir_id_by_path(dst_repo, dst_dir):
|
||||||
|
error_msg = 'Folder %s not found.' % dst_dir
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
# permission check
|
||||||
if check_folder_permission(request, repo_id, parent_dir) != 'rw':
|
if check_folder_permission(request, repo_id, parent_dir) != 'rw':
|
||||||
return api_error(status.HTTP_403_FORBIDDEN,
|
return api_error(status.HTTP_403_FORBIDDEN,
|
||||||
'You do not have permission to move file in this folder.')
|
'You do not have permission to move file in this folder.')
|
||||||
@ -1716,31 +1735,47 @@ class OpMoveView(APIView):
|
|||||||
return api_error(status.HTTP_403_FORBIDDEN,
|
return api_error(status.HTTP_403_FORBIDDEN,
|
||||||
'You do not have permission to move file to destination folder.')
|
'You do not have permission to move file to destination folder.')
|
||||||
|
|
||||||
if repo_id == dst_repo and parent_dir == dst_dir:
|
# check if all file/dir existes
|
||||||
|
obj_names = obj_names.strip(':').split(':')
|
||||||
|
dirents = seafile_api.list_dir_by_path(repo_id, parent_dir)
|
||||||
|
exist_obj_names = [dirent.obj_name for dirent in dirents]
|
||||||
|
if not set(obj_names).issubset(exist_obj_names):
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST,
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
'The destination directory is the same as the source.')
|
'file_names invalid.')
|
||||||
|
|
||||||
|
# make new name
|
||||||
|
dst_dirents = seafile_api.list_dir_by_path(dst_repo, dst_dir)
|
||||||
|
dst_obj_names = [dirent.obj_name for dirent in dst_dirents]
|
||||||
|
|
||||||
|
new_obj_names = []
|
||||||
|
for obj_name in obj_names:
|
||||||
|
new_obj_name = get_no_duplicate_obj_name(obj_name, dst_obj_names)
|
||||||
|
new_obj_names.append(new_obj_name)
|
||||||
|
|
||||||
|
# move file
|
||||||
|
try:
|
||||||
|
src_multi_objs = "\t".join(obj_names)
|
||||||
|
dst_multi_objs = "\t".join(new_obj_names)
|
||||||
|
|
||||||
|
seafile_api.move_file(repo_id, parent_dir, src_multi_objs,
|
||||||
|
dst_repo, dst_dir, dst_multi_objs, replace=False,
|
||||||
|
username=username, need_progress=0, synchronous=1)
|
||||||
|
except SearpcError as e:
|
||||||
|
logger.error(e)
|
||||||
|
return api_error(HTTP_520_OPERATION_FAILED,
|
||||||
|
"Failed to move file.")
|
||||||
|
|
||||||
obj_info_list = []
|
obj_info_list = []
|
||||||
for file_name in file_names.split(':'):
|
for new_obj_name in new_obj_names:
|
||||||
new_filename = check_filename_with_rename(dst_repo, dst_dir, file_name)
|
|
||||||
try:
|
|
||||||
seafile_api.move_file(repo_id, parent_dir, file_name,
|
|
||||||
dst_repo, dst_dir, new_filename,
|
|
||||||
replace=False, username=username,
|
|
||||||
need_progress=0, synchronous=1)
|
|
||||||
except SearpcError as e:
|
|
||||||
logger.error(e)
|
|
||||||
return api_error(HTTP_520_OPERATION_FAILED,
|
|
||||||
"Failed to move file.")
|
|
||||||
|
|
||||||
obj_info = {}
|
obj_info = {}
|
||||||
obj_info['repo_id'] = dst_repo
|
obj_info['repo_id'] = dst_repo
|
||||||
obj_info['parent_dir'] = dst_dir
|
obj_info['parent_dir'] = dst_dir
|
||||||
obj_info['obj_name'] = new_filename
|
obj_info['obj_name'] = new_obj_name
|
||||||
obj_info_list.append(obj_info)
|
obj_info_list.append(obj_info)
|
||||||
|
|
||||||
return reloaddir_if_necessary(request, repo, parent_dir, obj_info_list)
|
return reloaddir_if_necessary(request, repo, parent_dir, obj_info_list)
|
||||||
|
|
||||||
|
|
||||||
class OpCopyView(APIView):
|
class OpCopyView(APIView):
|
||||||
"""
|
"""
|
||||||
Copy files.
|
Copy files.
|
||||||
@ -1750,20 +1785,41 @@ class OpCopyView(APIView):
|
|||||||
|
|
||||||
def post(self, request, repo_id, format=None):
|
def post(self, request, repo_id, format=None):
|
||||||
|
|
||||||
repo = get_repo(repo_id)
|
|
||||||
if not repo:
|
|
||||||
return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.')
|
|
||||||
|
|
||||||
username = request.user.username
|
username = request.user.username
|
||||||
parent_dir = request.GET.get('p', '/')
|
parent_dir = request.GET.get('p', '/')
|
||||||
dst_repo = request.POST.get('dst_repo', None)
|
dst_repo = request.POST.get('dst_repo', None)
|
||||||
dst_dir = request.POST.get('dst_dir', None)
|
dst_dir = request.POST.get('dst_dir', None)
|
||||||
file_names = request.POST.get("file_names", None)
|
obj_names = request.POST.get("file_names", None)
|
||||||
|
|
||||||
if not parent_dir or not file_names or not dst_repo or not dst_dir:
|
# argument check
|
||||||
|
if not parent_dir or not obj_names or not dst_repo or not dst_dir:
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST,
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
'Missing argument.')
|
'Missing argument.')
|
||||||
|
|
||||||
|
if repo_id == dst_repo and parent_dir == dst_dir:
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
|
'The destination directory is the same as the source.')
|
||||||
|
|
||||||
|
# src resource check
|
||||||
|
repo = get_repo(repo_id)
|
||||||
|
if not repo:
|
||||||
|
error_msg = 'Library %s not found.' % repo_id
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
if not seafile_api.get_dir_id_by_path(repo_id, parent_dir):
|
||||||
|
error_msg = 'Folder %s not found.' % parent_dir
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
# dst resource check
|
||||||
|
if not get_repo(dst_repo):
|
||||||
|
error_msg = 'Library %s not found.' % dst_repo
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
if not seafile_api.get_dir_id_by_path(dst_repo, dst_dir):
|
||||||
|
error_msg = 'Folder %s not found.' % dst_dir
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
# permission check
|
||||||
if check_folder_permission(request, repo_id, parent_dir) is None:
|
if check_folder_permission(request, repo_id, parent_dir) is None:
|
||||||
return api_error(status.HTTP_403_FORBIDDEN,
|
return api_error(status.HTTP_403_FORBIDDEN,
|
||||||
'You do not have permission to copy file of this folder.')
|
'You do not have permission to copy file of this folder.')
|
||||||
@ -1772,29 +1828,41 @@ class OpCopyView(APIView):
|
|||||||
return api_error(status.HTTP_403_FORBIDDEN,
|
return api_error(status.HTTP_403_FORBIDDEN,
|
||||||
'You do not have permission to copy file to destination folder.')
|
'You do not have permission to copy file to destination folder.')
|
||||||
|
|
||||||
if not get_repo(dst_repo):
|
# check if all file/dir existes
|
||||||
return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.')
|
obj_names = obj_names.strip(':').split(':')
|
||||||
|
dirents = seafile_api.list_dir_by_path(repo_id, parent_dir)
|
||||||
|
exist_obj_names = [dirent.obj_name for dirent in dirents]
|
||||||
|
if not set(obj_names).issubset(exist_obj_names):
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
|
'file_names invalid.')
|
||||||
|
|
||||||
if seafile_api.get_dir_id_by_path(repo_id, parent_dir) is None or \
|
# make new name
|
||||||
seafile_api.get_dir_id_by_path(dst_repo, dst_dir) is None:
|
dst_dirents = seafile_api.list_dir_by_path(dst_repo, dst_dir)
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST, 'Path does not exist.')
|
dst_obj_names = [dirent.obj_name for dirent in dst_dirents]
|
||||||
|
|
||||||
|
new_obj_names = []
|
||||||
|
for obj_name in obj_names:
|
||||||
|
new_obj_name = get_no_duplicate_obj_name(obj_name, dst_obj_names)
|
||||||
|
new_obj_names.append(new_obj_name)
|
||||||
|
|
||||||
|
# copy file
|
||||||
|
try:
|
||||||
|
src_multi_objs = "\t".join(obj_names)
|
||||||
|
dst_multi_objs = "\t".join(new_obj_names)
|
||||||
|
|
||||||
|
seafile_api.copy_file(repo_id, parent_dir, src_multi_objs,
|
||||||
|
dst_repo, dst_dir, dst_multi_objs, username, 0, synchronous=1)
|
||||||
|
except SearpcError as e:
|
||||||
|
logger.error(e)
|
||||||
|
return api_error(HTTP_520_OPERATION_FAILED,
|
||||||
|
"Failed to copy file.")
|
||||||
|
|
||||||
obj_info_list = []
|
obj_info_list = []
|
||||||
for file_name in file_names.split(':'):
|
for new_obj_name in new_obj_names:
|
||||||
new_filename = check_filename_with_rename(dst_repo, dst_dir, file_name)
|
|
||||||
try:
|
|
||||||
seafile_api.copy_file(repo_id, parent_dir, file_name,
|
|
||||||
dst_repo, dst_dir, new_filename,
|
|
||||||
username, 0, synchronous=1)
|
|
||||||
except SearpcError as e:
|
|
||||||
logger.error(e)
|
|
||||||
return api_error(HTTP_520_OPERATION_FAILED,
|
|
||||||
"Failed to copy file.")
|
|
||||||
|
|
||||||
obj_info = {}
|
obj_info = {}
|
||||||
obj_info['repo_id'] = dst_repo
|
obj_info['repo_id'] = dst_repo
|
||||||
obj_info['parent_dir'] = dst_dir
|
obj_info['parent_dir'] = dst_dir
|
||||||
obj_info['obj_name'] = new_filename
|
obj_info['obj_name'] = new_obj_name
|
||||||
obj_info_list.append(obj_info)
|
obj_info_list.append(obj_info)
|
||||||
|
|
||||||
return reloaddir_if_necessary(request, repo, parent_dir, obj_info_list)
|
return reloaddir_if_necessary(request, repo, parent_dir, obj_info_list)
|
||||||
|
@ -267,39 +267,44 @@ def is_ldap_user(user):
|
|||||||
"""
|
"""
|
||||||
return user.source == 'LDAP' or user.source == 'LDAPImport'
|
return user.source == 'LDAP' or user.source == 'LDAPImport'
|
||||||
|
|
||||||
def check_filename_with_rename(repo_id, parent_dir, filename):
|
def get_no_duplicate_obj_name(obj_name, exist_obj_names):
|
||||||
|
|
||||||
|
def no_duplicate(obj_name):
|
||||||
|
for exist_obj_name in exist_obj_names:
|
||||||
|
if exist_obj_name == obj_name:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def make_new_name(obj_name, i):
|
||||||
|
base, ext = os.path.splitext(obj_name)
|
||||||
|
if ext:
|
||||||
|
new_base = "%s (%d)" % (base, i)
|
||||||
|
return new_base + ext
|
||||||
|
else:
|
||||||
|
return "%s (%d)" % (obj_name, i)
|
||||||
|
|
||||||
|
if no_duplicate(obj_name):
|
||||||
|
return obj_name
|
||||||
|
else:
|
||||||
|
i = 1
|
||||||
|
while True:
|
||||||
|
new_name = make_new_name(obj_name, i)
|
||||||
|
if no_duplicate(new_name):
|
||||||
|
return new_name
|
||||||
|
else:
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
def check_filename_with_rename(repo_id, parent_dir, obj_name):
|
||||||
cmmts = seafile_api.get_commit_list(repo_id, 0, 1)
|
cmmts = seafile_api.get_commit_list(repo_id, 0, 1)
|
||||||
latest_commit = cmmts[0] if cmmts else None
|
latest_commit = cmmts[0] if cmmts else None
|
||||||
if not latest_commit:
|
if not latest_commit:
|
||||||
return ''
|
return ''
|
||||||
# TODO: what if parrent_dir does not exist?
|
# TODO: what if parrent_dir does not exist?
|
||||||
dirents = seafile_api.list_dir_by_commit_and_path(repo_id, latest_commit.id,
|
dirents = seafile_api.list_dir_by_commit_and_path(repo_id,
|
||||||
parent_dir.encode('utf-8'))
|
latest_commit.id, parent_dir.encode('utf-8'))
|
||||||
|
|
||||||
def no_duplicate(name):
|
exist_obj_names = [dirent.obj_name for dirent in dirents]
|
||||||
for dirent in dirents:
|
return get_no_duplicate_obj_name(obj_name, exist_obj_names)
|
||||||
if dirent.obj_name == name:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def make_new_name(filename, i):
|
|
||||||
base, ext = os.path.splitext(filename)
|
|
||||||
if ext:
|
|
||||||
new_base = "%s (%d)" % (base, i)
|
|
||||||
return new_base + ext
|
|
||||||
else:
|
|
||||||
return "%s (%d)" % (filename, i)
|
|
||||||
|
|
||||||
if no_duplicate(filename):
|
|
||||||
return filename
|
|
||||||
else:
|
|
||||||
i = 1
|
|
||||||
while True:
|
|
||||||
new_name = make_new_name (filename, i)
|
|
||||||
if no_duplicate(new_name):
|
|
||||||
return new_name
|
|
||||||
else:
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
def get_user_repos(username, org_id=None):
|
def get_user_repos(username, org_id=None):
|
||||||
"""
|
"""
|
||||||
|
@ -56,145 +56,123 @@ class FileOpsApiTest(BaseTestCase):
|
|||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.remove_repo()
|
self.remove_repo()
|
||||||
|
|
||||||
def test_can_move(self):
|
|
||||||
self.login_as(self.user)
|
|
||||||
|
|
||||||
file_name = self.create_new_file()
|
|
||||||
|
|
||||||
# check old file name exists in src repo
|
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
|
||||||
|
|
||||||
dst_repo_id = self.create_new_repo()
|
|
||||||
data = {
|
|
||||||
'file_names': file_name,
|
|
||||||
'dst_repo': dst_repo_id,
|
|
||||||
'dst_dir': '/',
|
|
||||||
}
|
|
||||||
|
|
||||||
### copy for first time ###
|
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
|
||||||
resp = self.client.post(self.copy_url, data)
|
|
||||||
json_resp = json.loads(resp.content)
|
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
|
||||||
|
|
||||||
# check old file still existes
|
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
|
||||||
# check old file has been copyd to dst repo with a new name
|
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
|
||||||
|
|
||||||
### copy for second time ###
|
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
|
||||||
resp = self.client.post(self.copy_url, data)
|
|
||||||
json_resp = json.loads(resp.content)
|
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
|
||||||
|
|
||||||
# check old file still exists in src repo
|
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
|
||||||
# check old file has been copyd to dst repo with a new name
|
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
|
||||||
|
|
||||||
### copy for third time ###
|
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
|
||||||
resp = self.client.post(self.copy_url, data)
|
|
||||||
json_resp = json.loads(resp.content)
|
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
|
||||||
|
|
||||||
# check old file still exists in src repo
|
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
|
||||||
# check old file has been copyd to dst repo with a new name
|
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
|
||||||
|
|
||||||
### then move ###
|
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
|
||||||
resp = self.client.post(self.move_url, data)
|
|
||||||
json_resp = json.loads(resp.content)
|
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
|
||||||
|
|
||||||
# check old file NOT exists in src repo
|
|
||||||
assert file_name not in self.get_dirent_name_list(self.repo_id)
|
|
||||||
# check old file has been copyd to dst repo with a new name
|
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
|
||||||
|
|
||||||
self.remove_repo(dst_repo_id)
|
|
||||||
|
|
||||||
def test_can_copy(self):
|
def test_can_copy(self):
|
||||||
self.login_as(self.user)
|
self.login_as(self.user)
|
||||||
|
|
||||||
file_name = self.create_new_file()
|
file_name_1 = self.create_new_file()
|
||||||
|
file_name_2 = self.create_new_file()
|
||||||
|
|
||||||
# check old file name exists in src repo
|
# check old file name exists in src repo
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
assert file_name_1 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
assert file_name_2 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
|
||||||
dst_repo_id = self.create_new_repo()
|
dst_repo_id = self.create_new_repo()
|
||||||
|
renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1)
|
||||||
|
renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2)
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'file_names': file_name,
|
'file_names': file_name_1 + ':' + file_name_2,
|
||||||
'dst_repo': dst_repo_id,
|
'dst_repo': dst_repo_id,
|
||||||
'dst_dir': '/',
|
'dst_dir': '/',
|
||||||
}
|
}
|
||||||
|
|
||||||
### copy for first time ###
|
### copy for first time ###
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
|
||||||
resp = self.client.post(self.copy_url, data)
|
resp = self.client.post(self.copy_url, data)
|
||||||
json_resp = json.loads(resp.content)
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
self.assertEqual(200, resp.status_code)
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
assert json_resp[0]['repo_id'] == dst_repo_id
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
assert json_resp[0]['parent_dir'] == '/'
|
||||||
|
assert json_resp[0]['obj_name'] == renamed_name_1
|
||||||
|
assert json_resp[1]['obj_name'] == renamed_name_2
|
||||||
|
|
||||||
# check old file still existes
|
# check old file still existes
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
assert file_name_1 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
assert file_name_2 in self.get_dirent_name_list(self.repo_id)
|
||||||
# check old file has been copyd to dst repo with a new name
|
# check old file has been copyd to dst repo with a new name
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
|
||||||
### copy for second time ###
|
### copy for second time ###
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1)
|
||||||
|
renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2)
|
||||||
|
|
||||||
resp = self.client.post(self.copy_url, data)
|
resp = self.client.post(self.copy_url, data)
|
||||||
json_resp = json.loads(resp.content)
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
self.assertEqual(200, resp.status_code)
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
assert json_resp[0]['repo_id'] == dst_repo_id
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
assert json_resp[0]['parent_dir'] == '/'
|
||||||
|
assert json_resp[0]['obj_name'] == renamed_name_1
|
||||||
|
assert json_resp[1]['obj_name'] == renamed_name_2
|
||||||
|
|
||||||
# check old file still exists in src repo
|
# check old file still existes
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
assert file_name_1 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
assert file_name_2 in self.get_dirent_name_list(self.repo_id)
|
||||||
# check old file has been copyd to dst repo with a new name
|
# check old file has been copyd to dst repo with a new name
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
|
||||||
### copy for third time ###
|
### copy for third time ###
|
||||||
renamed_name = check_filename_with_rename(dst_repo_id, '/', file_name)
|
renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1)
|
||||||
|
renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2)
|
||||||
|
|
||||||
resp = self.client.post(self.copy_url, data)
|
resp = self.client.post(self.copy_url, data)
|
||||||
json_resp = json.loads(resp.content)
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
self.assertEqual(200, resp.status_code)
|
self.assertEqual(200, resp.status_code)
|
||||||
assert json_resp[0]['obj_name'] == renamed_name
|
|
||||||
assert json_resp[0]['repo_id'] == dst_repo_id
|
assert json_resp[0]['repo_id'] == dst_repo_id
|
||||||
assert json_resp[0]['parent_dir'] == '/'
|
assert json_resp[0]['parent_dir'] == '/'
|
||||||
|
assert json_resp[0]['obj_name'] == renamed_name_1
|
||||||
|
assert json_resp[1]['obj_name'] == renamed_name_2
|
||||||
|
|
||||||
# check old file still exists in src repo
|
# check old file still existes
|
||||||
assert file_name in self.get_dirent_name_list(self.repo_id)
|
assert file_name_1 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
assert file_name_2 in self.get_dirent_name_list(self.repo_id)
|
||||||
# check old file has been copyd to dst repo with a new name
|
# check old file has been copyd to dst repo with a new name
|
||||||
assert renamed_name in self.get_dirent_name_list(dst_repo_id)
|
assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
|
||||||
self.remove_repo(dst_repo_id)
|
self.remove_repo(dst_repo_id)
|
||||||
|
|
||||||
|
def test_can_move(self):
|
||||||
|
self.login_as(self.user)
|
||||||
|
|
||||||
|
file_name_1 = self.create_new_file()
|
||||||
|
file_name_2 = self.create_new_file()
|
||||||
|
|
||||||
|
# check old file name exists in src repo
|
||||||
|
assert file_name_1 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
assert file_name_2 in self.get_dirent_name_list(self.repo_id)
|
||||||
|
|
||||||
|
dst_repo_id = self.create_new_repo()
|
||||||
|
renamed_name_1 = check_filename_with_rename(dst_repo_id, '/', file_name_1)
|
||||||
|
renamed_name_2 = check_filename_with_rename(dst_repo_id, '/', file_name_2)
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'file_names': file_name_1 + ':' + file_name_2,
|
||||||
|
'dst_repo': dst_repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
}
|
||||||
|
|
||||||
|
# move files
|
||||||
|
resp = self.client.post(self.move_url, data)
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
|
||||||
|
assert json_resp[0]['repo_id'] == dst_repo_id
|
||||||
|
assert json_resp[0]['parent_dir'] == '/'
|
||||||
|
assert json_resp[0]['obj_name'] == renamed_name_1
|
||||||
|
assert json_resp[1]['obj_name'] == renamed_name_2
|
||||||
|
|
||||||
|
# check old file NOT existes
|
||||||
|
assert file_name_1 not in self.get_dirent_name_list(self.repo_id)
|
||||||
|
assert file_name_2 not in self.get_dirent_name_list(self.repo_id)
|
||||||
|
# check old file has been copyd to dst repo with a new name
|
||||||
|
assert renamed_name_1 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
assert renamed_name_2 in self.get_dirent_name_list(dst_repo_id)
|
||||||
|
self.remove_repo(dst_repo_id)
|
||||||
|
|
||||||
def test_can_delete(self):
|
def test_can_delete(self):
|
||||||
self.login_as(self.user)
|
self.login_as(self.user)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user