mirror of
https://github.com/haiwen/seahub.git
synced 2025-08-19 07:27:56 +00:00
Merge branch '6.0'
This commit is contained in:
commit
5c998cdb03
@ -116,6 +116,28 @@ class DirSharedItemsEndpoint(APIView):
|
|||||||
else:
|
else:
|
||||||
return seafile_api.get_repo_owner(repo_id)
|
return seafile_api.get_repo_owner(repo_id)
|
||||||
|
|
||||||
|
def has_shared_to_user(self, request, repo_id, path, username):
|
||||||
|
items = self.list_user_shared_items(request, repo_id, path)
|
||||||
|
|
||||||
|
has_shared = False
|
||||||
|
for item in items:
|
||||||
|
if username == item['user_info']['name']:
|
||||||
|
has_shared = True
|
||||||
|
break
|
||||||
|
|
||||||
|
return has_shared
|
||||||
|
|
||||||
|
def has_shared_to_group(self, request, repo_id, path, group_id):
|
||||||
|
items = self.list_group_shared_items(request, repo_id, path)
|
||||||
|
|
||||||
|
has_shared = False
|
||||||
|
for item in items:
|
||||||
|
if group_id == item['group_info']['id']:
|
||||||
|
has_shared = True
|
||||||
|
break
|
||||||
|
|
||||||
|
return has_shared
|
||||||
|
|
||||||
def get(self, request, repo_id, format=None):
|
def get(self, request, repo_id, format=None):
|
||||||
"""List shared items(shared to users/groups) for a folder/library.
|
"""List shared items(shared to users/groups) for a folder/library.
|
||||||
"""
|
"""
|
||||||
@ -149,21 +171,16 @@ class DirSharedItemsEndpoint(APIView):
|
|||||||
|
|
||||||
path = request.GET.get('p', '/')
|
path = request.GET.get('p', '/')
|
||||||
if seafile_api.get_dir_id_by_path(repo.id, path) is None:
|
if seafile_api.get_dir_id_by_path(repo.id, path) is None:
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST, 'Directory not found.')
|
return api_error(status.HTTP_404_NOT_FOUND, 'Folder %s not found.' % path)
|
||||||
|
|
||||||
if username != self.get_repo_owner(request, repo_id):
|
if username != self.get_repo_owner(request, repo_id):
|
||||||
return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.')
|
return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.')
|
||||||
|
|
||||||
shared_to_user, shared_to_group = self.handle_shared_to_args(request)
|
|
||||||
|
|
||||||
permission = request.data.get('permission', 'r')
|
permission = request.data.get('permission', 'r')
|
||||||
if permission not in ['r', 'rw']:
|
if permission not in ['r', 'rw']:
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST, 'permission invalid.')
|
return api_error(status.HTTP_400_BAD_REQUEST, 'permission invalid.')
|
||||||
|
|
||||||
path = request.GET.get('p', '/')
|
shared_to_user, shared_to_group = self.handle_shared_to_args(request)
|
||||||
if seafile_api.get_dir_id_by_path(repo.id, path) is None:
|
|
||||||
return api_error(status.HTTP_404_NOT_FOUND, 'Folder %s not found.' % path)
|
|
||||||
|
|
||||||
if shared_to_user:
|
if shared_to_user:
|
||||||
shared_to = request.GET.get('username')
|
shared_to = request.GET.get('username')
|
||||||
if shared_to is None or not is_valid_username(shared_to):
|
if shared_to is None or not is_valid_username(shared_to):
|
||||||
@ -268,6 +285,13 @@ class DirSharedItemsEndpoint(APIView):
|
|||||||
})
|
})
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if self.has_shared_to_user(request, repo_id, path, to_user):
|
||||||
|
result['failed'].append({
|
||||||
|
'email': to_user,
|
||||||
|
'error_msg': 'This item has been shared to %s.' % to_user
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if is_org_context(request):
|
if is_org_context(request):
|
||||||
org_id = request.user.org.org_id
|
org_id = request.user.org.org_id
|
||||||
@ -321,10 +345,18 @@ class DirSharedItemsEndpoint(APIView):
|
|||||||
gid = int(gid)
|
gid = int(gid)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST, 'group_id %s invalid.' % gid)
|
return api_error(status.HTTP_400_BAD_REQUEST, 'group_id %s invalid.' % gid)
|
||||||
|
|
||||||
group = seaserv.get_group(gid)
|
group = seaserv.get_group(gid)
|
||||||
if not group:
|
if not group:
|
||||||
return api_error(status.HTTP_404_NOT_FOUND, 'Group %s not found' % gid)
|
return api_error(status.HTTP_404_NOT_FOUND, 'Group %s not found' % gid)
|
||||||
|
|
||||||
|
if self.has_shared_to_group(request, repo_id, path, gid):
|
||||||
|
result['failed'].append({
|
||||||
|
'group_name': group.group_name,
|
||||||
|
'error_msg': 'This item has been shared to %s.' % group.group_name
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if is_org_context(request):
|
if is_org_context(request):
|
||||||
org_id = request.user.org.org_id
|
org_id = request.user.org.org_id
|
||||||
@ -386,7 +418,6 @@ class DirSharedItemsEndpoint(APIView):
|
|||||||
return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.')
|
return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.')
|
||||||
|
|
||||||
shared_to_user, shared_to_group = self.handle_shared_to_args(request)
|
shared_to_user, shared_to_group = self.handle_shared_to_args(request)
|
||||||
|
|
||||||
if shared_to_user:
|
if shared_to_user:
|
||||||
shared_to = request.GET.get('username')
|
shared_to = request.GET.get('username')
|
||||||
if shared_to is None or not is_valid_username(shared_to):
|
if shared_to is None or not is_valid_username(shared_to):
|
||||||
|
@ -103,8 +103,8 @@ urlpatterns = patterns('',
|
|||||||
|
|
||||||
# Deprecated
|
# Deprecated
|
||||||
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/fileops/delete/$', OpDeleteView.as_view()),
|
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/fileops/delete/$', OpDeleteView.as_view()),
|
||||||
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/fileops/copy/$', OpCopyView.as_view()),
|
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/fileops/copy/$', OpCopyView.as_view(), name="api2-fileops-copy"),
|
||||||
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/fileops/move/$', OpMoveView.as_view()),
|
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/fileops/move/$', OpMoveView.as_view(), name="api2-fileops-move"),
|
||||||
)
|
)
|
||||||
|
|
||||||
# serve office converter static files
|
# serve office converter static files
|
||||||
|
@ -345,6 +345,7 @@ class Search(APIView):
|
|||||||
file_id = seafile_api.get_file_id_by_path(e['repo_id'], path)
|
file_id = seafile_api.get_file_id_by_path(e['repo_id'], path)
|
||||||
e['oid'] = file_id
|
e['oid'] = file_id
|
||||||
repo = get_repo(e['repo_id'])
|
repo = get_repo(e['repo_id'])
|
||||||
|
e['repo_name'] = repo.name
|
||||||
e['size'] = get_file_size(repo.store_id, repo.version, file_id)
|
e['size'] = get_file_size(repo.store_id, repo.version, file_id)
|
||||||
except SearpcError, err:
|
except SearpcError, err:
|
||||||
pass
|
pass
|
||||||
@ -1607,7 +1608,7 @@ def reloaddir(request, repo, parent_dir):
|
|||||||
|
|
||||||
return get_dir_entrys_by_id(request, repo, parent_dir, dir_id)
|
return get_dir_entrys_by_id(request, repo, parent_dir, dir_id)
|
||||||
|
|
||||||
def reloaddir_if_necessary (request, repo, parent_dir):
|
def reloaddir_if_necessary(request, repo, parent_dir, obj_info=None):
|
||||||
|
|
||||||
reload_dir = False
|
reload_dir = False
|
||||||
s = request.GET.get('reloaddir', None)
|
s = request.GET.get('reloaddir', None)
|
||||||
@ -1615,6 +1616,9 @@ def reloaddir_if_necessary (request, repo, parent_dir):
|
|||||||
reload_dir = True
|
reload_dir = True
|
||||||
|
|
||||||
if not reload_dir:
|
if not reload_dir:
|
||||||
|
if obj_info:
|
||||||
|
return Response(obj_info)
|
||||||
|
else:
|
||||||
return Response('success')
|
return Response('success')
|
||||||
|
|
||||||
return reloaddir(request, repo, parent_dir)
|
return reloaddir(request, repo, parent_dir)
|
||||||
@ -1663,7 +1667,7 @@ class OpMoveView(APIView):
|
|||||||
"""
|
"""
|
||||||
Move files.
|
Move files.
|
||||||
"""
|
"""
|
||||||
authentication_classes = (TokenAuthentication, )
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
permission_classes = (IsAuthenticated, )
|
permission_classes = (IsAuthenticated, )
|
||||||
|
|
||||||
def post(self, request, repo_id, format=None):
|
def post(self, request, repo_id, format=None):
|
||||||
@ -1694,6 +1698,7 @@ class OpMoveView(APIView):
|
|||||||
return api_error(status.HTTP_400_BAD_REQUEST,
|
return api_error(status.HTTP_400_BAD_REQUEST,
|
||||||
'The destination directory is the same as the source.')
|
'The destination directory is the same as the source.')
|
||||||
|
|
||||||
|
obj_info_list = []
|
||||||
parent_dir_utf8 = parent_dir.encode('utf-8')
|
parent_dir_utf8 = parent_dir.encode('utf-8')
|
||||||
for file_name in file_names.split(':'):
|
for file_name in file_names.split(':'):
|
||||||
file_name = unquote(file_name.encode('utf-8'))
|
file_name = unquote(file_name.encode('utf-8'))
|
||||||
@ -1709,13 +1714,20 @@ class OpMoveView(APIView):
|
|||||||
return api_error(HTTP_520_OPERATION_FAILED,
|
return api_error(HTTP_520_OPERATION_FAILED,
|
||||||
"Failed to move file.")
|
"Failed to move file.")
|
||||||
|
|
||||||
return reloaddir_if_necessary (request, repo, parent_dir_utf8)
|
obj_info = {}
|
||||||
|
obj_info['repo_id'] = dst_repo
|
||||||
|
obj_info['parent_dir'] = dst_dir
|
||||||
|
obj_info['obj_name'] = new_filename
|
||||||
|
obj_info_list.append(obj_info)
|
||||||
|
|
||||||
|
return reloaddir_if_necessary(request, repo, parent_dir_utf8,
|
||||||
|
obj_info_list)
|
||||||
|
|
||||||
class OpCopyView(APIView):
|
class OpCopyView(APIView):
|
||||||
"""
|
"""
|
||||||
Copy files.
|
Copy files.
|
||||||
"""
|
"""
|
||||||
authentication_classes = (TokenAuthentication, )
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
permission_classes = (IsAuthenticated, )
|
permission_classes = (IsAuthenticated, )
|
||||||
|
|
||||||
def post(self, request, repo_id, format=None):
|
def post(self, request, repo_id, format=None):
|
||||||
@ -1749,6 +1761,7 @@ class OpCopyView(APIView):
|
|||||||
seafile_api.get_dir_id_by_path(dst_repo, dst_dir) is None:
|
seafile_api.get_dir_id_by_path(dst_repo, dst_dir) is None:
|
||||||
return api_error(status.HTTP_400_BAD_REQUEST, 'Path does not exist.')
|
return api_error(status.HTTP_400_BAD_REQUEST, 'Path does not exist.')
|
||||||
|
|
||||||
|
obj_info_list = []
|
||||||
parent_dir_utf8 = parent_dir.encode('utf-8')
|
parent_dir_utf8 = parent_dir.encode('utf-8')
|
||||||
for file_name in file_names.split(':'):
|
for file_name in file_names.split(':'):
|
||||||
file_name = unquote(file_name.encode('utf-8'))
|
file_name = unquote(file_name.encode('utf-8'))
|
||||||
@ -1763,7 +1776,14 @@ class OpCopyView(APIView):
|
|||||||
return api_error(HTTP_520_OPERATION_FAILED,
|
return api_error(HTTP_520_OPERATION_FAILED,
|
||||||
"Failed to copy file.")
|
"Failed to copy file.")
|
||||||
|
|
||||||
return reloaddir_if_necessary(request, repo, parent_dir_utf8)
|
obj_info = {}
|
||||||
|
obj_info['repo_id'] = dst_repo
|
||||||
|
obj_info['parent_dir'] = dst_dir
|
||||||
|
obj_info['obj_name'] = new_filename
|
||||||
|
obj_info_list.append(obj_info)
|
||||||
|
|
||||||
|
return reloaddir_if_necessary(request, repo, parent_dir_utf8,
|
||||||
|
obj_info_list)
|
||||||
|
|
||||||
|
|
||||||
class StarredFileView(APIView):
|
class StarredFileView(APIView):
|
||||||
@ -2025,6 +2045,7 @@ class FileView(APIView):
|
|||||||
parent_dir = os.path.dirname(path)
|
parent_dir = os.path.dirname(path)
|
||||||
operation = request.POST.get('operation', '')
|
operation = request.POST.get('operation', '')
|
||||||
|
|
||||||
|
file_info = {}
|
||||||
if operation.lower() == 'rename':
|
if operation.lower() == 'rename':
|
||||||
if check_folder_permission(request, repo_id, parent_dir) != 'rw':
|
if check_folder_permission(request, repo_id, parent_dir) != 'rw':
|
||||||
return api_error(status.HTTP_403_FORBIDDEN,
|
return api_error(status.HTTP_403_FORBIDDEN,
|
||||||
@ -2108,7 +2129,10 @@ class FileView(APIView):
|
|||||||
if request.GET.get('reloaddir', '').lower() == 'true':
|
if request.GET.get('reloaddir', '').lower() == 'true':
|
||||||
return reloaddir(request, dst_repo, dst_dir)
|
return reloaddir(request, dst_repo, dst_dir)
|
||||||
else:
|
else:
|
||||||
resp = Response('success', status=status.HTTP_301_MOVED_PERMANENTLY)
|
file_info['repo_id'] = dst_repo_id
|
||||||
|
file_info['parent_dir'] = dst_dir
|
||||||
|
file_info['obj_name'] = new_filename_utf8
|
||||||
|
resp = Response(file_info, status=status.HTTP_301_MOVED_PERMANENTLY)
|
||||||
uri = reverse('FileView', args=[dst_repo_id], request=request)
|
uri = reverse('FileView', args=[dst_repo_id], request=request)
|
||||||
resp['Location'] = uri + '?p=' + quote(dst_dir_utf8) + quote(new_filename_utf8)
|
resp['Location'] = uri + '?p=' + quote(dst_dir_utf8) + quote(new_filename_utf8)
|
||||||
return resp
|
return resp
|
||||||
@ -2158,7 +2182,10 @@ class FileView(APIView):
|
|||||||
if request.GET.get('reloaddir', '').lower() == 'true':
|
if request.GET.get('reloaddir', '').lower() == 'true':
|
||||||
return reloaddir(request, dst_repo, dst_dir)
|
return reloaddir(request, dst_repo, dst_dir)
|
||||||
else:
|
else:
|
||||||
resp = Response('success', status=status.HTTP_200_OK)
|
file_info['repo_id'] = dst_repo_id
|
||||||
|
file_info['parent_dir'] = dst_dir
|
||||||
|
file_info['obj_name'] = new_filename_utf8
|
||||||
|
resp = Response(file_info, status=status.HTTP_200_OK)
|
||||||
uri = reverse('FileView', args=[dst_repo_id], request=request)
|
uri = reverse('FileView', args=[dst_repo_id], request=request)
|
||||||
resp['Location'] = uri + '?p=' + quote(dst_dir_utf8) + quote(new_filename_utf8)
|
resp['Location'] = uri + '?p=' + quote(dst_dir_utf8) + quote(new_filename_utf8)
|
||||||
return resp
|
return resp
|
||||||
|
@ -88,7 +88,7 @@ def get_group_member_info(request, group_id, email, avatar_size=AVATAR_DEFAULT_S
|
|||||||
|
|
||||||
role = 'Member'
|
role = 'Member'
|
||||||
group = ccnet_api.get_group(int(group_id))
|
group = ccnet_api.get_group(int(group_id))
|
||||||
is_admin = ccnet_api.check_group_staff(int(group_id), email)
|
is_admin = bool(ccnet_api.check_group_staff(int(group_id), email))
|
||||||
if email == group.creator_name:
|
if email == group.creator_name:
|
||||||
role = 'Owner'
|
role = 'Owner'
|
||||||
elif is_admin:
|
elif is_admin:
|
||||||
|
@ -60,10 +60,6 @@
|
|||||||
<input type="checkbox" name="ftype" value="Audio" class="vam" />
|
<input type="checkbox" name="ftype" value="Audio" class="vam" />
|
||||||
<span class="vam">{% trans "Audio" %}</span>
|
<span class="vam">{% trans "Audio" %}</span>
|
||||||
</label>
|
</label>
|
||||||
<label class="checkbox-label">
|
|
||||||
<input type="checkbox" name="ftype" value="SVG" class="vam" />
|
|
||||||
<span class="vam">{% trans "svg" %}</span>
|
|
||||||
</label>
|
|
||||||
<label class="checkbox-label">
|
<label class="checkbox-label">
|
||||||
<input type="checkbox" name="ftype" value="PDF" class="vam" />
|
<input type="checkbox" name="ftype" value="PDF" class="vam" />
|
||||||
<span class="vam">{% trans "pdf" %}</span>
|
<span class="vam">{% trans "pdf" %}</span>
|
||||||
|
108
tests/api/test_file_ops.py
Normal file
108
tests/api/test_file_ops.py
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
from seaserv import seafile_api
|
||||||
|
|
||||||
|
from django.core.urlresolvers import reverse
|
||||||
|
|
||||||
|
from seahub.test_utils import BaseTestCase
|
||||||
|
|
||||||
|
class FileOpsApiTest(BaseTestCase):
|
||||||
|
|
||||||
|
def create_new_repo(self):
|
||||||
|
new_repo_id = seafile_api.create_repo(name='test-repo-2', desc='',
|
||||||
|
username=self.user.username, passwd=None)
|
||||||
|
|
||||||
|
return new_repo_id
|
||||||
|
|
||||||
|
def get_lib_file_name(self, repo_id):
|
||||||
|
|
||||||
|
url = reverse('list_lib_dir', args=[repo_id])
|
||||||
|
resp = self.client.get(url, HTTP_X_REQUESTED_WITH='XMLHttpRequest')
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
|
if len(json_resp['dirent_list']) > 0:
|
||||||
|
for dirent in json_resp['dirent_list']:
|
||||||
|
if dirent.has_key('is_file') and dirent['is_file']:
|
||||||
|
return dirent['obj_name']
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.user_name = self.user.username
|
||||||
|
self.admin_name = self.admin.username
|
||||||
|
|
||||||
|
self.repo_id = self.repo.id
|
||||||
|
self.file_path = self.file
|
||||||
|
self.file_name = os.path.basename(self.file_path)
|
||||||
|
|
||||||
|
self.folder_path = self.folder
|
||||||
|
|
||||||
|
self.copy_url = reverse('api2-fileops-copy', args=[self.repo_id])
|
||||||
|
self.move_url = reverse('api2-fileops-move', args=[self.repo_id])
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.remove_repo()
|
||||||
|
|
||||||
|
def test_can_move_file(self):
|
||||||
|
self.login_as(self.user)
|
||||||
|
|
||||||
|
# check old file name exist
|
||||||
|
assert self.file_name == self.get_lib_file_name(self.repo_id)
|
||||||
|
|
||||||
|
# move file
|
||||||
|
dst_repo_id = self.create_new_repo()
|
||||||
|
data = {
|
||||||
|
'file_names': self.file_name,
|
||||||
|
'dst_repo': dst_repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = self.client.post(self.move_url, data)
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
assert json_resp[0]['obj_name'] == self.file_name
|
||||||
|
assert json_resp[0]['repo_id'] == dst_repo_id
|
||||||
|
assert json_resp[0]['parent_dir'] == '/'
|
||||||
|
|
||||||
|
# check old file has been delete
|
||||||
|
assert self.get_lib_file_name(self.repo_id) == None
|
||||||
|
|
||||||
|
# check old file has been moved to dst repo
|
||||||
|
assert self.file_name == self.get_lib_file_name(dst_repo_id)
|
||||||
|
|
||||||
|
self.remove_repo(dst_repo_id)
|
||||||
|
|
||||||
|
def test_can_copy_file(self):
|
||||||
|
self.login_as(self.user)
|
||||||
|
|
||||||
|
# check old file name exist
|
||||||
|
assert self.file_name == self.get_lib_file_name(self.repo_id)
|
||||||
|
|
||||||
|
# copy file
|
||||||
|
dst_repo_id = self.create_new_repo()
|
||||||
|
data = {
|
||||||
|
'file_names': self.file_name,
|
||||||
|
'dst_repo': dst_repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = self.client.post(self.copy_url, data)
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
assert json_resp[0]['obj_name'] == self.file_name
|
||||||
|
assert json_resp[0]['repo_id'] == dst_repo_id
|
||||||
|
assert json_resp[0]['parent_dir'] == '/'
|
||||||
|
|
||||||
|
# check old file still existes
|
||||||
|
assert self.file_name == self.get_lib_file_name(self.repo_id)
|
||||||
|
|
||||||
|
# check old file has been copyd to dst repo
|
||||||
|
assert self.file_name == self.get_lib_file_name(dst_repo_id)
|
||||||
|
|
||||||
|
self.remove_repo(dst_repo_id)
|
@ -8,6 +8,7 @@ import pytest
|
|||||||
import urllib
|
import urllib
|
||||||
from urllib import urlencode, quote
|
from urllib import urlencode, quote
|
||||||
import urlparse
|
import urlparse
|
||||||
|
from nose.tools import assert_in
|
||||||
|
|
||||||
from tests.common.utils import randstring, urljoin
|
from tests.common.utils import randstring, urljoin
|
||||||
from tests.api.apitestbase import ApiTestBase
|
from tests.api.apitestbase import ApiTestBase
|
||||||
@ -32,15 +33,70 @@ class FilesApiTest(ApiTestBase):
|
|||||||
|
|
||||||
def test_move_file(self):
|
def test_move_file(self):
|
||||||
with self.get_tmp_repo() as repo:
|
with self.get_tmp_repo() as repo:
|
||||||
_, furl = self.create_file(repo)
|
|
||||||
# TODO: create another repo here, and use it as dst_repo
|
# TODO: create another repo here, and use it as dst_repo
|
||||||
|
|
||||||
|
# create sub folder(dpath)
|
||||||
|
dpath, _ = self.create_dir(repo)
|
||||||
|
|
||||||
|
# create tmp file in sub folder(dpath)
|
||||||
|
tmp_file = 'tmp_file.txt'
|
||||||
|
file_path = dpath + '/' + tmp_file
|
||||||
|
furl = repo.get_filepath_url(file_path)
|
||||||
|
data = {'operation': 'create'}
|
||||||
|
res = self.post(furl, data=data, expected=201)
|
||||||
|
|
||||||
|
# copy tmp file from sub folder(dpath) to dst dir('/')
|
||||||
data = {
|
data = {
|
||||||
'operation': 'move',
|
|
||||||
'dst_repo': repo.repo_id,
|
'dst_repo': repo.repo_id,
|
||||||
'dst_dir': '/',
|
'dst_dir': '/',
|
||||||
|
'operation': 'copy',
|
||||||
}
|
}
|
||||||
res = self.post(furl, data=data)
|
u = urlparse.urlparse(furl)
|
||||||
self.assertEqual(res.text, '"success"')
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
|
assert_in(tmp_file, res.text)
|
||||||
|
|
||||||
|
# get info of copied file in dst dir('/')
|
||||||
|
fdurl = repo.file_url + u'detail/?p=/%s' % quote(tmp_file)
|
||||||
|
detail = self.get(fdurl).json()
|
||||||
|
self.assertIsNotNone(detail)
|
||||||
|
self.assertIsNotNone(detail['id'])
|
||||||
|
|
||||||
|
# copy tmp file from sub folder(dpath) to dst dir('/') again
|
||||||
|
# for test can rename file if a file with the same name is dst dir
|
||||||
|
data = {
|
||||||
|
'dst_repo': repo.repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
'operation': 'copy',
|
||||||
|
}
|
||||||
|
u = urlparse.urlparse(furl)
|
||||||
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
|
assert_in('tmp_file (1).txt', res.text)
|
||||||
|
|
||||||
|
# copy tmp file from sub folder(dpath) to dst dir('/') again
|
||||||
|
# for test can rename file if a file with the same name is dst dir
|
||||||
|
data = {
|
||||||
|
'dst_repo': repo.repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
'operation': 'copy',
|
||||||
|
}
|
||||||
|
u = urlparse.urlparse(furl)
|
||||||
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
|
assert_in('tmp_file (2).txt', res.text)
|
||||||
|
|
||||||
|
# then move file to dst dir
|
||||||
|
data = {
|
||||||
|
'dst_repo': repo.repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
'operation': 'move',
|
||||||
|
}
|
||||||
|
u = urlparse.urlparse(furl)
|
||||||
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
|
assert_in('tmp_file%20%283%29.txt', res.text)
|
||||||
|
|
||||||
|
|
||||||
def test_copy_file(self):
|
def test_copy_file(self):
|
||||||
with self.get_tmp_repo() as repo:
|
with self.get_tmp_repo() as repo:
|
||||||
@ -65,7 +121,7 @@ class FilesApiTest(ApiTestBase):
|
|||||||
u = urlparse.urlparse(furl)
|
u = urlparse.urlparse(furl)
|
||||||
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
self.assertEqual(res.text, '"success"')
|
assert_in(tmp_file, res.text)
|
||||||
|
|
||||||
# get info of copied file in dst dir('/')
|
# get info of copied file in dst dir('/')
|
||||||
fdurl = repo.file_url + u'detail/?p=/%s' % quote(tmp_file)
|
fdurl = repo.file_url + u'detail/?p=/%s' % quote(tmp_file)
|
||||||
@ -73,6 +129,30 @@ class FilesApiTest(ApiTestBase):
|
|||||||
self.assertIsNotNone(detail)
|
self.assertIsNotNone(detail)
|
||||||
self.assertIsNotNone(detail['id'])
|
self.assertIsNotNone(detail['id'])
|
||||||
|
|
||||||
|
# copy tmp file from sub folder(dpath) to dst dir('/') again
|
||||||
|
# for test can rename file if a file with the same name is dst dir
|
||||||
|
data = {
|
||||||
|
'dst_repo': repo.repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
'operation': 'copy',
|
||||||
|
}
|
||||||
|
u = urlparse.urlparse(furl)
|
||||||
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
|
assert_in('tmp_file (1).txt', res.text)
|
||||||
|
|
||||||
|
# copy tmp file from sub folder(dpath) to dst dir('/') again
|
||||||
|
# for test can rename file if a file with the same name is dst dir
|
||||||
|
data = {
|
||||||
|
'dst_repo': repo.repo_id,
|
||||||
|
'dst_dir': '/',
|
||||||
|
'operation': 'copy',
|
||||||
|
}
|
||||||
|
u = urlparse.urlparse(furl)
|
||||||
|
parsed_furl = urlparse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
|
||||||
|
res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
|
||||||
|
assert_in('tmp_file (2).txt', res.text)
|
||||||
|
|
||||||
def test_download_file(self):
|
def test_download_file(self):
|
||||||
with self.get_tmp_repo() as repo:
|
with self.get_tmp_repo() as repo:
|
||||||
fname, furl = self.create_file(repo)
|
fname, furl = self.create_file(repo)
|
||||||
|
Loading…
Reference in New Issue
Block a user