mirror of
https://github.com/haiwen/seahub.git
synced 2025-10-20 10:20:42 +00:00
@@ -19,7 +19,7 @@ from seahub.base.accounts import User
|
||||
from seahub.share.signals import share_repo_to_user_successful, \
|
||||
share_repo_to_group_successful
|
||||
from seahub.utils import is_org_context, send_perm_audit_msg, \
|
||||
normalize_dir_path
|
||||
normalize_dir_path, get_folder_permission_recursively
|
||||
from seahub.views import check_folder_permission
|
||||
from seahub.settings import MAX_PATH
|
||||
|
||||
@@ -440,3 +440,130 @@ class ReposBatchCopyDirView(APIView):
|
||||
result['success'].append(common_dict)
|
||||
|
||||
return Response(result)
|
||||
|
||||
|
||||
class ReposBatchCreateDirView(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAuthenticated, )
|
||||
throttle_classes = (UserRateThrottle, )
|
||||
|
||||
def post(self, request):
|
||||
""" Multi create folders.
|
||||
|
||||
Permission checking:
|
||||
1. user with `rw` permission for every layer of subdirectories.
|
||||
|
||||
Parameter:
|
||||
{
|
||||
"repo_id": "4dfdf5b6-806f-4a35-b2b7-604051d2114e",
|
||||
"paths": ["/1/2/", "/3/4/", "/5/6"]
|
||||
}
|
||||
"""
|
||||
|
||||
# argument check
|
||||
path_list = request.data.get('paths', None)
|
||||
if not path_list:
|
||||
error_msg = 'paths invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
repo_id = request.data.get('repo_id', None)
|
||||
if not repo_id:
|
||||
error_msg = 'repo_id invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
# resource check
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
if not repo:
|
||||
error_msg = 'Library %s not found.' % repo_id
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
# permission check
|
||||
if check_folder_permission(request, repo_id, '/') != 'rw':
|
||||
error_msg = 'Permission denied.'
|
||||
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
|
||||
|
||||
result = {}
|
||||
result['failed'] = []
|
||||
result['success'] = []
|
||||
username = request.user.username
|
||||
|
||||
for path in path_list:
|
||||
|
||||
common_dict = {
|
||||
'repo_id': repo_id,
|
||||
'path': path,
|
||||
}
|
||||
|
||||
path = normalize_dir_path(path)
|
||||
obj_name_list = path.strip('/').split('/')
|
||||
|
||||
for obj_name in obj_name_list:
|
||||
try:
|
||||
# check if path is valid
|
||||
is_valid_name = seafile_api.is_valid_filename(
|
||||
'fake_repo_id', obj_name)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_dict = {
|
||||
'error_msg': 'Internal Server Error'
|
||||
}
|
||||
common_dict.update(error_dict)
|
||||
result['failed'].append(common_dict)
|
||||
continue
|
||||
|
||||
if not is_valid_name:
|
||||
error_dict = {
|
||||
'error_msg': 'path invalid.'
|
||||
}
|
||||
common_dict.update(error_dict)
|
||||
result['failed'].append(common_dict)
|
||||
continue
|
||||
|
||||
if seafile_api.get_dir_id_by_path(repo_id, path):
|
||||
error_dict = {
|
||||
'error_msg': 'Folder already exists.'
|
||||
}
|
||||
common_dict.update(error_dict)
|
||||
result['failed'].append(common_dict)
|
||||
continue
|
||||
|
||||
# check parent directory's permission
|
||||
parent_dir = os.path.dirname(path.rstrip('/'))
|
||||
try:
|
||||
permission = get_folder_permission_recursively(
|
||||
username, repo_id, parent_dir)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_dict = {
|
||||
'error_msg': 'Internal Server Error'
|
||||
}
|
||||
common_dict.update(error_dict)
|
||||
result['failed'].append(common_dict)
|
||||
continue
|
||||
|
||||
if permission != 'rw':
|
||||
error_dict = {
|
||||
'error_msg': 'Permission denied.'
|
||||
}
|
||||
common_dict.update(error_dict)
|
||||
result['failed'].append(common_dict)
|
||||
continue
|
||||
|
||||
try:
|
||||
# TODO
|
||||
# move seafile_api.mkdir_with_parents() to CE version
|
||||
# rename obj name if name is existed
|
||||
seafile_api.mkdir_with_parents(repo_id, '/', path.strip('/'), username)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_dict = {
|
||||
'error_msg': 'Internal Server Error'
|
||||
}
|
||||
common_dict.update(error_dict)
|
||||
result['failed'].append(common_dict)
|
||||
continue
|
||||
|
||||
result['success'].append(common_dict)
|
||||
|
||||
return Response(result)
|
||||
|
@@ -27,7 +27,7 @@ from seahub.api2.endpoints.shared_folders import SharedFolders
|
||||
from seahub.api2.endpoints.shared_repos import SharedRepos, SharedRepo
|
||||
from seahub.api2.endpoints.upload_links import UploadLinks, UploadLink
|
||||
from seahub.api2.endpoints.repos_batch import ReposBatchView, \
|
||||
ReposBatchCopyDirView
|
||||
ReposBatchCopyDirView, ReposBatchCreateDirView
|
||||
from seahub.api2.endpoints.repos import RepoView
|
||||
from seahub.api2.endpoints.file import FileView
|
||||
from seahub.api2.endpoints.dir import DirView, DirDetailView
|
||||
@@ -217,6 +217,7 @@ urlpatterns = patterns(
|
||||
## user::repos-batch-operate
|
||||
url(r'^api/v2.1/repos/batch/$', ReposBatchView.as_view(), name='api-v2.1-repos-batch'),
|
||||
url(r'^api/v2.1/repos/batch-copy-dir/$', ReposBatchCopyDirView.as_view(), name='api-v2.1-repos-batch-copy-dir'),
|
||||
url(r'^api/v2.1/repos/batch-create-dir/$', ReposBatchCreateDirView.as_view(), name='api-v2.1-repos-batch-create-dir'),
|
||||
|
||||
## user::deleted repos
|
||||
url(r'^api/v2.1/deleted-repos/$', DeletedRepos.as_view(), name='api2-v2.1-deleted-repos'),
|
||||
|
@@ -1332,3 +1332,21 @@ def is_windows_operating_system(request):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_folder_permission_recursively(username, repo_id, path):
|
||||
""" Get folder permission recursively
|
||||
|
||||
Ger permission from the innermost layer of subdirectories to root
|
||||
directory.
|
||||
"""
|
||||
if not path or not isinstance(path, basestring):
|
||||
raise Exception('path invalid.')
|
||||
|
||||
if not seafile_api.get_dir_id_by_path(repo_id, path):
|
||||
# get current folder's parent directory
|
||||
path = os.path.dirname(path.rstrip('/'))
|
||||
return get_folder_permission_recursively(
|
||||
username, repo_id, path)
|
||||
else:
|
||||
return seafile_api.check_permission_by_path(
|
||||
repo_id, path, username)
|
||||
|
@@ -7,6 +7,12 @@ from tests.common.utils import randstring
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.utils import normalize_dir_path
|
||||
|
||||
try:
|
||||
from seahub.settings import LOCAL_PRO_DEV_ENV
|
||||
except ImportError:
|
||||
LOCAL_PRO_DEV_ENV = False
|
||||
|
||||
|
||||
class ReposBatchViewTest(BaseTestCase):
|
||||
|
||||
def create_new_repo(self, username):
|
||||
@@ -222,6 +228,9 @@ class ReposBatchCopyDirView(BaseTestCase):
|
||||
|
||||
def test_copy_dir(self):
|
||||
|
||||
if not LOCAL_PRO_DEV_ENV:
|
||||
return
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
# create two folders in src repo
|
||||
@@ -325,3 +334,90 @@ class ReposBatchCopyDirView(BaseTestCase):
|
||||
"The source path can not be '/'."
|
||||
|
||||
self.remove_repo(tmp_repo_id)
|
||||
|
||||
|
||||
class ReposBatchCreateDirViewTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.user_name = self.user.username
|
||||
self.admin_name = self.admin.username
|
||||
self.repo_id = self.repo.id
|
||||
self.url = reverse('api-v2.1-repos-batch-create-dir')
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
self.remove_group()
|
||||
|
||||
def get_random_path(self):
|
||||
return '/%s/%s/%s/' % (randstring(2), \
|
||||
randstring(2), randstring(2))
|
||||
|
||||
def test_create_dir(self):
|
||||
|
||||
if not LOCAL_PRO_DEV_ENV:
|
||||
return
|
||||
|
||||
path_1 = self.get_random_path()
|
||||
path_2 = self.get_random_path()
|
||||
path_3 = self.get_random_path()
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
data = {
|
||||
'repo_id': self.repo_id,
|
||||
'paths': [path_1, path_2, path_3],
|
||||
}
|
||||
resp = self.client.post(self.url, data)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp['success']) == 3
|
||||
assert len(json_resp['failed']) == 0
|
||||
|
||||
assert seafile_api.get_dir_id_by_path(self.repo_id,
|
||||
path_1) is not None
|
||||
assert seafile_api.get_dir_id_by_path(self.repo_id,
|
||||
path_2) is not None
|
||||
assert seafile_api.get_dir_id_by_path(self.repo_id,
|
||||
path_3) is not None
|
||||
|
||||
def test_create_dir_with_invalid_repo_permission(self):
|
||||
|
||||
# admin has NO permission for user's repo
|
||||
self.login_as(self.admin)
|
||||
|
||||
data = {
|
||||
'repo_id': self.repo_id,
|
||||
'paths': 'path',
|
||||
}
|
||||
resp = self.client.post(self.url, data)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
def test_create_dir_with_invalid_folder_permission(self):
|
||||
|
||||
if not LOCAL_PRO_DEV_ENV:
|
||||
return
|
||||
|
||||
path_1 = self.get_random_path()
|
||||
path_2 = self.get_random_path()
|
||||
path_3 = self.get_random_path()
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
data = {
|
||||
'repo_id': self.repo_id,
|
||||
'paths': [path_1, path_2, path_3],
|
||||
}
|
||||
resp = self.client.post(self.url, data)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp['success']) == 3
|
||||
assert len(json_resp['failed']) == 0
|
||||
|
||||
assert seafile_api.get_dir_id_by_path(self.repo_id,
|
||||
path_1) is not None
|
||||
assert seafile_api.get_dir_id_by_path(self.repo_id,
|
||||
path_2) is not None
|
||||
assert seafile_api.get_dir_id_by_path(self.repo_id,
|
||||
path_3) is not None
|
||||
|
Reference in New Issue
Block a user