1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-08-31 14:42:10 +00:00

Add permission check and quota check

This commit is contained in:
zhengxie
2015-07-14 11:01:48 +08:00
parent 1c6475d7d1
commit 0b61259b9d
5 changed files with 240 additions and 188 deletions

View File

@@ -13,17 +13,24 @@ import seaserv
from seaserv import seafile_api
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.permissions import IsRepoAccessible
from seahub.api2.utils import api_error
from seahub.base.templatetags.seahub_tags import email2nickname
from seahub.utils import is_org_context, is_valid_username
from seahub.share.signals import share_repo_to_user_successful
from seahub.share.views import check_user_share_quota
from seahub.utils import (is_org_context, is_valid_username,
send_perm_audit_msg)
logger = logging.getLogger(__name__)
json_content_type = 'application/json; charset=utf-8'
class DirSharedItemsEndpoint(APIView):
"""Support uniform interface(list, share, unshare, modify) for sharing
library/folder to users/groups.
"""
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated,)
permission_classes = (IsAuthenticated, IsRepoAccessible)
throttle_classes = (UserRateThrottle, )
def list_user_shared_items(self, request, repo_id, path):
@@ -64,9 +71,6 @@ class DirSharedItemsEndpoint(APIView):
})
return ret
# def add_user_shared_item(self, request, repo_id, path):
# pass
def handle_shared_to_args(self, request):
share_type = request.GET.get('share_type', None)
shared_to_user = False
@@ -120,6 +124,8 @@ class DirSharedItemsEndpoint(APIView):
return sub_repo
def get(self, request, repo_id, format=None):
"""List shared items(shared to users/groups) for a folder/library.
"""
repo = seafile_api.get_repo(repo_id)
if not repo:
return api_error(status.HTTP_400_BAD_REQUEST, 'Repo not found.')
@@ -184,6 +190,9 @@ class DirSharedItemsEndpoint(APIView):
seafile_api.set_share_permission(shared_repo.id, username,
shared_to, permission)
send_perm_audit_msg('modify-repo-perm', username, shared_to,
shared_repo.id, path, permission)
if shared_to_group:
gid = request.GET.get('group_id')
try:
@@ -202,6 +211,9 @@ class DirSharedItemsEndpoint(APIView):
seafile_api.set_group_repo_permission(gid, shared_repo.id,
permission)
send_perm_audit_msg('modify-repo-perm', username, gid,
shared_repo.id, path, permission)
return HttpResponse(json.dumps({'success': True}), status=200,
content_type=json_content_type)
@@ -211,8 +223,6 @@ class DirSharedItemsEndpoint(APIView):
if not repo:
return api_error(status.HTTP_400_BAD_REQUEST, 'Repo not found.')
# TODO: perm check, quota check
path = request.GET.get('p', '/')
if seafile_api.get_dir_id_by_path(repo.id, path) is None:
return api_error(status.HTTP_400_BAD_REQUEST, 'Directory not found.')
@@ -239,12 +249,20 @@ class DirSharedItemsEndpoint(APIView):
if share_type == 'user':
share_to_users = request.DATA.getlist('username')
for to_user in share_to_users:
if not check_user_share_quota(username, shared_repo, users=[to_user]):
return api_error(status.HTTP_403_FORBIDDEN,
'Failed to share: No enough quota.')
try:
if is_org_context(request):
org_id = request.user.org.org_id
# org_share_repo(org_id, shared_repo.id, username, to_user, permission)
seaserv.seafserv_threaded_rpc.org_add_share(
org_id, shared_repo.id, username, to_user,
permission)
else:
seafile_api.share_repo(shared_repo.repo_id, username, to_user, permission)
seafile_api.share_repo(shared_repo.id, username,
to_user, permission)
# send a signal when sharing repo successful
share_repo_to_user_successful.send(sender=None,
from_user=username,
@@ -258,6 +276,9 @@ class DirSharedItemsEndpoint(APIView):
},
"permission": permission
})
send_perm_audit_msg('add-repo-perm', username, to_user,
shared_repo.id, path, permission)
except SearpcError as e:
logger.error(e)
failed.append(to_user)
@@ -274,6 +295,10 @@ class DirSharedItemsEndpoint(APIView):
if not group:
return api_error(status.HTTP_400_BAD_REQUEST, 'Group not found: %s' % gid)
if not check_user_share_quota(username, shared_repo, groups=[group]):
return api_error(status.HTTP_403_FORBIDDEN,
'Failed to share: No enough quota.')
try:
if is_org_context(request):
org_id = request.user.org.org_id
@@ -283,7 +308,6 @@ class DirSharedItemsEndpoint(APIView):
else:
seafile_api.set_group_repo(shared_repo.repo_id, gid,
username, permission)
# todo: perm audit msg
success.append({
"share_type": "group",
@@ -293,6 +317,9 @@ class DirSharedItemsEndpoint(APIView):
},
"permission": permission
})
send_perm_audit_msg('add-repo-perm', username, gid,
shared_repo.id, path, permission)
except SearpcError as e:
logger.error(e)
failed.append(group.group_name)
@@ -335,10 +362,16 @@ class DirSharedItemsEndpoint(APIView):
if is_org_context(request):
org_id = request.user.org.org_id
# org_remove_share(org_id, repo_id, from_email, shared_to)
seaserv.seafserv_threaded_rpc.org_remove_share(
org_id, shared_repo.id, username, shared_to)
else:
seaserv.remove_share(shared_repo.id, username, shared_to)
permission = seafile_api.check_permission_by_path(repo.id, path,
shared_to)
send_perm_audit_msg('delete-repo-perm', username, shared_to,
shared_repo.id, path, permission)
if shared_to_group:
group_id = request.GET.get('group_id')
try:
@@ -346,11 +379,21 @@ class DirSharedItemsEndpoint(APIView):
except ValueError:
return api_error(status.HTTP_400_BAD_REQUEST, 'Bad group id')
# hacky way to get group repo permission
permission = ''
for e in seafile_api.list_repo_shared_group(username, shared_repo.id):
if e.group_id == group_id:
permission = e.perm
break
if is_org_context(request):
org_id = request.user.org.org_id
seaserv.del_org_group_repo(shared_repo.id, org_id, group_id)
else:
seafile_api.unset_group_repo(shared_repo.id, group_id, username)
send_perm_audit_msg('delete-repo-perm', username, group_id,
shared_repo.id, path, permission)
return HttpResponse(json.dumps({'success': True}), status=200,
content_type=json_content_type)

View File

@@ -41,7 +41,7 @@ urlpatterns = patterns('',
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/dir/$', DirView.as_view(), name='DirView'),
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/dir/sub_repo/$', DirSubRepoView.as_view()),
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/dir/share/$', DirShareView.as_view()),
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/dir/shared_items/$', DirSharedItemsEndpoint.as_view()),
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/dir/shared_items/$', DirSharedItemsEndpoint.as_view(), name="api2-dir-shared-items"),
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/dir/download/$', DirDownloadView.as_view()),
url(r'^repos/(?P<repo_id>[-0-9-a-f]{36})/thumbnail/$', ThumbnailView.as_view(), name='api2-thumbnail'),
url(r'^starredfiles/', StarredFileView.as_view(), name='starredfiles'),

View File

@@ -61,7 +61,7 @@ class Fixtures(Exam):
def create_repo(self, **kwargs):
repo_id = seafile_api.create_repo('test-repo', '',
'test@test.com', None)
self.user.username, None)
return repo_id
def remove_repo(self, repo_id=None):

View File

@@ -0,0 +1,184 @@
import json
from seaserv import seafile_api
from seahub.test_utils import BaseTestCase
class DirSharedItemsTest(BaseTestCase):
def tearDown(self):
self.remove_repo()
def _add_shared_items(self):
sub_repo_id = seafile_api.create_virtual_repo(self.repo.id,
self.folder,
self.repo.name, '',
self.user.username)
# A user shares a folder to admin with permission 'rw'.
seafile_api.share_repo(sub_repo_id, self.user.username,
self.admin.username, 'rw')
# A user shares a folder to group with permission 'rw'.
seafile_api.set_group_repo(sub_repo_id, self.group.id,
self.user.username, 'rw')
def test_can_list_all(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % (
self.repo.id,
self.folder))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp) == 2
def test_list_without_repo_permission(self):
self._add_shared_items()
self.login_as(self.admin)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % (
self.repo.id,
self.folder))
self.assertEqual(403, resp.status_code)
def test_can_list_without_share_type_arg(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s' % (
self.repo.id,
self.folder))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp) == 2
def test_can_share_folder_to_users(self):
self.login_as(self.user)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=user&username=a@a.com&username=b@b.com",
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'r'
def test_can_share_root_to_groups(self):
self.login_as(self.user)
grp1 = self.group
grp2 = self.create_group(group_name="test-grp2",
username=self.user.username)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=/' % (self.repo.id),
"share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id),
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'rw'
def test_can_share_folder_to_groups(self):
self.login_as(self.user)
grp1 = self.group
grp2 = self.create_group(group_name="test-grp2",
username=self.user.username)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id),
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'rw'
def test_can_modify_user_shared_repo(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
self.admin.username), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_can_modify_group_shared_repo(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % (
self.repo.id,
self.folder,
self.group.id), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_can_unshare_repo_to_user(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
self.admin.username
))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert len(json_resp) == 0
def test_can_unshare_repo_to_group(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % (
self.repo.id,
self.folder,
self.group.id
))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert len(json_resp) == 0

View File

@@ -1,9 +1,4 @@
#coding: UTF-8
import json
from seaserv import seafile_api
from seahub.test_utils import BaseTestCase
from tests.common.utils import urljoin
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \
@@ -32,173 +27,3 @@ class SharesApiTest(ApiTestBase):
self.assertIsNotNone(fileshare['token'])
self.assertIsNotNone(fileshare['view_cnt'])
self.assertIsNotNone(fileshare['path'])
class DirSharedItemsTest(BaseTestCase):
def tearDown(self):
self.remove_repo()
def _add_shared_items(self):
sub_repo_id = seafile_api.create_virtual_repo(self.repo.id,
self.folder,
self.repo.name, '',
self.user.username)
# A user shares a folder to admin with permission 'rw'.
seafile_api.share_repo(sub_repo_id, self.user.username,
self.admin.username, 'rw')
# A user shares a folder to group with permission 'rw'.
seafile_api.set_group_repo(sub_repo_id, self.group.id,
self.user.username, 'rw')
def test_can_list_all(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % (
self.repo.id,
self.folder))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp) == 2
def test_can_list_without_share_type_arg(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s' % (
self.repo.id,
self.folder))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp) == 2
def test_can_share_folder_to_users(self):
self.login_as(self.user)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=user&username=a@a.com&username=b@b.com",
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'r'
def test_can_share_root_to_groups(self):
self.login_as(self.user)
grp1 = self.group
grp2 = self.create_group(group_name="test-grp2",
username=self.user.username)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=/' % (self.repo.id),
"share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id),
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'rw'
def test_can_share_folder_to_groups(self):
self.login_as(self.user)
grp1 = self.group
grp2 = self.create_group(group_name="test-grp2",
username=self.user.username)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=group&group_id=%d&group_id=%d&permission=rw" % (grp1.id, grp2.id),
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'rw'
def test_can_modify_user_shared_repo(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
self.admin.username), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_can_modify_group_shared_repo(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % (
self.repo.id,
self.folder,
self.group.id), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_can_unshare_repo_to_user(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
self.admin.username
))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert len(json_resp) == 0
def test_can_unshare_repo_to_group(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % (
self.repo.id,
self.folder,
self.group.id
))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % (
self.repo.id,
self.folder))
json_resp = json.loads(resp.content)
assert len(json_resp) == 0