1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-03 07:55:36 +00:00

[api] rm virtual repo related code (#1279)

This commit is contained in:
lian
2016-07-13 17:34:16 +08:00
committed by Daniel Pan
parent 9ca09845ad
commit fe236eca03
2 changed files with 352 additions and 181 deletions

View File

@@ -1,6 +1,5 @@
import logging import logging
import json import json
import os
from django.http import HttpResponse from django.http import HttpResponse
from pysearpc import SearpcError from pysearpc import SearpcError
@@ -110,39 +109,6 @@ class DirSharedItemsEndpoint(APIView):
return (shared_to_user, shared_to_group) return (shared_to_user, shared_to_group)
def get_sub_repo_by_path(self, request, repo, path):
if path == '/':
raise Exception("Invalid path")
# get or create sub repo
username = request.user.username
if is_org_context(request):
org_id = request.user.org.org_id
sub_repo = seaserv.seafserv_threaded_rpc.get_org_virtual_repo(
org_id, repo.id, path, username)
else:
sub_repo = seafile_api.get_virtual_repo(repo.id, path, username)
return sub_repo
def get_or_create_sub_repo_by_path(self, request, repo, path):
username = request.user.username
sub_repo = self.get_sub_repo_by_path(request, repo, path)
if not sub_repo:
name = os.path.basename(path)
# create a sub-lib,
# use name as 'repo_name' & 'repo_desc' for sub_repo
if is_org_context(request):
org_id = request.user.org.org_id
sub_repo_id = seaserv.seafserv_threaded_rpc.create_org_virtual_repo(
org_id, repo.id, path, name, name, username)
else:
sub_repo_id = seafile_api.create_virtual_repo(repo.id, path,
name, name, username)
sub_repo = seafile_api.get_repo(sub_repo_id)
return sub_repo
def get_repo_owner(self, request, repo_id): def get_repo_owner(self, request, repo_id):
if is_org_context(request): if is_org_context(request):
return seafile_api.get_org_repo_owner(repo_id) return seafile_api.get_org_repo_owner(repo_id)
@@ -197,20 +163,6 @@ class DirSharedItemsEndpoint(APIView):
if seafile_api.get_dir_id_by_path(repo.id, path) is None: if seafile_api.get_dir_id_by_path(repo.id, path) is None:
return api_error(status.HTTP_404_NOT_FOUND, 'Folder %s not found.' % path) return api_error(status.HTTP_404_NOT_FOUND, 'Folder %s not found.' % path)
if path == '/':
shared_repo = repo
else:
try:
sub_repo = self.get_sub_repo_by_path(request, repo, path)
if sub_repo:
shared_repo = sub_repo
else:
# unlikely to happen
return api_error(status.HTTP_404_NOT_FOUND, 'Failed to get sub repo')
except SearpcError as e:
logger.error(e)
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, 'Internal Server Error')
if shared_to_user: if shared_to_user:
shared_to = request.GET.get('username') shared_to = request.GET.get('username')
if shared_to is None or not is_valid_username(shared_to): if shared_to is None or not is_valid_username(shared_to):
@@ -223,11 +175,19 @@ class DirSharedItemsEndpoint(APIView):
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
seaserv.seafserv_threaded_rpc.org_set_share_permission( if path == '/':
org_id, shared_repo.id, username, shared_to, permission) seafile_api.org_set_share_permission(
org_id, repo_id, username, shared_to, permission)
else: else:
seafile_api.set_share_permission(shared_repo.id, username, seafile_api.org_update_share_subdir_perm_for_user(
shared_to, permission) org_id, repo_id, path, username, shared_to, permission)
else:
if path == '/':
seafile_api.set_share_permission(
repo_id, username, shared_to, permission)
else:
seafile_api.update_share_subdir_perm_for_user(
repo_id, path, username, shared_to, permission)
send_perm_audit_msg('modify-repo-perm', username, shared_to, send_perm_audit_msg('modify-repo-perm', username, shared_to,
repo_id, path, permission) repo_id, path, permission)
@@ -244,11 +204,18 @@ class DirSharedItemsEndpoint(APIView):
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
if path == '/':
seaserv.seafserv_threaded_rpc.set_org_group_repo_permission( seaserv.seafserv_threaded_rpc.set_org_group_repo_permission(
org_id, gid, shared_repo.id, permission) org_id, gid, repo.id, permission)
else: else:
seafile_api.set_group_repo_permission(gid, shared_repo.id, seafile_api.org_update_share_subdir_perm_for_group(
permission) org_id, repo_id, path, username, gid, permission)
else:
if path == '/':
seafile_api.set_group_repo_permission(gid, repo.id, permission)
else:
seafile_api.update_share_subdir_perm_for_group(
repo_id, path, username, gid, permission)
send_perm_audit_msg('modify-repo-perm', username, gid, send_perm_audit_msg('modify-repo-perm', username, gid,
repo_id, path, permission) repo_id, path, permission)
@@ -269,15 +236,6 @@ class DirSharedItemsEndpoint(APIView):
if username != self.get_repo_owner(request, repo_id): if username != self.get_repo_owner(request, repo_id):
return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.') return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.')
if path != '/':
try:
sub_repo = self.get_or_create_sub_repo_by_path(request, repo, path)
except SearpcError as e:
logger.error(e)
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, 'Failed to get sub repo.')
else:
sub_repo = None
share_type = request.data.get('share_type') share_type = request.data.get('share_type')
if share_type != 'user' and share_type != 'group': if share_type != 'user' and share_type != 'group':
return api_error(status.HTTP_400_BAD_REQUEST, 'share_type invalid.') return api_error(status.HTTP_400_BAD_REQUEST, 'share_type invalid.')
@@ -286,7 +244,6 @@ class DirSharedItemsEndpoint(APIView):
if permission not in ['r', 'rw']: if permission not in ['r', 'rw']:
return api_error(status.HTTP_400_BAD_REQUEST, 'permission invalid.') return api_error(status.HTTP_400_BAD_REQUEST, 'permission invalid.')
shared_repo = repo if path == '/' else sub_repo
result = {} result = {}
result['failed'] = [] result['failed'] = []
result['success'] = [] result['success'] = []
@@ -313,18 +270,27 @@ class DirSharedItemsEndpoint(APIView):
try: try:
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
if path == '/':
seaserv.seafserv_threaded_rpc.org_add_share( seaserv.seafserv_threaded_rpc.org_add_share(
org_id, shared_repo.id, username, to_user, org_id, repo_id, username, to_user,
permission) permission)
else: else:
seafile_api.share_repo(shared_repo.id, username, seafile_api.org_share_subdir_to_user(
to_user, permission) org_id, repo_id, path, username, to_user,
permission)
else:
if path == '/':
seafile_api.share_repo(
repo_id, username, to_user, permission)
else:
seafile_api.share_subdir_to_user(repo_id,
path, username, to_user, permission)
# send a signal when sharing repo successful # send a signal when sharing repo successful
share_repo_to_user_successful.send(sender=None, share_repo_to_user_successful.send(sender=None,
from_user=username, from_user=username,
to_user=to_user, to_user=to_user,
repo=shared_repo) repo=repo)
result['success'].append({ result['success'].append({
"share_type": "user", "share_type": "user",
"user_info": { "user_info": {
@@ -358,15 +324,24 @@ class DirSharedItemsEndpoint(APIView):
try: try:
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
seafile_api.add_org_group_repo(shared_repo.repo_id, if path == '/':
org_id, gid, username, seafile_api.add_org_group_repo(
repo_id, org_id, gid, username, permission)
else:
seafile_api.org_share_subdir_to_group(
org_id, repo_id, path, username, gid,
permission) permission)
else: else:
seafile_api.set_group_repo(shared_repo.repo_id, gid, if path == '/':
username, permission) seafile_api.set_group_repo(
repo_id, gid, username, permission)
else:
seafile_api.share_subdir_to_group(
repo_id, path, username, gid, permission)
share_repo_to_group_successful.send(sender=None, share_repo_to_group_successful.send(sender=None,
from_user=username, group_id=gid, repo=shared_repo) from_user=username, group_id=gid, repo=repo)
result['success'].append({ result['success'].append({
"share_type": "group", "share_type": "group",
@@ -405,19 +380,6 @@ class DirSharedItemsEndpoint(APIView):
shared_to_user, shared_to_group = self.handle_shared_to_args(request) shared_to_user, shared_to_group = self.handle_shared_to_args(request)
if path == '/':
shared_repo = repo
else:
try:
sub_repo = self.get_sub_repo_by_path(request, repo, path)
if sub_repo:
shared_repo = sub_repo
else:
return api_error(status.HTTP_404_NOT_FOUND, 'Sub-library not found.')
except SearpcError as e:
logger.error(e)
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, 'Failed to get sub-library.')
if shared_to_user: if shared_to_user:
shared_to = request.GET.get('username') shared_to = request.GET.get('username')
if shared_to is None or not is_valid_username(shared_to): if shared_to is None or not is_valid_username(shared_to):
@@ -425,14 +387,23 @@ class DirSharedItemsEndpoint(APIView):
# if user not found, permission will be None # if user not found, permission will be None
permission = seafile_api.check_permission_by_path( permission = seafile_api.check_permission_by_path(
shared_repo.id, '/', shared_to) repo_id, '/', shared_to)
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
if path == '/':
seaserv.seafserv_threaded_rpc.org_remove_share( seaserv.seafserv_threaded_rpc.org_remove_share(
org_id, shared_repo.id, username, shared_to) org_id, repo_id, username, shared_to)
else: else:
seaserv.remove_share(shared_repo.id, username, shared_to) seafile_api.org_unshare_subdir_for_user(
org_id, repo_id, path, username, shared_to)
else:
if path == '/':
seaserv.remove_share(repo_id, username, shared_to)
else:
seafile_api.unshare_subdir_for_user(
repo_id, path, username, shared_to)
send_perm_audit_msg('delete-repo-perm', username, shared_to, send_perm_audit_msg('delete-repo-perm', username, shared_to,
repo_id, path, permission) repo_id, path, permission)
@@ -449,10 +420,10 @@ class DirSharedItemsEndpoint(APIView):
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
shared_groups = seafile_api.list_org_repo_shared_group( shared_groups = seafile_api.list_org_repo_shared_group(
org_id, username, shared_repo.id) org_id, username, repo_id)
else: else:
shared_groups = seafile_api.list_repo_shared_group( shared_groups = seafile_api.list_repo_shared_group(
username, shared_repo.id) username, repo_id)
for e in shared_groups: for e in shared_groups:
if e.group_id == group_id: if e.group_id == group_id:
@@ -461,9 +432,17 @@ class DirSharedItemsEndpoint(APIView):
if is_org_context(request): if is_org_context(request):
org_id = request.user.org.org_id org_id = request.user.org.org_id
seaserv.del_org_group_repo(shared_repo.id, org_id, group_id) if path == '/':
seaserv.del_org_group_repo(repo_id, org_id, group_id)
else: else:
seafile_api.unset_group_repo(shared_repo.id, group_id, username) seafile_api.org_unshare_subdir_for_group(
org_id, repo_id, path, username, group_id)
else:
if path == '/':
seafile_api.unset_group_repo(repo_id, group_id, username)
else:
seafile_api.unshare_subdir_for_group(
repo_id, path, username, group_id)
send_perm_audit_msg('delete-repo-perm', username, group_id, send_perm_audit_msg('delete-repo-perm', username, group_id,
repo_id, path, permission) repo_id, path, permission)

View File

@@ -7,55 +7,104 @@ from tests.common.utils import randstring
from seahub.test_utils import BaseTestCase from seahub.test_utils import BaseTestCase
class DirSharedItemsTest(BaseTestCase): class DirSharedItemsTest(BaseTestCase):
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
self.repo_id = self.repo.id
self.folder_path = self.folder
self.group_id = self.group.id
def tearDown(self): def tearDown(self):
self.remove_repo() self.remove_repo()
self.remove_group()
def _add_shared_items(self): def share_repo_to_user_and_group(self):
sub_repo_id = seafile_api.create_virtual_repo(self.repo.id,
self.folder, # share repo to user
self.repo.name, '', seafile_api.share_repo(self.repo_id,
self.user.username) self.user_name, self.admin_name, 'rw')
# A user shares a folder to admin with permission 'rw'.
seafile_api.share_repo(sub_repo_id, self.user.username, # share repo to group
self.admin.username, 'rw') seafile_api.set_group_repo(self.repo_id,
# A user shares a folder to group with permission 'rw'. self.group_id, self.user_name, 'rw')
seafile_api.set_group_repo(sub_repo_id, self.group.id,
self.user.username, 'rw') def share_folder_to_user_and_group(self):
# share folder to user
seafile_api.share_subdir_to_user(self.repo_id,
self.folder_path, self.user_name, self.admin_name, 'rw')
# share folder to group
seafile_api.share_subdir_to_group(self.repo_id,
self.folder_path, self.user_name, self.group_id, 'rw')
# test get request
def test_can_list_repo_share_info(self):
self.share_repo_to_user_and_group()
def test_can_list_all(self):
self._add_shared_items()
self.login_as(self.user) self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % ( resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=/&share_type=user,group' % self.repo_id)
self.repo.id,
self.folder))
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert len(json_resp) == 2 assert len(json_resp) == 2
def test_list_without_repo_permission(self): def test_can_list_folder_share_info(self):
self._add_shared_items()
self.share_folder_to_user_and_group()
self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % (
self.repo_id, self.folder))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp) == 2
def test_list_with_invalid_user_permission(self):
self.share_folder_to_user_and_group()
self.login_as(self.admin) self.login_as(self.admin)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % ( resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user,group' % (
self.repo.id, self.repo_id, self.folder))
self.folder))
self.assertEqual(403, resp.status_code) self.assertEqual(403, resp.status_code)
def test_can_list_without_share_type_arg(self): def test_list_without_share_type_arg(self):
self._add_shared_items()
self.share_folder_to_user_and_group()
self.login_as(self.user) self.login_as(self.user)
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s' % ( resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s' % (
self.repo.id, self.repo_id, self.folder))
self.folder))
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert len(json_resp) == 2 assert len(json_resp) == 2
# test put request
def test_can_share_repo_to_users(self):
self.login_as(self.user)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=/' % self.repo_id,
"share_type=user&username=%s" % self.admin.email,
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['success']) == 1
assert json_resp['success'][0]['permission'] == 'r'
def test_can_share_folder_to_users(self): def test_can_share_folder_to_users(self):
self.login_as(self.user) self.login_as(self.user)
@@ -70,37 +119,7 @@ class DirSharedItemsTest(BaseTestCase):
assert len(json_resp['success']) == 1 assert len(json_resp['success']) == 1
assert json_resp['success'][0]['permission'] == 'r' assert json_resp['success'][0]['permission'] == 'r'
def test_share_folder_to_invalid_email(self): def test_can_share_repo_to_groups(self):
self.login_as(self.user)
invalid_email = '%s' % randstring(6)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=user&username=%s" % invalid_email,
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['failed']) == 1
assert invalid_email in json_resp['failed'][0]['email']
def test_share_folder_to_unregistered_user(self):
self.login_as(self.user)
unregistered_user = '%s@%s.com' % (randstring(6), randstring(6))
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=user&username=%s" % unregistered_user,
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['failed']) == 1
assert unregistered_user in json_resp['failed'][0]['email']
def test_can_share_root_to_groups(self):
self.login_as(self.user) self.login_as(self.user)
grp1 = self.group grp1 = self.group
@@ -135,8 +154,73 @@ class DirSharedItemsTest(BaseTestCase):
assert len(json_resp['success']) == 2 assert len(json_resp['success']) == 2
assert json_resp['success'][0]['permission'] == 'rw' assert json_resp['success'][0]['permission'] == 'rw'
def test_can_modify_user_shared_repo(self): def test_share_with_invalid_email(self):
self._add_shared_items() self.login_as(self.user)
invalid_email = '%s' % randstring(6)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=user&username=%s" % invalid_email,
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['failed']) == 1
assert invalid_email in json_resp['failed'][0]['email']
def test_share_with_unregistered_user(self):
self.login_as(self.user)
unregistered_user = '%s@%s.com' % (randstring(6), randstring(6))
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=%s' % (self.repo.id,
self.folder),
"share_type=user&username=%s" % unregistered_user,
'application/x-www-form-urlencoded',
)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert len(json_resp['failed']) == 1
assert unregistered_user in json_resp['failed'][0]['email']
def test_share_with_invalid_ownership(self):
self.share_repo_to_user_and_group()
# admin can visit user sub-folder with 'rw' permission
assert seafile_api.check_permission_by_path(self.repo.id,
'/', self.admin.username) == 'rw'
self.login_as(self.admin)
resp = self.client.put(
'/api2/repos/%s/dir/shared_items/?p=/' % self.repo.id,
"share_type=user&username=%s" % self.admin.username,
'application/x-www-form-urlencoded',
)
self.assertEqual(403, resp.status_code)
# test post request
def test_can_modify_user_repo_share_perm(self):
self.share_repo_to_user_and_group()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=/&share_type=user&username=%s' % (
self.repo.id,
self.admin.username), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=/&share_type=user' % self.repo_id)
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_can_modify_user_folder_share_perm(self):
self.share_folder_to_user_and_group()
self.login_as(self.user) self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % ( resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
@@ -155,8 +239,59 @@ class DirSharedItemsTest(BaseTestCase):
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r' assert json_resp[0]['permission'] == 'r'
def test_modify_shared_repo_with_unregistered_user(self): def test_can_modify_group_repo_share_perm(self):
self._add_shared_items() self.share_repo_to_user_and_group()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=/&share_type=group&group_id=%d' % (
self.repo.id,
self.group.id), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=/&share_type=group' % self.repo_id)
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_can_modify_group_folder_share_perm(self):
self.share_folder_to_user_and_group()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % (
self.repo.id,
self.folder,
self.group.id), {
'permission': 'r'
}
)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' %
(self.repo_id, self.folder))
json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r'
def test_modify_with_invalid_email(self):
self.share_folder_to_user_and_group()
self.login_as(self.user)
invalid_email = '%s' % randstring(6)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
invalid_email), {
'permission': 'r'
}
)
self.assertEqual(400, resp.status_code)
def test_modify_with_unregistered_user(self):
self.share_folder_to_user_and_group()
self.login_as(self.user) self.login_as(self.user)
unregistered_user = '%s@%s.com' % (randstring(6), randstring(6)) unregistered_user = '%s@%s.com' % (randstring(6), randstring(6))
@@ -170,28 +305,41 @@ class DirSharedItemsTest(BaseTestCase):
) )
self.assertEqual(400, resp.status_code) self.assertEqual(400, resp.status_code)
def test_can_modify_group_shared_repo(self): def test_modify_with_invalid_ownership(self):
self._add_shared_items()
self.login_as(self.user)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % ( self.share_repo_to_user_and_group()
# admin can visit user sub-folder with 'rw' permission
assert seafile_api.check_permission_by_path(self.repo.id,
'/', self.admin.username) == 'rw'
self.login_as(self.admin)
resp = self.client.post('/api2/repos/%s/dir/shared_items/?p=/&share_type=user&username=%s' % (
self.repo.id, self.repo.id,
self.folder, self.admin.username), {
self.group.id), {
'permission': 'r' 'permission': 'r'
} }
) )
self.assertEqual(403, resp.status_code)
# test delete request
def test_can_unshare_repo_to_user(self):
self.share_repo_to_user_and_group()
self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=/&share_type=user&username=%s' % (
self.repo.id, self.admin.username))
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert json_resp['success'] is True assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % ( resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=/&share_type=user' % self.repo_id)
self.repo.id,
self.folder))
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert json_resp[0]['permission'] == 'r' assert len(json_resp) == 0
def test_can_unshare_repo_to_user(self): def test_can_unshare_folder_to_user(self):
self._add_shared_items() self.share_folder_to_user_and_group()
self.login_as(self.user) self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % ( resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
@@ -210,21 +358,25 @@ class DirSharedItemsTest(BaseTestCase):
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert len(json_resp) == 0 assert len(json_resp) == 0
def test_unshare_repo_with_unregistered_user(self): def test_can_unshare_repo_to_group(self):
self._add_shared_items() self.share_repo_to_user_and_group()
self.login_as(self.user) self.login_as(self.user)
unregistered_user = '%s@%s.com' % (randstring(6), randstring(6)) resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=/&share_type=group&group_id=%d' % (
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id, self.repo.id,
self.folder, self.group.id
unregistered_user
)) ))
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['success'] is True
def test_can_unshare_repo_to_group(self): resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=/&share_type=group' % self.repo.id)
self._add_shared_items()
json_resp = json.loads(resp.content)
assert len(json_resp) == 0
def test_can_unshare_folder_to_group(self):
self.share_folder_to_user_and_group()
self.login_as(self.user) self.login_as(self.user)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % ( resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group&group_id=%d' % (
@@ -237,8 +389,48 @@ class DirSharedItemsTest(BaseTestCase):
assert json_resp['success'] is True assert json_resp['success'] is True
resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % ( resp = self.client.get('/api2/repos/%s/dir/shared_items/?p=%s&share_type=group' % (
self.repo.id, self.repo.id, self.folder))
self.folder))
json_resp = json.loads(resp.content) json_resp = json.loads(resp.content)
assert len(json_resp) == 0 assert len(json_resp) == 0
def test_unshare_with_invalid_email(self):
self.share_folder_to_user_and_group()
self.login_as(self.user)
invalid_email = '%s' % randstring(6)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
invalid_email
))
self.assertEqual(400, resp.status_code)
def test_unshare_with_unregistered_user(self):
self.share_folder_to_user_and_group()
self.login_as(self.user)
unregistered_user = '%s@%s.com' % (randstring(6), randstring(6))
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=%s&share_type=user&username=%s' % (
self.repo.id,
self.folder,
unregistered_user
))
self.assertEqual(200, resp.status_code)
def test_unshare_with_invalid_ownership(self):
self.share_repo_to_user_and_group()
# admin can visit user sub-folder with 'rw' permission
assert seafile_api.check_permission_by_path(self.repo.id,
'/', self.admin.username) == 'rw'
self.login_as(self.admin)
resp = self.client.delete('/api2/repos/%s/dir/shared_items/?p=/&share_type=user&username=%s' % (
self.repo.id,
self.admin.username
))
self.assertEqual(403, resp.status_code)