mirror of
https://github.com/haiwen/seahub.git
synced 2025-08-02 07:47:32 +00:00
commit
3aa44f4fa6
@ -1603,7 +1603,7 @@ class FileView(APIView):
|
||||
file_name, op, use_onetime)
|
||||
|
||||
def post(self, request, repo_id, format=None):
|
||||
# rename, move or create file
|
||||
# rename, move, copy or create file
|
||||
repo = get_repo(repo_id)
|
||||
if not repo:
|
||||
return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.')
|
||||
@ -1712,6 +1712,57 @@ class FileView(APIView):
|
||||
uri = reverse('FileView', args=[dst_repo_id], request=request)
|
||||
resp['Location'] = uri + '?p=' + quote(dst_dir_utf8) + quote(new_filename_utf8)
|
||||
return resp
|
||||
|
||||
elif operation.lower() == 'copy':
|
||||
src_repo_id = repo_id
|
||||
src_dir = os.path.dirname(path)
|
||||
src_dir_utf8 = src_dir.encode('utf-8')
|
||||
dst_repo_id = request.POST.get('dst_repo', '')
|
||||
dst_dir = request.POST.get('dst_dir', '')
|
||||
dst_dir_utf8 = dst_dir.encode('utf-8')
|
||||
|
||||
if dst_dir[-1] != '/': # Append '/' to the end of directory if necessary
|
||||
dst_dir += '/'
|
||||
|
||||
if not (dst_repo_id and dst_dir):
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, 'Missing arguments.')
|
||||
|
||||
if src_repo_id == dst_repo_id and src_dir == dst_dir:
|
||||
return Response('success', status=status.HTTP_200_OK)
|
||||
|
||||
# check src folder permission
|
||||
if check_folder_permission(request, repo_id, path) is None:
|
||||
return api_error(status.HTTP_403_FORBIDDEN,
|
||||
'You do not have permission to copy file.')
|
||||
|
||||
# check dst folder permission
|
||||
if check_folder_permission(request, dst_repo_id, dst_dir) != 'rw':
|
||||
return api_error(status.HTTP_403_FORBIDDEN,
|
||||
'You do not have permission to copy file.')
|
||||
|
||||
filename = os.path.basename(path)
|
||||
filename_utf8 = filename.encode('utf-8')
|
||||
new_filename_utf8 = check_filename_with_rename_utf8(dst_repo_id,
|
||||
dst_dir,
|
||||
filename)
|
||||
try:
|
||||
seafile_api.copy_file(src_repo_id, src_dir_utf8,
|
||||
filename_utf8, dst_repo_id,
|
||||
dst_dir_utf8, new_filename_utf8,
|
||||
username, 0, synchronous=1)
|
||||
except SearpcError as e:
|
||||
logger.error(e)
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
"SearpcError:" + e.msg)
|
||||
|
||||
if request.GET.get('reloaddir', '').lower() == 'true':
|
||||
return reloaddir(request, dst_repo, dst_dir)
|
||||
else:
|
||||
resp = Response('success', status=status.HTTP_200_OK)
|
||||
uri = reverse('FileView', args=[dst_repo_id], request=request)
|
||||
resp['Location'] = uri + '?p=' + quote(dst_dir_utf8) + quote(new_filename_utf8)
|
||||
return resp
|
||||
|
||||
elif operation.lower() == 'create':
|
||||
if check_folder_permission(request, repo_id, parent_dir) != 'rw':
|
||||
return api_error(status.HTTP_403_FORBIDDEN,
|
||||
|
@ -4,15 +4,13 @@ Test file/dir operations.
|
||||
"""
|
||||
|
||||
import posixpath
|
||||
import random
|
||||
import re
|
||||
import pytest
|
||||
import urllib
|
||||
from urllib import urlencode, quote, quote
|
||||
from urllib import urlencode, quote
|
||||
import urlparse
|
||||
|
||||
from tests.common.utils import randstring, urljoin
|
||||
from tests.api.urls import DEFAULT_REPO_URL, REPOS_URL
|
||||
from tests.api.apitestbase import ApiTestBase, USERNAME
|
||||
from tests.api.apitestbase import ApiTestBase
|
||||
|
||||
class FilesApiTest(ApiTestBase):
|
||||
def test_rename_file(self):
|
||||
@ -45,34 +43,35 @@ class FilesApiTest(ApiTestBase):
|
||||
|
||||
def test_copy_file(self):
|
||||
with self.get_tmp_repo() as repo:
|
||||
fname, _ = self.create_file(repo)
|
||||
# TODO: create another repo here, and use it as dst_repo
|
||||
|
||||
# create sub folder(dpath)
|
||||
dpath, _ = self.create_dir(repo)
|
||||
fopurl = urljoin(repo.repo_url, 'fileops/copy/') + '?p=/'
|
||||
data = {
|
||||
'file_names': fname,
|
||||
'dst_repo': repo.repo_id,
|
||||
'dst_dir': dpath,
|
||||
}
|
||||
res = self.post(fopurl, data=data)
|
||||
self.assertEqual(res.text, '"success"')
|
||||
|
||||
# create tmp file in sub folder(dpath)
|
||||
tmp_file = 'tmp_file.txt'
|
||||
furl = repo.get_filepath_url(dpath + '/' + tmp_file)
|
||||
file_path = dpath + '/' + tmp_file
|
||||
furl = repo.get_filepath_url(file_path)
|
||||
data = {'operation': 'create'}
|
||||
res = self.post(furl, data=data, expected=201)
|
||||
|
||||
# copy tmp file(in dpath) to dst dir
|
||||
fopurl = urljoin(repo.repo_url, 'fileops/copy/') + '?p=' + quote(dpath)
|
||||
# copy tmp file from sub folder(dpath) to dst dir('/')
|
||||
data = {
|
||||
'file_names': tmp_file,
|
||||
'dst_repo': repo.repo_id,
|
||||
'dst_dir': dpath,
|
||||
'dst_dir': '/',
|
||||
'operation': 'copy',
|
||||
}
|
||||
res = self.post(fopurl, data=data)
|
||||
u = urlparse.urlparse(furl)
|
||||
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||
self.assertEqual(res.text, '"success"')
|
||||
|
||||
# get info of copied file in dst dir('/')
|
||||
fdurl = repo.file_url + u'detail/?p=/%s' % quote(tmp_file)
|
||||
detail = self.get(fdurl).json()
|
||||
self.assertIsNotNone(detail)
|
||||
self.assertIsNotNone(detail['id'])
|
||||
|
||||
def test_download_file(self):
|
||||
with self.get_tmp_repo() as repo:
|
||||
fname, furl = self.create_file(repo)
|
||||
|
Loading…
Reference in New Issue
Block a user