mirror of
https://github.com/haiwen/seafile-server.git
synced 2025-09-16 15:18:58 +00:00
Add test scripts of seaf-server api.
This commit is contained in:
@@ -67,6 +67,21 @@ class SeafileAPI(object):
|
||||
"""
|
||||
return seafserv_rpc.get_decrypt_key(repo_id, username)
|
||||
|
||||
def change_repo_passwd(self, repo_id, old_passwd, new_passwd, user):
|
||||
return seafserv_threaded_rpc.change_repo_passwd(repo_id, old_passwd,
|
||||
new_passwd, user)
|
||||
def check_passwd(self, repo_id, magic):
|
||||
return seafserv_threaded_rpc.check_passwd(repo_id, magic)
|
||||
|
||||
def set_passwd(self, repo_id, user, passwd):
|
||||
return seafserv_threaded_rpc.set_passwd(repo_id, user, passwd)
|
||||
|
||||
def unset_passwd(self, repo_id, user, passwd):
|
||||
return seafserv_threaded_rpc.unset_passwd(repo_id, user, passwd)
|
||||
|
||||
def generate_magic_and_random_key(self, enc_version, repo_id, password):
|
||||
return seafserv_threaded_rpc.generate_magic_and_random_key(enc_version, repo_id, password)
|
||||
|
||||
# repo manipulation
|
||||
|
||||
def create_repo(self, name, desc, username, passwd):
|
||||
@@ -148,9 +163,8 @@ class SeafileAPI(object):
|
||||
ret = None
|
||||
return ret
|
||||
|
||||
def change_repo_passwd(self, repo_id, old_passwd, new_passwd, user):
|
||||
return seafserv_threaded_rpc.change_repo_passwd(repo_id, old_passwd,
|
||||
new_passwd, user)
|
||||
def get_system_default_repo_id (self):
|
||||
return seafserv_threaded_rpc.get_system_default_repo_id()
|
||||
|
||||
# File property and dir listing
|
||||
|
||||
@@ -228,6 +242,18 @@ class SeafileAPI(object):
|
||||
def list_dir_with_perm(self, repo_id, dir_path, dir_id, user, offset=-1, limit=-1):
|
||||
return seafserv_threaded_rpc.list_dir_with_perm (repo_id, dir_path, dir_id, user, offset, limit)
|
||||
|
||||
def mkdir_with_parents (self, repo_id, parent_dir, relative_path, username):
|
||||
return seafserv_threaded_rpc.mkdir_with_parents(repo_id, parent_dir, relative_path, username)
|
||||
|
||||
def get_file_count_info_by_path(self, repo_id, path):
|
||||
return seafserv_threaded_rpc.get_file_count_info_by_path(repo_id, path)
|
||||
|
||||
def get_total_storage (self):
|
||||
return seafserv_threaded_rpc.get_total_storage()
|
||||
|
||||
def get_total_file_number (self):
|
||||
return seafserv_threaded_rpc.get_total_file_number()
|
||||
|
||||
# file/dir operations
|
||||
|
||||
def post_file(self, repo_id, tmp_file_path, parent_dir, filename, username):
|
||||
@@ -416,6 +442,9 @@ class SeafileAPI(object):
|
||||
"""
|
||||
return seafserv_threaded_rpc.list_repo_shared_to(from_user, repo_id)
|
||||
|
||||
def repo_has_been_shared(self, repo_id, including_groups=False):
|
||||
return True if seafserv_threaded_rpc.repo_has_been_shared(repo_id, 1 if including_groups else 0) else False
|
||||
|
||||
# share repo to group
|
||||
def group_share_repo(self, repo_id, group_id, username, permission):
|
||||
# deprecated, use ``set_group_repo``
|
||||
@@ -546,6 +575,33 @@ class SeafileAPI(object):
|
||||
"""
|
||||
return seafserv_threaded_rpc.get_shared_groups_for_subdir(repo_id, path, from_user)
|
||||
|
||||
def get_shared_users_by_repo(self, repo_id):
|
||||
users = []
|
||||
# get users that the repo is shared to
|
||||
shared_users = seafserv_threaded_rpc.get_shared_users_by_repo (repo_id)
|
||||
for user in shared_users:
|
||||
users.append(user.user)
|
||||
|
||||
# get users in groups that the repo is shared to
|
||||
group_ids = seafserv_threaded_rpc.get_shared_groups_by_repo(repo_id)
|
||||
if not group_ids:
|
||||
return users
|
||||
|
||||
ids = []
|
||||
for group_id in group_ids.split('\n'):
|
||||
if not group_id:
|
||||
continue
|
||||
ids.append(int(group_id))
|
||||
|
||||
json_ids = json.dumps(ids)
|
||||
group_users = ccnet_threaded_rpc.get_groups_members(json_ids)
|
||||
|
||||
for user in group_users:
|
||||
if user.user_name not in users:
|
||||
users.append(user.user_name)
|
||||
|
||||
return users
|
||||
|
||||
# organization wide repo
|
||||
def add_inner_pub_repo(self, repo_id, permission):
|
||||
return seafserv_threaded_rpc.set_inner_pub_repo(repo_id, permission)
|
||||
@@ -640,19 +696,6 @@ class SeafileAPI(object):
|
||||
def check_quota(self, repo_id, delta=0):
|
||||
return seafserv_threaded_rpc.check_quota(repo_id, delta)
|
||||
|
||||
# encrypted repo password management
|
||||
def check_passwd(self, repo_id, magic):
|
||||
return seafserv_threaded_rpc.check_passwd(repo_id, magic)
|
||||
|
||||
def set_passwd(self, repo_id, user, passwd):
|
||||
return seafserv_threaded_rpc.set_passwd(repo_id, user, passwd)
|
||||
|
||||
def unset_passwd(self, repo_id, user, passwd):
|
||||
return seafserv_threaded_rpc.unset_passwd(repo_id, user, passwd)
|
||||
|
||||
def generate_magic_and_random_key(self, enc_version, repo_id, password):
|
||||
return seafserv_threaded_rpc.generate_magic_and_random_key(enc_version, repo_id, password)
|
||||
|
||||
# virtual repo
|
||||
def create_virtual_repo(self, origin_repo_id, path, repo_name, repo_desc, owner, passwd=''):
|
||||
return seafserv_threaded_rpc.create_virtual_repo(origin_repo_id,
|
||||
@@ -686,30 +729,16 @@ class SeafileAPI(object):
|
||||
def get_trash_repos_by_owner(self, owner):
|
||||
return seafserv_threaded_rpc.get_trash_repos_by_owner(owner)
|
||||
|
||||
def get_trash_repo_owner (self, repo_id):
|
||||
return seafserv_threaded_rpc.get_trash_repo_owner(repo_id)
|
||||
|
||||
def empty_repo_trash(self):
|
||||
return seafserv_threaded_rpc.empty_repo_trash()
|
||||
|
||||
def empty_repo_trash_by_owner(self, owner):
|
||||
return seafserv_threaded_rpc.empty_repo_trash_by_owner(owner)
|
||||
|
||||
def get_total_file_number (self):
|
||||
return seafserv_threaded_rpc.get_total_file_number()
|
||||
|
||||
def get_total_storage (self):
|
||||
return seafserv_threaded_rpc.get_total_storage()
|
||||
|
||||
def get_system_default_repo_id (self):
|
||||
return seafserv_threaded_rpc.get_system_default_repo_id()
|
||||
|
||||
def get_file_count_info_by_path(self, repo_id, path):
|
||||
return seafserv_threaded_rpc.get_file_count_info_by_path(repo_id, path)
|
||||
|
||||
def get_trash_repo_owner (self, repo_id):
|
||||
return seafserv_threaded_rpc.get_trash_repo_owner(repo_id)
|
||||
|
||||
def mkdir_with_parents (self, repo_id, parent_dir, relative_path, username):
|
||||
return seafserv_threaded_rpc.mkdir_with_parents(repo_id, parent_dir, relative_path, username)
|
||||
|
||||
# Server config
|
||||
def get_server_config_int (self, group, key):
|
||||
return seafserv_threaded_rpc.get_server_config_int (group, key)
|
||||
|
||||
@@ -738,36 +767,6 @@ class SeafileAPI(object):
|
||||
def del_org_group_repo(self, repo_id, org_id, group_id):
|
||||
seafserv_threaded_rpc.del_org_group_repo(repo_id, org_id, group_id)
|
||||
|
||||
def repo_has_been_shared(self, repo_id, including_groups=False):
|
||||
return True if seafserv_threaded_rpc.repo_has_been_shared(repo_id, 1 if including_groups else 0) else False
|
||||
|
||||
def get_shared_users_by_repo(self, repo_id):
|
||||
users = []
|
||||
# get users that the repo is shared to
|
||||
shared_users = seafserv_threaded_rpc.get_shared_users_by_repo (repo_id)
|
||||
for user in shared_users:
|
||||
users.append(user.user)
|
||||
|
||||
# get users in groups that the repo is shared to
|
||||
group_ids = seafserv_threaded_rpc.get_shared_groups_by_repo(repo_id)
|
||||
if not group_ids:
|
||||
return users
|
||||
|
||||
ids = []
|
||||
for group_id in group_ids.split('\n'):
|
||||
if not group_id:
|
||||
continue
|
||||
ids.append(int(group_id))
|
||||
|
||||
json_ids = json.dumps(ids)
|
||||
group_users = ccnet_threaded_rpc.get_groups_members(json_ids)
|
||||
|
||||
for user in group_users:
|
||||
if user.user_name not in users:
|
||||
users.append(user.user_name)
|
||||
|
||||
return users
|
||||
|
||||
def org_get_shared_users_by_repo(self, org_id, repo_id):
|
||||
users = []
|
||||
# get users that the repo is shared to
|
||||
|
@@ -3960,7 +3960,7 @@ seaf_get_group_shared_repo_by_path (SeafRepoManager *mgr,
|
||||
GList *repo = NULL;
|
||||
GObject *ret = NULL;
|
||||
|
||||
/* If path is not NULL, 'repo_id' represents for the repo we want,
|
||||
/* If path is NULL, 'repo_id' represents for the repo we want,
|
||||
* otherwise, 'repo_id' represents for the origin repo,
|
||||
* find virtual repo by path first.
|
||||
*/
|
||||
|
@@ -37,6 +37,21 @@ def create_users():
|
||||
ADMIN_USER, ADMIN_PASSWORD, is_staff=True, is_active=True
|
||||
)
|
||||
|
||||
@pytest.yield_fixture(scope='function')
|
||||
def encrypted_repo():
|
||||
repo = create_and_get_repo(
|
||||
'test_repo_{}'.format(randstring(10)), '', USER, passwd='123'
|
||||
)
|
||||
try:
|
||||
seafile_api.post_dir(repo.id, '/', 'dir1', USER)
|
||||
seafile_api.post_dir(repo.id, '/', 'dir2', USER)
|
||||
seafile_api.post_dir(repo.id, '/dir1', 'subdir1', USER)
|
||||
seafile_api.post_dir(repo.id, '/dir2', 'subdir2', USER)
|
||||
yield repo
|
||||
finally:
|
||||
if seafile_api.get_repo(repo.id):
|
||||
# The repo may be deleted in the test case
|
||||
seafile_api.remove_repo(repo.id)
|
||||
|
||||
@pytest.yield_fixture(scope='function')
|
||||
def repo():
|
||||
@@ -62,6 +77,3 @@ def group():
|
||||
finally:
|
||||
if ccnet_api.get_group(group.id):
|
||||
ccnet_api.remove_group(group.id)
|
||||
|
||||
|
||||
|
||||
|
@@ -0,0 +1,120 @@
|
||||
import pytest
|
||||
import os
|
||||
import time
|
||||
from tests.config import USER
|
||||
from seaserv import seafile_api as api
|
||||
|
||||
file_name = 'test.txt'
|
||||
dir_name = 'test_dir'
|
||||
file_content = 'test file content'
|
||||
file_path = os.getcwd() + '/' + file_name
|
||||
|
||||
def create_the_file ():
|
||||
fp = open(file_path, 'w')
|
||||
fp.write(file_content)
|
||||
fp.close()
|
||||
|
||||
def test_file_property_and_dir_listing ():
|
||||
|
||||
t_repo_version = 1
|
||||
t_repo_id = api.create_repo('test_file_property_and_dir_listing', '', USER, passwd=None)
|
||||
|
||||
create_the_file()
|
||||
|
||||
api.post_file(t_repo_id, file_path, '/', file_name, USER)
|
||||
api.post_dir(t_repo_id, '/', dir_name, USER)
|
||||
api.post_file(t_repo_id, file_path, '/' + dir_name, file_name, USER)
|
||||
|
||||
#test is_valid_filename
|
||||
t_valid_file_name = 'valid_filename'
|
||||
t_invalid_file_name = '/invalid_filename'
|
||||
assert api.is_valid_filename(t_repo_id, t_valid_file_name)
|
||||
assert api.is_valid_filename(t_repo_id, t_invalid_file_name) == 0
|
||||
|
||||
#test get_file_id_by_path
|
||||
t_file_id = api.get_file_id_by_path(t_repo_id, '/test.txt')
|
||||
assert t_file_id
|
||||
|
||||
#test get_dir_id_by_path
|
||||
t_dir_id = api.get_dir_id_by_path(t_repo_id, '/test_dir')
|
||||
assert t_dir_id
|
||||
|
||||
#test get_file_size
|
||||
t_file_size = len(file_content)
|
||||
assert t_file_size == api.get_file_size(t_repo_id, t_repo_version, t_file_id)
|
||||
|
||||
#test get_dir_size
|
||||
t_dir_size = len(file_content)
|
||||
assert t_dir_size == api.get_dir_size(t_repo_id, t_repo_version, t_dir_id)
|
||||
|
||||
#test get_file_count_info_by_path
|
||||
t_file_count_info = api.get_file_count_info_by_path(t_repo_id , '/')
|
||||
assert t_file_count_info.file_count == 2
|
||||
assert t_file_count_info.dir_count == 1
|
||||
assert t_file_count_info.size == t_file_size + t_dir_size
|
||||
|
||||
#test get_file_id_by_commit_and_path
|
||||
t_file_id_tmp = t_file_id
|
||||
t_repo = api.get_repo(t_repo_id)
|
||||
assert t_repo
|
||||
t_commit_id = t_repo.head_cmmt_id
|
||||
t_file_id = api.get_file_id_by_commit_and_path(t_repo_id,
|
||||
t_commit_id,
|
||||
'/test.txt')
|
||||
|
||||
assert t_file_id == t_file_id_tmp
|
||||
|
||||
#test get_dirent_by_path
|
||||
std_file_mode = 0100000 | 0644
|
||||
t_dirent_obj = api.get_dirent_by_path(t_repo_id, '/test.txt')
|
||||
assert t_dirent_obj
|
||||
assert t_dirent_obj.obj_id == t_file_id
|
||||
assert t_dirent_obj.obj_name == 'test.txt'
|
||||
assert t_dirent_obj.mode == std_file_mode
|
||||
assert t_dirent_obj.version == t_repo_version
|
||||
assert t_dirent_obj.size == t_file_size
|
||||
assert t_dirent_obj.modifier == USER
|
||||
|
||||
#test list_file_by_file_id
|
||||
t_block_list = api.list_file_by_file_id(t_repo_id, t_file_id)
|
||||
assert t_block_list
|
||||
|
||||
#test list_blocks_by_file_id
|
||||
t_block_list = api.list_blocks_by_file_id(t_repo_id, t_file_id)
|
||||
assert t_block_list
|
||||
|
||||
#test list_dir_by_dir_id
|
||||
t_dir_list = api.list_dir_by_dir_id(t_repo_id, t_dir_id)
|
||||
assert len(t_dir_list) == 1
|
||||
|
||||
#test list_dir_by_path
|
||||
t_dir_list = api.list_dir_by_path(t_repo_id, '/test_dir')
|
||||
assert len(t_dir_list) == 1
|
||||
|
||||
#test get_dir_id_by_commit_and_path
|
||||
t_dir_id = api.get_dir_id_by_commit_and_path(t_repo_id, t_commit_id, '/test_dir')
|
||||
assert t_dir_id
|
||||
|
||||
#test list_dir_by_commit_and_path
|
||||
t_dir_list = api.list_dir_by_commit_and_path(t_repo_id, t_commit_id, '/test_dir')
|
||||
assert len(t_dir_list) == 1
|
||||
|
||||
#test list_dir_with_perm
|
||||
t_dir_list = api.list_dir_with_perm(t_repo_id, '/test_dir', t_dir_id, USER)
|
||||
assert len(t_dir_list) == 1
|
||||
|
||||
#test mkdir_with_parent
|
||||
api.mkdir_with_parents (t_repo_id, '/test_dir', 'test_subdir', USER)
|
||||
t_dir_id = api.get_dir_id_by_path(t_repo_id, '/test_dir/test_subdir')
|
||||
assert t_dir_id
|
||||
|
||||
#test get_total_storage
|
||||
t_total_size = api.get_total_storage()
|
||||
t_repo_size = api.get_repo_size(t_repo_id)
|
||||
assert t_total_size == t_repo_size
|
||||
|
||||
#get_total_file_number
|
||||
time.sleep(1)
|
||||
assert api.get_total_file_number() == 2
|
||||
|
||||
api.remove_repo(t_repo_id)
|
17
tests/test_password/test_password.py
Normal file
17
tests/test_password/test_password.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import pytest
|
||||
from seaserv import seafile_api as api
|
||||
|
||||
def test_password (encrypted_repo):
|
||||
|
||||
old_passwd = '123'
|
||||
new_passwd = '456'
|
||||
|
||||
assert api.set_passwd(encrypted_repo.id, encrypted_repo.name, old_passwd) == 0
|
||||
assert api.get_decrypt_key(encrypted_repo.id, encrypted_repo.name)
|
||||
api.change_repo_passwd(encrypted_repo.repo_id, old_passwd, new_passwd, encrypted_repo.name) == 0
|
||||
assert api.set_passwd(encrypted_repo.id, encrypted_repo.name, new_passwd) == 0
|
||||
|
||||
assert api.is_password_set(encrypted_repo.id, encrypted_repo.name)
|
||||
assert api.unset_passwd(encrypted_repo.id, encrypted_repo.name, new_passwd) == 0
|
||||
assert api.is_password_set(encrypted_repo.id, encrypted_repo.name) == 0
|
||||
|
97
tests/test_repo_manipulation/test_repo_manipulation.py
Normal file
97
tests/test_repo_manipulation/test_repo_manipulation.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import pytest
|
||||
from tests.config import USER, USER2
|
||||
from seaserv import seafile_api as api
|
||||
|
||||
def test_repo_manipulation():
|
||||
|
||||
#test get_system_default_repo_id
|
||||
t_default_repo_id = api.get_system_default_repo_id()
|
||||
assert t_default_repo_id
|
||||
|
||||
#test create_repo
|
||||
t_repo_id = api.create_repo('test_repo_manipulation', '', USER, passwd=None)
|
||||
assert t_repo_id
|
||||
|
||||
#test counts_repo
|
||||
t_repo_count = 0
|
||||
t_repo_count = api.count_repos()
|
||||
assert t_repo_count != 0
|
||||
|
||||
#test get_repo ,edit_repo
|
||||
t_new_name = 'n_name'
|
||||
t_new_desc = 'n_desc'
|
||||
t_repo_version = 1
|
||||
t_repo = api.get_repo(t_repo_id)
|
||||
assert t_repo
|
||||
|
||||
api.edit_repo(t_repo_id, t_new_name, t_new_desc, USER)
|
||||
t_repo = api.get_repo(t_repo_id)
|
||||
assert t_repo.name == t_new_name and t_repo.desc == t_new_desc
|
||||
|
||||
#test revert_repo and get_commit
|
||||
t_commit_id_before_changing = t_repo.head_cmmt_id
|
||||
|
||||
api.post_dir(t_repo_id, '/', 'dir1', USER)
|
||||
t_repo = api.get_repo(t_repo_id)
|
||||
|
||||
api.revert_repo(t_repo_id, t_commit_id_before_changing, USER)
|
||||
|
||||
t_repo = api.get_repo(t_repo_id)
|
||||
t_commit_id_after_revert = t_repo.head_cmmt_id
|
||||
|
||||
t_commit_before_changing = api.get_commit(t_repo_id, t_repo_version, t_commit_id_before_changing)
|
||||
t_commit_after_revert = api.get_commit(t_repo_id, t_repo_version, t_commit_id_after_revert)
|
||||
assert t_commit_before_changing.root_id == t_commit_after_revert.root_id
|
||||
|
||||
#test is_repo_owner
|
||||
assert api.is_repo_owner(USER, t_repo_id)
|
||||
assert api.is_repo_owner(USER2, t_repo_id) == 0
|
||||
|
||||
#test get_repo_owner
|
||||
owner_get = api.get_repo_owner(t_repo_id)
|
||||
assert owner_get == USER
|
||||
|
||||
#test set_repo_owner
|
||||
api.set_repo_owner(t_repo_id, USER2)
|
||||
assert api.is_repo_owner(USER2, t_repo_id)
|
||||
|
||||
#test create_enc_repo
|
||||
t_enc_repo_id = '826d1b7b-f110-46f2-8d5e-7b5ac3e11f4d'
|
||||
t_enc_version = 2
|
||||
t_passwd = '123'
|
||||
magic_and_random_key = api.generate_magic_and_random_key (t_enc_version, t_enc_repo_id, t_passwd)
|
||||
t_magic = magic_and_random_key.magic
|
||||
t_random_key = magic_and_random_key.random_key
|
||||
t_enc_repo_id = api.create_enc_repo (t_enc_repo_id, 'test_encrypted_repo', '', USER, t_magic, t_random_key, t_enc_version)
|
||||
assert t_enc_repo_id == '826d1b7b-f110-46f2-8d5e-7b5ac3e11f4d'
|
||||
|
||||
#test get_repo_list
|
||||
t_start = -1;
|
||||
t_limit = -1;
|
||||
t_repo_list = api.get_repo_list(t_start, t_limit)
|
||||
assert t_repo_list and len(t_repo_list)
|
||||
|
||||
t_start = 1;
|
||||
t_limit = 1;
|
||||
t_repo_list = api.get_repo_list(t_start, t_limit)
|
||||
assert t_repo_list and len(t_repo_list) == 1
|
||||
|
||||
#test get_owned_repo_list
|
||||
t_repo_list = api.get_owned_repo_list(USER2)
|
||||
assert t_repo_list and len(t_repo_list)
|
||||
|
||||
#test get_commit_list
|
||||
t_offset = 0;
|
||||
t_limit = 0;
|
||||
t_commit_list = api.get_commit_list(t_repo_id, t_offset, t_limit)
|
||||
assert t_commit_list and len(t_commit_list) == 4
|
||||
|
||||
t_offset = 1;
|
||||
t_limit = 1;
|
||||
t_commit_list = api.get_commit_list(t_repo_id, t_offset, t_limit)
|
||||
assert t_commit_list and len(t_commit_list) == 1
|
||||
|
||||
#test remove_repo
|
||||
api.remove_repo(t_repo_id)
|
||||
t_repo = api.get_repo(t_repo_id)
|
||||
assert t_repo == None
|
41
tests/test_server_config/test_server_config.py
Normal file
41
tests/test_server_config/test_server_config.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import pytest
|
||||
from seaserv import seafile_api as api
|
||||
|
||||
def test_server_config():
|
||||
|
||||
#test_set_server_config_int and get_server_config_int
|
||||
t_group = 't_group'
|
||||
t_key = 't_key'
|
||||
t_value = 1
|
||||
api.set_server_config_int(t_group, t_key, t_value)
|
||||
t_ret = api.get_server_config_int(t_group, t_key)
|
||||
assert t_ret == t_value
|
||||
|
||||
#test_set_server_config_int64 and get_server_config_int64
|
||||
t_group = 't_group'
|
||||
t_key = 't_key'
|
||||
t_value = 9223372036854775807
|
||||
api.set_server_config_int64(t_group, t_key, t_value)
|
||||
t_ret = api.get_server_config_int64(t_group, t_key)
|
||||
assert t_ret == t_value
|
||||
|
||||
#test_set_server_config_string and get_server_config_string
|
||||
t_group = 't_group'
|
||||
t_key = 't_key'
|
||||
t_value = 't_value'
|
||||
api.set_server_config_string(t_group, t_key, t_value)
|
||||
t_ret = api.get_server_config_string(t_group, t_key)
|
||||
assert t_ret == t_value
|
||||
|
||||
#test_set_server_config_boolean and get_server_config_boolean
|
||||
t_group = 't_group'
|
||||
t_key = 't_key'
|
||||
t_value = True
|
||||
api.set_server_config_boolean(t_group, t_key, t_value)
|
||||
t_ret = api.get_server_config_boolean(t_group, t_key)
|
||||
assert t_ret == t_value
|
||||
|
||||
t_value = False
|
||||
api.set_server_config_boolean(t_group, t_key, t_value)
|
||||
t_ret = api.get_server_config_boolean(t_group, t_key)
|
||||
assert t_ret == t_value
|
@@ -11,12 +11,25 @@ def test_share_repo_to_user(repo, permission):
|
||||
assert api.check_permission(repo.id, USER) == 'rw'
|
||||
assert api.check_permission(repo.id, USER2) is None
|
||||
|
||||
assert api.repo_has_been_shared(repo.id) == False
|
||||
|
||||
api.share_repo(repo.id, USER, USER2, permission)
|
||||
assert api.check_permission(repo.id, USER2) == permission
|
||||
|
||||
assert api.repo_has_been_shared(repo.id)
|
||||
|
||||
repos = api.get_share_in_repo_list(USER2, 0, 1)
|
||||
assert_repo_with_permission(repo, repos, permission)
|
||||
|
||||
repos = api.get_share_out_repo_list(USER, 0, 1)
|
||||
assert_repo_with_permission(repo, repos, permission)
|
||||
|
||||
users = api.list_repo_shared_to(USER, repo.id)
|
||||
assert len (users) == 1
|
||||
assert users[0].repo_id == repo.id
|
||||
assert users[0].user == USER2
|
||||
assert users[0].perm == permission
|
||||
|
||||
api.remove_share(repo.id, USER, USER2)
|
||||
assert api.check_permission(repo.id, USER2) is None
|
||||
|
||||
@@ -38,6 +51,17 @@ def test_share_repo_to_group(repo, group, permission):
|
||||
repos = api.get_repos_by_group(group.id)
|
||||
assert_repo_with_permission(repo, repos, permission)
|
||||
|
||||
group_ids = api.get_shared_group_ids_by_repo(repo.id)
|
||||
assert group_ids and group_ids[len (group_ids) - 1] == '\n'
|
||||
|
||||
group_list = api.list_repo_shared_group_by_user(USER, repo.id)
|
||||
assert len(group_list) == 1
|
||||
group_list = api.list_repo_shared_group_by_user(USER2, repo.id)
|
||||
assert len(group_list) == 0
|
||||
|
||||
repo_get = api.get_group_shared_repo_by_path (repo.id, None, group.id)
|
||||
assert repo_get and repo_get.repo_id == repo.id
|
||||
|
||||
ccnet_api.group_add_member(group.id, USER, USER2)
|
||||
group_list = ccnet_api.get_groups(USER2)
|
||||
assert len(group_list) == 1
|
||||
@@ -49,7 +73,26 @@ def test_share_repo_to_group(repo, group, permission):
|
||||
|
||||
assert api.check_permission(repo.id, USER2) == permission
|
||||
|
||||
api.group_unshare_repo(repo.id, group.id, USER);
|
||||
group_list = api.get_group_repos_by_user (USER)
|
||||
assert len(group_list) == 1
|
||||
|
||||
repoids = api.get_group_repoids(group.id)
|
||||
assert len(repoids) == 1
|
||||
|
||||
repos = api.get_group_repos_by_owner(USER)
|
||||
assert len(repos) == 1
|
||||
api.remove_group_repos_by_owner(group.id, USER)
|
||||
repos = api.get_group_repos_by_owner(USER)
|
||||
assert len(repos) == 0
|
||||
|
||||
api.set_group_repo(repo.id, group.id, USER, permission)
|
||||
repos = api.get_repos_by_group(group.id)
|
||||
assert len(repos) == 1
|
||||
api.remove_group_repos(group.id)
|
||||
repos = api.get_repos_by_group(group.id)
|
||||
assert len(repos) == 0
|
||||
|
||||
api.group_unshare_repo(repo.id, group.id, USER)
|
||||
repos = api.get_repos_by_group(group.id)
|
||||
assert len(repos) == 0
|
||||
|
||||
@@ -65,6 +108,9 @@ def test_share_dir_to_user(repo, permission):
|
||||
vir_repo_2 = api.get_shared_repo_by_path(repo.id, '/dir2', USER2)
|
||||
assert vir_repo_2.permission == permission
|
||||
|
||||
users = api.get_shared_users_for_subdir(repo.id, '/dir1', USER)
|
||||
assert len(users) == 1 and users[0].user == USER2
|
||||
|
||||
assert api.del_file(repo.id, '/', 'dir1', USER) == 0
|
||||
assert api.unshare_subdir_for_user(repo.id, '/dir2', USER, USER2) == 0
|
||||
|
||||
@@ -80,8 +126,74 @@ def test_share_dir_to_group(repo, group, permission):
|
||||
assert api.check_permission(v_repo_id_1, USER2) == permission
|
||||
assert api.check_permission(v_repo_id_2, USER2) == permission
|
||||
|
||||
repo_get = api.get_group_shared_repo_by_path (repo.id, '/dir1', group.id)
|
||||
assert repo_get and repo_get.repo_id == v_repo_id_1
|
||||
|
||||
users = api.get_shared_groups_for_subdir(repo.id, '/dir1', USER)
|
||||
assert len(users) == 1
|
||||
|
||||
assert api.del_file(repo.id, '/', 'dir1', USER) == 0
|
||||
assert api.unshare_subdir_for_group(repo.id, '/dir2', USER, group.id) == 0
|
||||
|
||||
assert api.check_permission(v_repo_id_1, USER2) is None
|
||||
assert api.check_permission(v_repo_id_2, USER2) is None
|
||||
|
||||
@pytest.mark.parametrize('permission_to_share, permission_to_set', [('r', 'rw'), ('rw', 'r')])
|
||||
def test_set_share_permission(repo, permission_to_share, permission_to_set):
|
||||
assert api.check_permission(repo.id, USER2) == None
|
||||
|
||||
api.share_repo(repo.id, USER, USER2, permission_to_share)
|
||||
assert api.check_permission(repo.id, USER2) == permission_to_share
|
||||
|
||||
api.set_share_permission(repo.id, USER, USER2, permission_to_set)
|
||||
assert api.check_permission(repo.id, USER2) == permission_to_set
|
||||
|
||||
api.remove_share(repo.id, USER, USER2)
|
||||
|
||||
@pytest.mark.parametrize('permission_to_share, permission_to_set', [('r', 'rw'), ('rw', 'r')])
|
||||
def set_group_repo_permission(repo, group, permission_to_share, permission_to_set):
|
||||
ccnet_api.group_add_member(group.id, USER, USER2)
|
||||
assert api.check_permission(repo.id, USER2) == None
|
||||
|
||||
api.set_group_repo(repo.id, group.id, USER, permission_to_share)
|
||||
assert api.check_permission(repo.id, USER2) == permission_to_share
|
||||
|
||||
api.set_group_repo_permission(group.id, repo.id, permission_to_set)
|
||||
assert api.check_permission(repo.id, USER2) == permission_to_set
|
||||
|
||||
api.group_unshare_repo(repo.id, group.id, USER)
|
||||
|
||||
@pytest.mark.parametrize('permission_to_share, permission_to_update', [('r', 'rw'), ('rw', 'r')])
|
||||
def test_update_share_subdir_perm_for_user(repo, permission_to_share, permission_to_update):
|
||||
v_repo_id = api.share_subdir_to_user(repo.id, '/dir1', USER, USER2, permission_to_share)
|
||||
assert api.check_permission(v_repo_id, USER2) == permission_to_share
|
||||
|
||||
api.update_share_subdir_perm_for_user(repo.id, '/dir1', USER, USER2, permission_to_update)
|
||||
assert api.check_permission(v_repo_id, USER2) == permission_to_update
|
||||
|
||||
api.unshare_subdir_for_user(repo.id, '/dir1', USER, USER2) == 0
|
||||
|
||||
@pytest.mark.parametrize('permission_to_share, permission_to_update', [('r', 'rw'), ('rw', 'r')])
|
||||
def test_update_share_subdir_perm_for_group(repo, group, permission_to_update, permission_to_share):
|
||||
ccnet_api.group_add_member(group.id, USER, USER2)
|
||||
v_repo_id = api.share_subdir_to_group(repo.id, '/dir1', USER, group.id, permission_to_share)
|
||||
assert api.check_permission(v_repo_id, USER2) == permission_to_share
|
||||
|
||||
api.update_share_subdir_perm_for_group(repo.id, '/dir1', USER, group.id, permission_to_update)
|
||||
assert api.check_permission(v_repo_id, USER2) == permission_to_update
|
||||
|
||||
api.unshare_subdir_for_group(repo.id, '/dir1', USER, group.id)
|
||||
|
||||
@pytest.mark.parametrize('permission', ['r', 'rw'])
|
||||
def test_get_shared_users_by_repo(repo, group, permission):
|
||||
ccnet_api.group_add_member(group.id, USER, USER2)
|
||||
t_users = api.get_shared_users_by_repo(repo.id)
|
||||
assert len(t_users) == 0
|
||||
|
||||
api.share_repo(repo.id, USER, USER2, permission)
|
||||
api.set_group_repo(repo.id, group.id, ADMIN_USER, permission)
|
||||
t_users = api.get_shared_users_by_repo(repo.id)
|
||||
assert len(t_users) == 2
|
||||
|
||||
api.remove_share(repo.id, USER, USER2)
|
||||
api.group_unshare_repo(repo.id, group.id, USER)
|
||||
|
50
tests/test_trashed_repos/test_trashed_repos.py
Normal file
50
tests/test_trashed_repos/test_trashed_repos.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import pytest
|
||||
from tests.config import USER
|
||||
from seaserv import seafile_api as api
|
||||
|
||||
def test_trashed_repos(repo):
|
||||
|
||||
#test get_trash_repo_list
|
||||
t_start = -1
|
||||
t_limit = -1
|
||||
t_trash_repos_tmp = api.get_trash_repo_list(t_start, t_limit)
|
||||
api.remove_repo(repo.id)
|
||||
t_trash_repos = api.get_trash_repo_list(t_start, t_limit)
|
||||
assert len(t_trash_repos) == len(t_trash_repos_tmp) + 1
|
||||
t_trash_repos_tmp = t_trash_repos
|
||||
|
||||
#test get_trash_repo_owner
|
||||
t_owner = api.get_trash_repo_owner(repo.id)
|
||||
assert t_owner == USER
|
||||
|
||||
#test restore_repo_from_trash
|
||||
t_repo_get = api.get_repo(repo.id)
|
||||
assert t_repo_get == None
|
||||
api.restore_repo_from_trash(repo.id)
|
||||
t_repo_get = api.get_repo(repo.id)
|
||||
assert t_repo_get and t_repo_get.repo_id == repo.id
|
||||
|
||||
#test del_repo_from_trash
|
||||
api.del_repo_from_trash(repo.id)
|
||||
t_trash_repos = api.get_trash_repo_list(t_start, t_limit)
|
||||
assert len(t_trash_repos) == len(t_trash_repos_tmp) - 1
|
||||
|
||||
#test get_trash_repos_by_owner
|
||||
t_trash_repos_by_owner_tmp = api.get_trash_repos_by_owner(USER)
|
||||
api.remove_repo(repo.id)
|
||||
t_trash_repos_by_owner = api.get_trash_repos_by_owner(USER)
|
||||
assert len(t_trash_repos_by_owner) == len(t_trash_repos_by_owner_tmp) + 1
|
||||
|
||||
#test empty_repo_trash
|
||||
api.empty_repo_trash()
|
||||
t_trash_repos = api.get_trash_repo_list(t_start, t_limit)
|
||||
assert len(t_trash_repos) == 0
|
||||
|
||||
#test empty_repo_trash_by_owner
|
||||
t_repo_id = api.create_repo('test_trashed_repos', '', USER, passwd=None)
|
||||
api.remove_repo(t_repo_id)
|
||||
t_trash_repos_by_owner = api.get_trash_repos_by_owner(USER)
|
||||
assert len(t_trash_repos_by_owner) != 0
|
||||
api.empty_repo_trash_by_owner(USER)
|
||||
t_trash_repos_by_owner = api.get_trash_repos_by_owner(USER)
|
||||
assert len(t_trash_repos_by_owner) == 0
|
Reference in New Issue
Block a user