mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-18 16:36:15 +00:00
Merge branch '6.0'
This commit is contained in:
@@ -112,10 +112,6 @@ def group_check(func):
|
||||
group.view_perm = "joined"
|
||||
group.is_staff = is_group_staff(group, request.user)
|
||||
return func(request, group, *args, **kwargs)
|
||||
if request.user.is_staff:
|
||||
# viewed by system admin
|
||||
group.view_perm = "sys_admin"
|
||||
return func(request, group, *args, **kwargs)
|
||||
|
||||
if group.is_pub:
|
||||
group.view_perm = "pub"
|
||||
|
@@ -570,47 +570,102 @@ def copy_move_common():
|
||||
result = {}
|
||||
content_type = 'application/json; charset=utf-8'
|
||||
|
||||
repo = get_repo(repo_id)
|
||||
if not repo:
|
||||
result['error'] = _(u'Library does not exist.')
|
||||
return HttpResponse(json.dumps(result), status=400,
|
||||
content_type=content_type)
|
||||
|
||||
# arguments validation
|
||||
# arguments check for src
|
||||
path = request.GET.get('path')
|
||||
obj_name = request.GET.get('obj_name')
|
||||
dst_repo_id = request.POST.get('dst_repo')
|
||||
dst_path = request.POST.get('dst_path')
|
||||
if not (path and obj_name and dst_repo_id and dst_path):
|
||||
if not (path and obj_name):
|
||||
result['error'] = _('Argument missing')
|
||||
return HttpResponse(json.dumps(result), status=400,
|
||||
content_type=content_type)
|
||||
|
||||
# resource check for src
|
||||
repo = get_repo(repo_id)
|
||||
if not repo:
|
||||
result['error'] = _(u'Library does not exist.')
|
||||
return HttpResponse(json.dumps(result), status=404,
|
||||
content_type=content_type)
|
||||
|
||||
full_obj_path = posixpath.join(path, obj_name)
|
||||
file_obj_id = seafile_api.get_file_id_by_path(repo_id, full_obj_path)
|
||||
dir_obj_id = seafile_api.get_dir_id_by_path(repo_id, full_obj_path)
|
||||
|
||||
if not file_obj_id and not dir_obj_id:
|
||||
result['error'] = _(u'"%s" does not exist.') % full_obj_path
|
||||
return HttpResponse(json.dumps(result), status=404,
|
||||
content_type=content_type)
|
||||
|
||||
# arguments chech for dst
|
||||
dst_repo_id = request.POST.get('dst_repo')
|
||||
dst_path = request.POST.get('dst_path')
|
||||
if not (dst_repo_id and dst_path):
|
||||
result['error'] = _('Argument missing')
|
||||
return HttpResponse(json.dumps(result), status=400,
|
||||
content_type=content_type)
|
||||
|
||||
# resource check for dst
|
||||
dst_repo = seafile_api.get_repo(dst_repo_id)
|
||||
if not dst_repo:
|
||||
result['error'] = _(u'Library does not exist.')
|
||||
return HttpResponse(json.dumps(result), status=404,
|
||||
content_type=content_type)
|
||||
|
||||
# resource check for dst
|
||||
# check file path
|
||||
if len(dst_path + obj_name) > settings.MAX_PATH:
|
||||
result['error'] = _('Destination path is too long.')
|
||||
return HttpResponse(json.dumps(result), status=400,
|
||||
content_type=content_type)
|
||||
|
||||
# resource check for dst
|
||||
# return error when dst is the same as src
|
||||
if repo_id == dst_repo_id and path == dst_path:
|
||||
result['error'] = _('Invalid destination path')
|
||||
return HttpResponse(json.dumps(result), status=400,
|
||||
content_type=content_type)
|
||||
|
||||
# permission check for dst
|
||||
# check whether user has write permission to dest repo
|
||||
# Leave src folder/file permission checking to corresponding views.
|
||||
# For 'move', check has read-write perm to src folder;
|
||||
# For 'cp', check has read perm to src folder.
|
||||
if check_folder_permission(request, dst_repo_id, dst_path) != 'rw':
|
||||
result['error'] = _('Permission denied')
|
||||
return HttpResponse(json.dumps(result), status=403,
|
||||
content_type=content_type)
|
||||
|
||||
# Leave src folder/file permission checking to corresponding
|
||||
# views.
|
||||
# For 'move', check has read-write perm to src folder;
|
||||
# For 'cp', check has read perm to src folder.
|
||||
# check quota for dst
|
||||
if file_obj_id:
|
||||
obj_size = seafile_api.get_file_size(repo.store_id,
|
||||
repo.version, file_obj_id)
|
||||
|
||||
return view_method(request, repo_id, path, dst_repo_id, dst_path,
|
||||
obj_name)
|
||||
if dir_obj_id:
|
||||
obj_size = seafile_api.get_dir_size(repo.store_id,
|
||||
repo.version, dir_obj_id)
|
||||
|
||||
# check quota
|
||||
out_of_quota = False
|
||||
try:
|
||||
if seafile_api.is_repo_owner(request.user.username, dst_repo_id):
|
||||
# if dst repo is my own repo, only check quota when copy.
|
||||
if view_method.__name__ in ('cp_file', 'cp_dir'):
|
||||
out_of_quota = seafile_api.check_quota(dst_repo_id, delta=obj_size)
|
||||
else:
|
||||
# if dst repo is NOT my own repo,
|
||||
# check quota when copy AND move.
|
||||
out_of_quota = seafile_api.check_quota(dst_repo_id, delta=obj_size)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
result['error'] = _(u'Internal server error')
|
||||
return HttpResponse(json.dumps(result), status=500,
|
||||
content_type=content_type)
|
||||
|
||||
if out_of_quota:
|
||||
result['error'] = _('Out of quota.')
|
||||
return HttpResponse(json.dumps(result), status=403,
|
||||
content_type=content_type)
|
||||
|
||||
return view_method(request, repo_id, path,
|
||||
dst_repo_id, dst_path, obj_name)
|
||||
|
||||
return _arguments_wrapper
|
||||
|
||||
@@ -671,6 +726,7 @@ def cp_file(request, src_repo_id, src_path, dst_repo_id, dst_path, obj_name):
|
||||
dst_repo_id, dst_path, new_obj_name,
|
||||
username, need_progress=1)
|
||||
except SearpcError as e:
|
||||
logger.error(e)
|
||||
res = None
|
||||
|
||||
if not res:
|
||||
@@ -757,6 +813,7 @@ def cp_dir(request, src_repo_id, src_path, dst_repo_id, dst_path, obj_name):
|
||||
dst_repo_id, dst_path, new_obj_name,
|
||||
username, need_progress=1)
|
||||
except SearpcError, e:
|
||||
logger.error(e)
|
||||
res = None
|
||||
|
||||
# res can be None or an object
|
||||
@@ -828,6 +885,44 @@ def dirents_copy_move_common():
|
||||
# operation, 1), if move file, check parent dir perm, 2), if move
|
||||
# folder, check that folder perm.
|
||||
|
||||
file_obj_size = 0
|
||||
for obj_name in obj_file_names:
|
||||
full_obj_path = posixpath.join(parent_dir, obj_name)
|
||||
file_obj_id = seafile_api.get_file_id_by_path(repo_id, full_obj_path)
|
||||
file_obj_size += seafile_api.get_file_size(
|
||||
repo.store_id, repo.version, file_obj_id)
|
||||
|
||||
dir_obj_size = 0
|
||||
for obj_name in obj_dir_names:
|
||||
full_obj_path = posixpath.join(parent_dir, obj_name)
|
||||
dir_obj_id = seafile_api.get_dir_id_by_path(repo_id, full_obj_path)
|
||||
dir_obj_size += seafile_api.get_dir_size(
|
||||
repo.store_id, repo.version, dir_obj_id)
|
||||
|
||||
# check quota
|
||||
out_of_quota = False
|
||||
try:
|
||||
if seafile_api.is_repo_owner(request.user.username, dst_repo_id):
|
||||
# if dst repo is my own repo, only check quota when copy.
|
||||
if view_method.__name__ == 'cp_dirents':
|
||||
out_of_quota = seafile_api.check_quota(
|
||||
dst_repo_id, delta=file_obj_size + dir_obj_size)
|
||||
else:
|
||||
# if dst repo is NOT my own repo,
|
||||
# check quota when copy AND move.
|
||||
out_of_quota = seafile_api.check_quota(
|
||||
dst_repo_id, delta=file_obj_size + dir_obj_size)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
result['error'] = _(u'Internal server error')
|
||||
return HttpResponse(json.dumps(result), status=500,
|
||||
content_type=content_type)
|
||||
|
||||
if out_of_quota:
|
||||
result['error'] = _('Out of quota.')
|
||||
return HttpResponse(json.dumps(result), status=403,
|
||||
content_type=content_type)
|
||||
|
||||
return view_method(request, repo_id, parent_dir, dst_repo_id,
|
||||
dst_path, obj_file_names, obj_dir_names)
|
||||
|
||||
|
30
tests/seahub/group/views/test_group_check.py
Normal file
30
tests/seahub/group/views/test_group_check.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from django.http import HttpResponse
|
||||
|
||||
from seahub.auth.models import AnonymousUser
|
||||
from seahub.group.views import group_check
|
||||
from seahub.test_utils import BaseTestCase
|
||||
|
||||
|
||||
@group_check
|
||||
def a_view(request, group):
|
||||
return HttpResponse('success')
|
||||
|
||||
class GroupCheckTest(BaseTestCase):
|
||||
def test_anonymous_user(self):
|
||||
self.fake_request.user = AnonymousUser()
|
||||
resp = a_view(self.fake_request, self.group.id)
|
||||
self.assertEqual(resp.status_code, 302)
|
||||
self.assertRegexpMatches(resp['Location'], '/accounts/login')
|
||||
|
||||
def test_group_user(self):
|
||||
self.fake_request.user = self.user
|
||||
resp = a_view(self.fake_request, self.group.id)
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
assert 'success' in resp.content
|
||||
|
||||
def test_admin_user(self):
|
||||
self.fake_request.user = self.admin
|
||||
resp = a_view(self.fake_request, self.group.id)
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
assert 'Permission denied' in resp.content
|
||||
assert 'success' not in resp.content
|
Reference in New Issue
Block a user