1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-04-28 03:10:45 +00:00

add async batch copy/move item api

This commit is contained in:
lian 2019-06-10 15:02:34 +08:00
parent c735506446
commit c06dc0f0a4
3 changed files with 767 additions and 3 deletions

View File

@ -33,8 +33,7 @@ from seahub.utils.repo import get_repo_owner, get_available_repo_perms, \
from seahub.views import check_folder_permission
from seahub.settings import MAX_PATH
from seahub.constants import PERMISSION_READ, PERMISSION_READ_WRITE, \
PERMISSION_ADMIN
from seahub.constants import PERMISSION_READ_WRITE, PERMISSION_READ
logger = logging.getLogger(__name__)
@ -1064,3 +1063,204 @@ class ReposBatchMoveItemView(APIView):
result['success'].append(common_dict)
return Response(result)
class ReposAsyncBatchCopyItemView(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated, )
throttle_classes = (UserRateThrottle, )
def post(self, request):
""" Asynchronous multi copy files/folders.
Permission checking:
1. User must has `r/rw` permission for src folder.
2. User must has `rw` permission for dst folder.
Parameter:
{
"src_repo_id":"7460f7ac-a0ff-4585-8906-bb5a57d2e118",
"src_parent_dir":"/a/b/c/",
"src_dirents":["1.md", "2.md"],
"dst_repo_id":"a3fa768d-0f00-4343-8b8d-07b4077881db",
"dst_parent_dir":"/x/y/",
}
"""
# argument check
src_repo_id = request.data.get('src_repo_id', None)
if not src_repo_id:
error_msg = 'src_repo_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
src_parent_dir = request.data.get('src_parent_dir', None)
if not src_parent_dir:
error_msg = 'src_parent_dir invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
src_dirents = request.data.get('src_dirents', None)
if not src_dirents:
error_msg = 'src_dirents invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
dst_repo_id = request.data.get('dst_repo_id', None)
if not dst_repo_id:
error_msg = 'dst_repo_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
dst_parent_dir = request.data.get('dst_parent_dir', None)
if not dst_parent_dir:
error_msg = 'dst_parent_dir invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# resource check
if not seafile_api.get_repo(src_repo_id):
error_msg = 'Library %s not found.' % src_repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not seafile_api.get_dir_id_by_path(src_repo_id, src_parent_dir):
error_msg = 'Folder %s not found.' % src_parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not seafile_api.get_repo(dst_repo_id):
error_msg = 'Library %s not found.' % dst_repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not seafile_api.get_dir_id_by_path(dst_repo_id, dst_parent_dir):
error_msg = 'Folder %s not found.' % dst_parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
# 1. User must has `r/rw` permission for src parent dir.
src_parent_permission = check_folder_permission(request, src_repo_id, src_parent_dir)
if src_parent_permission not in (PERMISSION_READ_WRITE,
PERMISSION_READ):
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# 2. User must has `rw` permission for dst parent dir.
dst_parent_permission = check_folder_permission(request, dst_repo_id, dst_parent_dir)
if dst_parent_permission != PERMISSION_READ_WRITE:
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
result = {}
username = request.user.username
formated_src_dirents = [dirent.strip('/') for dirent in src_dirents]
src_multi = "\t".join(formated_src_dirents)
dst_multi = "\t".join(formated_src_dirents)
try:
res = seafile_api.copy_file(src_repo_id, src_parent_dir, src_multi,
dst_repo_id, dst_parent_dir, dst_multi,
username=username, need_progress=1,
synchronous=0)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
result = {}
result['task_id'] = res.task_id if res.background else ''
return Response(result)
class ReposAsyncBatchMoveItemView(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated, )
throttle_classes = (UserRateThrottle, )
def post(self, request):
""" Asynchronous multi move files/folders.
Permission checking:
1. User must has `rw` permission for src folder.
2. User must has `rw` permission for dst folder.
Parameter:
{
"src_repo_id":"7460f7ac-a0ff-4585-8906-bb5a57d2e118",
"src_parent_dir":"/a/b/c/",
"src_dirents":["1.md", "2.md"],
"dst_repo_id":"a3fa768d-0f00-4343-8b8d-07b4077881db",
"dst_parent_dir":"/x/y/",
}
"""
# argument check
src_repo_id = request.data.get('src_repo_id', None)
if not src_repo_id:
error_msg = 'src_repo_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
src_parent_dir = request.data.get('src_parent_dir', None)
if not src_parent_dir:
error_msg = 'src_parent_dir invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
src_dirents = request.data.get('src_dirents', None)
if not src_dirents:
error_msg = 'src_dirents invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
dst_repo_id = request.data.get('dst_repo_id', None)
if not dst_repo_id:
error_msg = 'dst_repo_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
dst_parent_dir = request.data.get('dst_parent_dir', None)
if not dst_parent_dir:
error_msg = 'dst_parent_dir invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# resource check
if not seafile_api.get_repo(src_repo_id):
error_msg = 'Library %s not found.' % src_repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not seafile_api.get_dir_id_by_path(src_repo_id, src_parent_dir):
error_msg = 'Folder %s not found.' % src_parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not seafile_api.get_repo(dst_repo_id):
error_msg = 'Library %s not found.' % dst_repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
if not seafile_api.get_dir_id_by_path(dst_repo_id, dst_parent_dir):
error_msg = 'Folder %s not found.' % dst_parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
# 1. User must has `rw` permission for src parent dir.
if check_folder_permission(request, src_repo_id, src_parent_dir) != PERMISSION_READ_WRITE:
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# 2. User must has `rw` permission for dst parent dir.
if check_folder_permission(request, dst_repo_id, dst_parent_dir) != PERMISSION_READ_WRITE:
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
result = {}
username = request.user.username
formated_src_dirents = [dirent.strip('/') for dirent in src_dirents]
src_multi = "\t".join(formated_src_dirents)
dst_multi = "\t".join(formated_src_dirents)
try:
res = seafile_api.move_file(src_repo_id, src_parent_dir, src_multi,
dst_repo_id, dst_parent_dir, dst_multi,
replace=False, username=username,
need_progress=1, synchronous=0)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
result = {}
result['task_id'] = res.task_id if res.background else ''
return Response(result)

View File

@ -44,7 +44,8 @@ from seahub.api2.endpoints.upload_links import UploadLinks, UploadLink, \
UploadLinkUpload
from seahub.api2.endpoints.repos_batch import ReposBatchView, \
ReposBatchCopyDirView, ReposBatchCreateDirView, \
ReposBatchCopyItemView, ReposBatchMoveItemView
ReposBatchCopyItemView, ReposBatchMoveItemView, \
ReposAsyncBatchCopyItemView, ReposAsyncBatchMoveItemView
from seahub.api2.endpoints.repos import RepoView, ReposView
from seahub.api2.endpoints.file import FileView
from seahub.api2.endpoints.file_history import FileHistoryView, NewFileHistoryView
@ -313,6 +314,8 @@ urlpatterns = [
url(r'^api/v2.1/repos/batch-create-dir/$', ReposBatchCreateDirView.as_view(), name='api-v2.1-repos-batch-create-dir'),
url(r'^api/v2.1/repos/batch-copy-item/$', ReposBatchCopyItemView.as_view(), name='api-v2.1-repos-batch-copy-item'),
url(r'^api/v2.1/repos/batch-move-item/$', ReposBatchMoveItemView.as_view(), name='api-v2.1-repos-batch-move-item'),
url(r'^api/v2.1/repos/async-batch-copy-item/$', ReposAsyncBatchCopyItemView.as_view(), name='api-v2.1-repos-async-batch-copy-item'),
url(r'^api/v2.1/repos/async-batch-move-item/$', ReposAsyncBatchMoveItemView.as_view(), name='api-v2.1-repos-async-batch-move-item'),
## user::deleted repos
url(r'^api/v2.1/deleted-repos/$', DeletedRepos.as_view(), name='api2-v2.1-deleted-repos'),

View File

@ -421,3 +421,564 @@ class ReposBatchCreateDirViewTest(BaseTestCase):
path_2) is not None
assert seafile_api.get_dir_id_by_path(self.repo_id,
path_3) is not None
class ReposAsyncBatchCopyItemView(BaseTestCase):
def create_new_repo(self, username):
new_repo_id = seafile_api.create_repo(name=randstring(10),
desc='', username=username, passwd=None)
return new_repo_id
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
self.src_repo_id = self.repo.id
self.dst_repo_id = self.create_new_repo(self.user_name)
self.file_path = self.file
self.file_name = os.path.basename(self.file_path)
self.folder_path = self.folder
self.folder_name = os.path.basename(self.folder)
self.url = reverse('api-v2.1-repos-async-batch-copy-item')
def tearDown(self):
self.remove_repo(self.src_repo_id)
self.remove_repo(self.dst_repo_id)
def test_can_copy(self):
self.login_as(self.user)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data),
'application/json')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
task_id = json_resp['task_id']
assert len(task_id) == 36
# progress_url = reverse('api-v2.1-query-copy-move-progress') + '?task_id=%s' % task_id
# count = 1
# while True:
# count += 1
# resp = self.client.get(progress_url)
# json_resp = json.loads(resp.content)
# if json_resp['done'] == 1 or count == 10:
# break
#
# # items remain in src folder
# assert seafile_api.get_dir_id_by_path(self.src_repo_id, self.folder_path) is not None
# assert seafile_api.get_file_id_by_path(self.src_repo_id, self.file_path) is not None
#
# # items in dst folder
# assert seafile_api.get_file_id_by_path(self.dst_repo_id, self.file_path) is not None
# assert seafile_api.get_dir_id_by_path(self.dst_repo_id, self.folder_path) is not None
def test_copy_with_invalid_parameter(self):
self.login_as(self.user)
data = {
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
def test_copy_with_repo_not_exist(self):
self.login_as(self.user)
invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb'
data = {
"src_repo_id": invalid_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb'
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": invalid_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
def test_copy_with_folder_not_exist(self):
self.login_as(self.user)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": 'invalid_folder',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": 'invalid_folder',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
def test_copy_with_invalid_repo_permission(self):
tmp_repo_id = self.create_new_repo(self.admin_name)
self.login_as(self.user)
data = {
"src_repo_id": tmp_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": tmp_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
def test_copy_with_invalid_src_folder_permission(self):
self.login_as(self.user)
# share admin's tmp repo to user with 'cloud-edit' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'cloud-edit')
data = {
"src_repo_id": admin_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.src_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
# share admin's tmp repo to user with 'preivew' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'preview')
data = {
"src_repo_id": admin_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.src_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
def test_copy_with_invalid_dst_folder_permission(self):
self.login_as(self.user)
# share admin's tmp repo to user with 'r' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'r')
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": admin_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
# share admin's tmp repo to user with 'cloud-edit' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'cloud-edit')
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": admin_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
# share admin's tmp repo to user with 'preview' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'preivew')
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": admin_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
class ReposAsyncBatchMoveItemView(BaseTestCase):
def create_new_repo(self, username):
new_repo_id = seafile_api.create_repo(name=randstring(10),
desc='', username=username, passwd=None)
return new_repo_id
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
self.src_repo_id = self.repo.id
self.dst_repo_id = self.create_new_repo(self.user_name)
self.file_path = self.file
self.file_name = os.path.basename(self.file_path)
self.folder_path = self.folder
self.folder_name = os.path.basename(self.folder)
self.url = reverse('api-v2.1-repos-async-batch-move-item')
def tearDown(self):
self.remove_repo(self.src_repo_id)
self.remove_repo(self.dst_repo_id)
def test_can_move(self):
self.login_as(self.user)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data),
'application/json')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
task_id = json_resp['task_id']
assert len(task_id) == 36
# progress_url = reverse('api-v2.1-query-copy-move-progress') + '?task_id=%s' % task_id
# count = 1
# while True:
# count += 1
# resp = self.client.get(progress_url)
# json_resp = json.loads(resp.content)
# if json_resp['done'] == 1 or count == 10:
# break
#
# # items NOT in src folder
# assert seafile_api.get_dir_id_by_path(self.src_repo_id, self.folder_path) is None
# assert seafile_api.get_file_id_by_path(self.src_repo_id, self.file_path) is None
#
# # items in dst folder
# assert seafile_api.get_file_id_by_path(self.dst_repo_id, self.file_path) is not None
# assert seafile_api.get_dir_id_by_path(self.dst_repo_id, self.folder_path) is not None
def test_move_with_invalid_parameter(self):
self.login_as(self.user)
data = {
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(400, resp.status_code)
def test_move_with_repo_not_exist(self):
self.login_as(self.user)
invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb'
data = {
"src_repo_id": invalid_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
invalid_repo_id = 'd53fe97e-919a-42f9-a29f-042d285ba6fb'
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": invalid_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
def test_move_with_folder_not_exist(self):
self.login_as(self.user)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": 'invalid_folder',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": 'invalid_folder',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(404, resp.status_code)
def test_move_with_invalid_repo_permission(self):
tmp_repo_id = self.create_new_repo(self.admin_name)
self.login_as(self.user)
data = {
"src_repo_id": tmp_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.dst_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": tmp_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
def test_move_with_invalid_src_folder_permission(self):
self.login_as(self.user)
# share admin's tmp repo to user with 'r' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'r')
data = {
"src_repo_id": admin_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.src_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
# share admin's tmp repo to user with 'cloud-edit' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'cloud-edit')
data = {
"src_repo_id": admin_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.src_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
# share admin's tmp repo to user with 'preivew' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'preview')
data = {
"src_repo_id": admin_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": self.src_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
self.remove_repo(admin_repo_id)
def test_move_with_invalid_dst_folder_permission(self):
self.login_as(self.user)
# share admin's tmp repo to user with 'r' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'r')
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": admin_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
# share admin's tmp repo to user with 'cloud-edit' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'cloud-edit')
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": admin_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)
# share admin's tmp repo to user with 'preview' permission
admin_repo_id = self.create_new_repo(self.admin_name)
seafile_api.share_repo(admin_repo_id, self.admin_name,
self.user_name, 'preview')
data = {
"src_repo_id": self.src_repo_id,
"src_parent_dir": '/',
"src_dirents":[self.folder_name, self.file_name],
"dst_repo_id": admin_repo_id,
"dst_parent_dir": '/',
}
resp = self.client.post(self.url, json.dumps(data), 'application/json')
self.assertEqual(403, resp.status_code)