mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-02 15:38:15 +00:00
improve tests
This commit is contained in:
@@ -1,77 +1,141 @@
|
|||||||
import requests
|
#coding: UTF-8
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
import requests
|
||||||
import unittest
|
import unittest
|
||||||
from nose.tools import assert_equal # pylint: disable=E0611
|
from nose.tools import assert_equal, assert_in # pylint: disable=E0611
|
||||||
|
|
||||||
from common.common import BASE_URL, USERNAME, PASSWORD, IS_PRO
|
from tests.common.common import USERNAME, PASSWORD, IS_PRO
|
||||||
from common.utils import apiurl
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
|
from tests.api.urls import TOKEN_URL, GROUPS_URL, ACCOUNTS_URL, REPOS_URL
|
||||||
|
|
||||||
BASE_URL = BASE_URL
|
class ApiTestBase(unittest.TestCase):
|
||||||
PING_URL = apiurl('/api2/ping/')
|
|
||||||
TOKEN_URL = apiurl('/api2/auth-token/')
|
|
||||||
AUTH_PING_URL = BASE_URL + u'/api2/auth/ping/'
|
|
||||||
|
|
||||||
ACCOUNTS_URL = BASE_URL + u'/api2/accounts/'
|
|
||||||
ACCOUNT_INFO_URL = BASE_URL + u'/api2/account/info/'
|
|
||||||
USERMSGS_URL = BASE_URL + u'/api2/user/msgs/' + USERNAME + u'/'
|
|
||||||
USERMSGS_COUNT_URL = BASE_URL + u'/api2/unseen_messages/'
|
|
||||||
GROUP_URL = BASE_URL + u'/api2/group/'
|
|
||||||
GROUPS_URL = BASE_URL + u'/api2/groups/'
|
|
||||||
GROUPMSGS_URL = BASE_URL + u'/api2/group/msgs/'
|
|
||||||
GROUPMSGS_NREPLY_URL = BASE_URL + u'/api2/new_replies/'
|
|
||||||
AVATAR_BASE_URL = BASE_URL + u'/api2/avatars/'
|
|
||||||
|
|
||||||
DEFAULT_LIBRARY_URL = BASE_URL + u'/api2/default-repo/'
|
|
||||||
LIBRARIES_URL = BASE_URL + u'/api2/repos/'
|
|
||||||
VIRTUAL_LIBRARIES_URL = BASE_URL + u'/api2/virtual-repos/'
|
|
||||||
STARREDFILES_URL = BASE_URL + u'/api2/starredfiles/'
|
|
||||||
SHARED_LINKS_URL = BASE_URL + u'/api2/shared-links/'
|
|
||||||
SHARED_LIBRARIES_URL = BASE_URL + u'/api2/shared-repos/'
|
|
||||||
BESHARED_LIBRARIES_URL = BASE_URL + u'/api2/beshared-repos/'
|
|
||||||
SHARED_FILES_URL = BASE_URL + u'/api2/shared-files/'
|
|
||||||
F_URL = BASE_URL + u'/api2/f/'
|
|
||||||
S_F_URL = BASE_URL + u'/api2/s/f/'
|
|
||||||
|
|
||||||
MISC_SEARCH_URL = BASE_URL + u'/api2/search/'
|
|
||||||
MISC_LIST_GROUP_AND_CONTACTS_URL = BASE_URL + u'/api2/groupandcontacts/'
|
|
||||||
MISC_LIST_EVENTS_URL = BASE_URL + u'/api2/events/'
|
|
||||||
|
|
||||||
class ApiTestCase(unittest.TestCase):
|
|
||||||
_token = None
|
_token = None
|
||||||
|
|
||||||
def get(self, *args, **kwargs):
|
use_test_user = False
|
||||||
self._req('GET', *args, **kwargs)
|
use_test_group = False
|
||||||
|
use_test_repo = False
|
||||||
|
|
||||||
def post(self, *args, **kwargs):
|
test_user_name = None
|
||||||
self._req('POST', *args, **kwargs)
|
test_user_url = None
|
||||||
|
|
||||||
def put(self, *args, **kwargs):
|
test_group_name = None
|
||||||
self._req('PUT', *args, **kwargs)
|
test_group_id = None
|
||||||
|
test_group_url = None
|
||||||
|
|
||||||
def delete(self, *args, **kwargs):
|
test_repo_id = None
|
||||||
self._req('DELETE', *args, **kwargs)
|
test_repo_url = None
|
||||||
|
test_file_url = None
|
||||||
|
test_dir_url = None
|
||||||
|
|
||||||
def _req(self, method, *args, **kwargs):
|
def setUp(self):
|
||||||
if self._token is None:
|
if self.use_test_user:
|
||||||
self._token = get_auth_token()
|
self.create_tmp_user()
|
||||||
|
if self.use_test_group:
|
||||||
|
self.create_tmp_group()
|
||||||
|
if self.use_test_repo:
|
||||||
|
self.create_tmp_repo()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if self.use_test_user:
|
||||||
|
self.remove_tmp_user()
|
||||||
|
if self.use_test_group:
|
||||||
|
self.remove_tmp_group()
|
||||||
|
if self.use_test_repo:
|
||||||
|
self.remove_tmp_repo()
|
||||||
|
|
||||||
|
def create_tmp_repo(self):
|
||||||
|
repo_name = '测试-test-repo-%s' % randstring(6)
|
||||||
|
data = {
|
||||||
|
'name': repo_name,
|
||||||
|
'desc': 'just for test - 测试用资料库',
|
||||||
|
}
|
||||||
|
repo = self.post(REPOS_URL, data=data).json()
|
||||||
|
self.test_repo_id = repo['repo_id']
|
||||||
|
self.test_repo_url = urljoin(REPOS_URL, self.test_repo_id)
|
||||||
|
self.test_file_url = urljoin(self.test_repo_url, 'file')
|
||||||
|
self.test_dir_url = urljoin(self.test_repo_url, 'dir')
|
||||||
|
|
||||||
|
def remove_tmp_repo(self):
|
||||||
|
if self.test_repo_id:
|
||||||
|
self.delete(self.test_repo_url)
|
||||||
|
|
||||||
|
def create_tmp_group(self):
|
||||||
|
self.test_group_name = '测试群组-%s' % randstring(16)
|
||||||
|
data = {'group_name': self.test_group_name}
|
||||||
|
self.test_group_id = self.put(GROUPS_URL, data=data).json()['group_id']
|
||||||
|
self.test_group_url = urljoin(GROUPS_URL, str(self.test_group_id))
|
||||||
|
|
||||||
|
def remove_tmp_group(self):
|
||||||
|
if self.test_group_id:
|
||||||
|
self.delete(self.test_group_url)
|
||||||
|
|
||||||
|
def create_tmp_user(self):
|
||||||
|
data = {'password': 'testtest'}
|
||||||
|
username = '%s@test.com' % randstring(20)
|
||||||
|
self.put(urljoin(ACCOUNTS_URL, username), data=data, expected=201)
|
||||||
|
self.test_user_name = username
|
||||||
|
self.test_user_url = urljoin(ACCOUNTS_URL, username)
|
||||||
|
|
||||||
|
def remove_tmp_user(self):
|
||||||
|
if self.test_user_name:
|
||||||
|
self.delete(self.test_user_url)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get(cls, *args, **kwargs):
|
||||||
|
return cls._req('GET', *args, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def post(cls, *args, **kwargs):
|
||||||
|
return cls._req('POST', *args, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def put(cls, *args, **kwargs):
|
||||||
|
return cls._req('PUT', *args, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def delete(cls, *args, **kwargs):
|
||||||
|
return cls._req('DELETE', *args, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _req(cls, method, *args, **kwargs):
|
||||||
|
auth = kwargs.pop('auth', True)
|
||||||
|
if auth:
|
||||||
|
if cls._token is None:
|
||||||
|
cls._token = get_auth_token()
|
||||||
|
|
||||||
headers = kwargs.pop('headers', {})
|
headers = kwargs.pop('headers', {})
|
||||||
headers.setdefault('Authorization', 'Token ' + self._token)
|
headers.setdefault('Authorization', 'Token ' + cls._token)
|
||||||
kwargs['headers'] = headers
|
kwargs['headers'] = headers
|
||||||
|
|
||||||
resp = requests.request(method, *args, **kwargs)
|
|
||||||
expected = kwargs.pop('expected', 200)
|
expected = kwargs.pop('expected', 200)
|
||||||
|
resp = requests.request(method, *args, **kwargs)
|
||||||
if expected is not None:
|
if expected is not None:
|
||||||
if hasattr(expected, '__iter__'):
|
if hasattr(expected, '__iter__'):
|
||||||
self.assertIn(resp.status_code, expected,
|
assert_in(resp.status_code, expected,
|
||||||
"Expected http status in %s, received %s" % (expected,
|
"Expected http status in %s, received %s" % (expected,
|
||||||
resp.status_code))
|
resp.status_code))
|
||||||
else:
|
else:
|
||||||
self.assertEqual(resp.status_code, expected,
|
assert_equal(resp.status_code, expected,
|
||||||
"Expected http status %s, received %s" % (expected,
|
"Expected http status %s, received %s" % (expected,
|
||||||
resp.status_code))
|
resp.status_code))
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
def assertHasLen(self, lst, length):
|
||||||
|
"""
|
||||||
|
Assert a list/tuple/string has exact `length`
|
||||||
|
"""
|
||||||
|
msg = 'Expected to have length %s, but length is %s' \
|
||||||
|
% (length, len(lst))
|
||||||
|
self.assertEqual(len(lst), length, msg)
|
||||||
|
|
||||||
|
def assertNotEmpty(self, lst):
|
||||||
|
"""
|
||||||
|
Assert a list/tuple/string is not empty
|
||||||
|
"""
|
||||||
|
msg = 'Expected not empty, but it is'
|
||||||
|
self.assertGreater(len(lst), 0, msg)
|
||||||
|
|
||||||
def get_auth_token():
|
def get_auth_token():
|
||||||
res = requests.post(TOKEN_URL,
|
res = requests.post(TOKEN_URL,
|
||||||
data=dict(username=USERNAME, password=PASSWORD))
|
data=dict(username=USERNAME, password=PASSWORD))
|
||||||
|
@@ -1,67 +1,67 @@
|
|||||||
from apitestbase import ACCOUNTS_URL, ACCOUNT_INFO_URL, get_authed_instance
|
|
||||||
from apitestbase import USERNAME
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
ACCOUNT_USERNAME = u'test_tmp@test.com'
|
from tests.common.utils import apiurl, urljoin
|
||||||
ACCOUNT_PASSWORD = r'test_test'
|
from tests.api.apitestbase import USERNAME, ApiTestBase
|
||||||
ACCOUNT_PASSWORD2 = r'test_test2'
|
from tests.api.urls import ACCOUNTS_URL, ACCOUNT_INFO_URL, PING_URL, \
|
||||||
ACCOUNT_URL = ACCOUNTS_URL + ACCOUNT_USERNAME + u'/'
|
AUTH_PING_URL
|
||||||
|
|
||||||
class AccountsApiTestCase(unittest.TestCase):
|
test_account_username = u'test_tmp@test.com'
|
||||||
|
test_account_password = r'test_test'
|
||||||
|
test_account_password2 = r'test_test2'
|
||||||
|
test_account_url = urljoin(ACCOUNTS_URL, test_account_username)
|
||||||
|
|
||||||
def setUp(self):
|
class AccountsApiTest(ApiTestBase):
|
||||||
self.requests = get_authed_instance()
|
use_test_uesr = True
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
|
|
||||||
def test_check_account_info_api(self):
|
def test_check_account_info(self):
|
||||||
res = self.requests.get(ACCOUNT_INFO_URL)
|
info = self.get(ACCOUNT_INFO_URL).json()
|
||||||
self.assertEqual(res.status_code, 200)
|
self.assertIsNotNone(info)
|
||||||
json = res.json()
|
self.assertEqual(info['email'], USERNAME)
|
||||||
self.assertIsNotNone(json)
|
self.assertIsNotNone(info['total'])
|
||||||
self.assertEqual(json['email'], USERNAME)
|
self.assertIsNotNone(info['usage'])
|
||||||
self.assertIsNotNone(json['total'])
|
|
||||||
self.assertIsNotNone(json['usage'])
|
|
||||||
|
|
||||||
def test_list_accounts_api(self):
|
def test_list_accounts(self):
|
||||||
res = self.requests.get(ACCOUNTS_URL)
|
accounts = self.get(ACCOUNTS_URL).json()
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(res.json())
|
|
||||||
found = False
|
found = False
|
||||||
for i in res.json():
|
for account in accounts:
|
||||||
if (i['email'] == USERNAME):
|
if account['email'] == USERNAME:
|
||||||
found = True
|
found = True
|
||||||
self.assertEqual(found, True)
|
self.assertTrue(found)
|
||||||
|
|
||||||
def test_create_account_api(self):
|
def test_create_account(self):
|
||||||
data = { 'password': ACCOUNT_PASSWORD }
|
data = {'password': test_account_password}
|
||||||
res = self.requests.put(ACCOUNT_URL, data=data)
|
res = self.put(test_account_url, data=data, expected=201)
|
||||||
self.assertEqual(res.status_code, 201)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
self.assertEqual(res.text, u'"success"')
|
||||||
self.requests.delete(ACCOUNT_URL)
|
self.delete(test_account_url)
|
||||||
|
|
||||||
def test_update_account_api(self):
|
def test_update_account(self):
|
||||||
data = { 'password': ACCOUNT_PASSWORD }
|
data = {'password': test_account_password}
|
||||||
self.requests.put(ACCOUNT_URL, data=data)
|
self.put(test_account_url, data=data, expected=201)
|
||||||
data = { 'password': ACCOUNT_PASSWORD2, 'is_staff': 1,
|
data = {
|
||||||
'is_active': 1 }
|
'password': test_account_password2,
|
||||||
res = self.requests.put(ACCOUNT_URL, data=data)
|
'is_staff': 1,
|
||||||
self.assertEqual(res.status_code, 200)
|
'is_active': 1,
|
||||||
|
}
|
||||||
|
res = self.put(test_account_url, data=data)
|
||||||
self.assertEqual(res.text, u'"success"')
|
self.assertEqual(res.text, u'"success"')
|
||||||
self.requests.delete(ACCOUNT_URL)
|
self.delete(test_account_url)
|
||||||
#TODO: verify updated account
|
|
||||||
|
|
||||||
def test_delete_account_api(self):
|
def test_delete_account(self):
|
||||||
data = { 'password': ACCOUNT_PASSWORD }
|
data = {'password': test_account_password}
|
||||||
res = self.requests.put(ACCOUNT_URL, data=data)
|
self.put(test_account_url, data=data, expected=201)
|
||||||
res = self.requests.delete(ACCOUNT_URL)
|
res = self.delete(test_account_url)
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
self.assertEqual(res.text, u'"success"')
|
||||||
res = self.requests.get(ACCOUNTS_URL)
|
accounts = self.get(ACCOUNTS_URL).json()
|
||||||
found = False
|
found = False
|
||||||
for i in res.json():
|
for account in accounts:
|
||||||
if (i['email'] == ACCOUNT_USERNAME):
|
if account['email'] == test_account_username:
|
||||||
found = True
|
found = True
|
||||||
self.assertEqual(found, False)
|
self.assertFalse(found)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def test_auth_ping(self):
|
||||||
unittest.main(verbosity=2)
|
res = self.get(AUTH_PING_URL)
|
||||||
|
self.assertRegexpMatches(res.text, u'"pong"')
|
||||||
|
|
||||||
|
def test_ping(self):
|
||||||
|
res = self.get(PING_URL, auth=False)
|
||||||
|
self.assertRegexpMatches(res.text, u'"pong"')
|
||||||
|
@@ -1,16 +0,0 @@
|
|||||||
from apitestbase import AUTH_PING_URL, get_authed_instance
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class AuthPingApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
|
|
||||||
def test_auth_ping_api(self):
|
|
||||||
res = self.requests.get(AUTH_PING_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertRegexpMatches(res.text, u'"pong"')
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@@ -1,35 +1,25 @@
|
|||||||
from apitestbase import AVATAR_BASE_URL, get_authed_instance
|
|
||||||
from apitestbase import GROUPS_URL, USERNAME
|
|
||||||
from common.utils import randomword
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
class AvatarApiTestCase(unittest.TestCase):
|
from tests.api.apitestbase import ApiTestBase, USERNAME
|
||||||
|
from tests.api.urls import AVATAR_BASE_URL, GROUPS_URL
|
||||||
|
from tests.common.utils import randstring, apiurl, urljoin
|
||||||
|
|
||||||
def setUp(self):
|
class AvatarApiTest(ApiTestBase):
|
||||||
self.requests = get_authed_instance()
|
def test_user_avatar(self):
|
||||||
self.assertIsNotNone(self.requests)
|
avatar_url = urljoin(AVATAR_BASE_URL, 'user', USERNAME, '/resized/80/')
|
||||||
|
info = self.get(avatar_url).json()
|
||||||
|
self.assertIsNotNone(info['url'])
|
||||||
|
self.assertIsNotNone(info['is_default'])
|
||||||
|
self.assertIsNotNone(info['mtime'])
|
||||||
|
|
||||||
def test_user_avatar_api(self):
|
def test_group_avatar(self):
|
||||||
res = self.requests.get(AVATAR_BASE_URL + u'user/' + USERNAME + u'/resized/80/')
|
gname = randstring(16)
|
||||||
self.assertEqual(res.status_code, 200)
|
data = {'group_name': gname}
|
||||||
json = res.json()
|
res = self.put(GROUPS_URL, data=data)
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['url'])
|
|
||||||
self.assertIsNotNone(json['is_default'])
|
|
||||||
self.assertIsNotNone(json['mtime'])
|
|
||||||
|
|
||||||
def test_group_avatar_api(self):
|
|
||||||
gname = randomword(16)
|
|
||||||
data = { 'group_name': gname }
|
|
||||||
res = self.requests.put(GROUPS_URL, data=data)
|
|
||||||
gid = res.json()['group_id']
|
gid = res.json()['group_id']
|
||||||
res = self.requests.get(AVATAR_BASE_URL + u'group/' + str(gid) + u'/resized/80/')
|
avatar_url = urljoin(AVATAR_BASE_URL, 'group', str(gid), '/resized/80/')
|
||||||
self.assertEqual(res.status_code, 200)
|
info = self.get(avatar_url).json()
|
||||||
json = res.json()
|
self.assertIsNotNone(info)
|
||||||
self.assertIsNotNone(json)
|
self.assertIsNotNone(info['url'])
|
||||||
self.assertIsNotNone(json['url'])
|
self.assertIsNotNone(info['is_default'])
|
||||||
self.assertIsNotNone(json['is_default'])
|
self.assertIsNotNone(info['mtime'])
|
||||||
self.assertIsNotNone(json['mtime'])
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
||||||
|
@@ -1,62 +0,0 @@
|
|||||||
from apitestbase import DEFAULT_LIBRARY_URL, get_authed_instance, \
|
|
||||||
LIBRARIES_URL, USERNAME
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class DirectoriesApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
self.rid = res.json()['repo_id']
|
|
||||||
self.rurl = LIBRARIES_URL + str(self.rid) + u'/'
|
|
||||||
self.durl = self.rurl + u'dir/'
|
|
||||||
|
|
||||||
def test_list_directory(self):
|
|
||||||
res = self.requests.get(self.durl)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
for directory in json:
|
|
||||||
self.assertIsNotNone(directory['id'])
|
|
||||||
self.assertIsNotNone(directory['type'])
|
|
||||||
self.assertIsNotNone(directory['name'])
|
|
||||||
#self.assertIsNotNone(directory['size']) #allow null
|
|
||||||
|
|
||||||
def test_create_directory(self):
|
|
||||||
data = { 'operation': 'mkdir' }
|
|
||||||
durl = self.durl + u'?p=/test'
|
|
||||||
res = self.requests.post(durl, data=data)
|
|
||||||
self.assertEqual(res.status_code, 201)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
|
||||||
|
|
||||||
def test_remove_directory(self):
|
|
||||||
data = { 'operation': 'mkdir' }
|
|
||||||
durl = self.durl + u'?p=/test_dir_remove'
|
|
||||||
res = self.requests.post(durl, data=data)
|
|
||||||
res = self.requests.delete(durl)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
|
||||||
|
|
||||||
def test_download_directory(self):
|
|
||||||
data = { 'operation': 'mkdir' }
|
|
||||||
durl = self.durl + u'?p=/test_dir_download'
|
|
||||||
self.requests.post(durl, data=data)
|
|
||||||
ddurl = self.durl + u'download/?p=/test_dir_download'
|
|
||||||
res = self.requests.get(ddurl)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertRegexpMatches(res.text, \
|
|
||||||
r'"http(.*)/files/\w{8,8}/test_dir_download"')
|
|
||||||
|
|
||||||
def test_share_directory(self):
|
|
||||||
data = { 'operation': 'mkdir' }
|
|
||||||
durl = self.durl + u'?p=/test_dir_share'
|
|
||||||
self.requests.post(durl, data=data)
|
|
||||||
dsurl = self.durl + u'share/?p=/test_dir_share'
|
|
||||||
data = { 'emails': USERNAME, 's_type': 'd', 'path': '/', 'perm': 'r' }
|
|
||||||
res = self.requests.post(dsurl, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.text, u'{}')
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@@ -1,168 +1,219 @@
|
|||||||
#coding: UTF-8
|
#coding: UTF-8
|
||||||
|
"""
|
||||||
|
Test file/dir operations.
|
||||||
|
"""
|
||||||
|
|
||||||
import random
|
import random
|
||||||
import re
|
import re
|
||||||
from urllib import urlencode, quote
|
from urllib import urlencode, quote
|
||||||
|
|
||||||
from apitestbase import ApiTestCase, DEFAULT_LIBRARY_URL, apiurl
|
from tests.common.utils import randstring, urljoin
|
||||||
|
from tests.api.urls import DEFAULT_REPO_URL, REPOS_URL
|
||||||
|
from tests.api.apitestbase import ApiTestBase, USERNAME
|
||||||
|
|
||||||
DEFAULT_REPO_URL = '/api2/default-repo/'
|
class FilesApiTest(ApiTestBase):
|
||||||
REPO_URL = '/api2/repos/%s/'
|
use_test_repo = True
|
||||||
|
use_test_user = True
|
||||||
|
|
||||||
class FilesApiTestCase(ApiTestCase):
|
def create_file(self, fname=None):
|
||||||
def setUp(self):
|
|
||||||
res = self.post(apiurl(DEFAULT_REPO_URL)).json()
|
|
||||||
self.repo_id = res['repo_id']
|
|
||||||
self.repo_url = apiurl(REPO_URL % self.repo_id)
|
|
||||||
self.file_url = self.repo_url + u'file/'
|
|
||||||
|
|
||||||
def create_file(self):
|
|
||||||
data = {'operation': 'create'}
|
data = {'operation': 'create'}
|
||||||
name = '文件%s.txt' % random.randint(1, 10000)
|
fname = fname or ('文件 %s.txt' % randstring())
|
||||||
query = '?p=/%s' % quote(name)
|
fpath = '/' + fname
|
||||||
furl = self.file_url + query
|
query = urlencode(dict(p=fpath))
|
||||||
|
furl = self.test_file_url + '?' + query
|
||||||
res = self.post(furl, data=data, expected=201)
|
res = self.post(furl, data=data, expected=201)
|
||||||
self.assertEqual(res.text, '"success"')
|
self.assertEqual(res.text, '"success"')
|
||||||
return name, furl
|
return fname, furl
|
||||||
|
|
||||||
|
def create_dir(self):
|
||||||
|
data = {'operation': 'mkdir'}
|
||||||
|
dpath = '/目录 %s' % randstring()
|
||||||
|
query = urlencode(dict(p=dpath))
|
||||||
|
durl = self.test_dir_url + '?' + query
|
||||||
|
res = self.post(durl, data=data, expected=201)
|
||||||
|
self.assertEqual(res.text, u'"success"')
|
||||||
|
return dpath, durl
|
||||||
|
|
||||||
def test_create_file(self):
|
def test_create_file(self):
|
||||||
self.create_file()
|
self.create_file()
|
||||||
|
|
||||||
# def test_rename_file(self):
|
def test_rename_file(self):
|
||||||
# name, furl = self.create_file()
|
name, furl = self.create_file()
|
||||||
# data = {
|
data = {
|
||||||
# 'operation': 'rename',
|
'operation': 'rename',
|
||||||
# 'newname': name + str(random.randint(1, 10000)),
|
'newname': name + randstring(),
|
||||||
# }
|
}
|
||||||
# res = self.post(furl, data=data)
|
res = self.post(furl, data=data)
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)"')
|
self.assertRegexpMatches(res.text, r'"http(.*)"')
|
||||||
|
|
||||||
# def test_remove_file(self):
|
def test_remove_file(self):
|
||||||
# _, furl = self.create_file()
|
_, furl = self.create_file()
|
||||||
# res = self.delete(furl)
|
res = self.delete(furl)
|
||||||
# self.assertEqual(res.text, '"success"')
|
self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
# def test_move_file(self):
|
def test_move_file(self):
|
||||||
# _, furl = self.create_file()
|
_, furl = self.create_file()
|
||||||
# # TODO: create another repo here, and use it as dst_repo
|
# TODO: create another repo here, and use it as dst_repo
|
||||||
# data = {
|
data = {
|
||||||
# 'operation': 'move',
|
'operation': 'move',
|
||||||
# 'dst_repo': self.repo_id,
|
'dst_repo': self.test_repo_id,
|
||||||
# 'dst_dir': '/'
|
'dst_dir': '/'
|
||||||
# }
|
}
|
||||||
# res = self.post(furl, data=data)
|
res = self.post(furl, data=data)
|
||||||
# self.assertEqual(res.text, '"success"')
|
self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
# def test_copy_file(self):
|
def test_copy_file(self):
|
||||||
# fname, _ = self.create_file()
|
fname, _ = self.create_file()
|
||||||
# # TODO: create another repo here, and use it as dst_repo
|
# TODO: create another repo here, and use it as dst_repo
|
||||||
# fopurl = self.repo_url + u'fileops/copy/?p=/'
|
dpath, _ = self.create_dir()
|
||||||
# data = {
|
fopurl = self.test_repo_url + u'fileops/copy/?p=/'
|
||||||
# 'file_names': fname,
|
data = {
|
||||||
# 'dst_repo': self.repo_id,
|
'file_names': fname,
|
||||||
# 'dst_dir': '/'
|
'dst_repo': self.test_repo_id,
|
||||||
# }
|
'dst_dir': dpath,
|
||||||
# res = self.post(fopurl, data=data)
|
}
|
||||||
# self.assertEqual(res.text, '"success"')
|
res = self.post(fopurl, data=data)
|
||||||
|
self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
# def test_download_file(self):
|
def test_download_file(self):
|
||||||
# fname, furl = self.create_file()
|
fname, furl = self.create_file()
|
||||||
# res = self.get(furl)
|
res = self.get(furl)
|
||||||
# self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname))
|
self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname))
|
||||||
|
|
||||||
# def test_download_file_from_history(self):
|
def test_download_file_from_history(self):
|
||||||
# fname, _ = self.create_file()
|
fname, _ = self.create_file()
|
||||||
# file_history_url = self.file_url + u'history/?p=/%s' % quote(fname)
|
file_history_url = self.test_file_url + u'history/?p=/%s' % quote(fname)
|
||||||
# res = self.get(file_history_url).json()
|
res = self.get(file_history_url).json()
|
||||||
# commit_id = res['commits'][0]['id']
|
commit_id = res['commits'][0]['id']
|
||||||
# self.assertEqual(len(commit_id), 40)
|
self.assertEqual(len(commit_id), 40)
|
||||||
# data = {
|
data = {
|
||||||
# 'p': fname,
|
'p': fname,
|
||||||
# 'commit_id': commit_id,
|
'commit_id': commit_id,
|
||||||
# }
|
}
|
||||||
# query = '?' + urlencode(data)
|
query = '?' + urlencode(data)
|
||||||
# res = self.get(self.file_url + query)
|
res = self.get(self.test_file_url + query)
|
||||||
# self.assertEqual(res.status_code, 200)
|
self.assertEqual(res.status_code, 200)
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname))
|
self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname))
|
||||||
|
|
||||||
# def test_get_file_detail(self):
|
def test_get_file_detail(self):
|
||||||
# fname, _ = self.create_file()
|
fname, _ = self.create_file()
|
||||||
# fdurl = self.file_url + u'detail/?p=/%s' % quote(fname)
|
fdurl = self.test_file_url + u'detail/?p=/%s' % quote(fname)
|
||||||
# detail = self.get(fdurl).json()
|
detail = self.get(fdurl).json()
|
||||||
# self.assertIsNotNone(detail)
|
self.assertIsNotNone(detail)
|
||||||
# self.assertIsNotNone(detail['id'])
|
self.assertIsNotNone(detail['id'])
|
||||||
# self.assertIsNotNone(detail['mtime'])
|
self.assertIsNotNone(detail['mtime'])
|
||||||
# self.assertIsNotNone(detail['type'])
|
self.assertIsNotNone(detail['type'])
|
||||||
# self.assertIsNotNone(detail['name'])
|
self.assertIsNotNone(detail['name'])
|
||||||
# self.assertIsNotNone(detail['size'])
|
self.assertIsNotNone(detail['size'])
|
||||||
|
|
||||||
# def test_get_file_history(self):
|
def test_get_file_history(self):
|
||||||
# self.create_file()
|
fname, _ = self.create_file()
|
||||||
# fhurl = self.file_url + u'history/?p=/test.c'
|
fhurl = self.test_file_url + u'history/?p=%s' % quote(fname)
|
||||||
# history = self.get(fhurl).json()
|
history = self.get(fhurl).json()
|
||||||
# for commit in history['commits']:
|
for commit in history['commits']:
|
||||||
# self.assertIsNotNone(commit['rev_file_size'])
|
self.assertIsNotNone(commit['rev_file_size'])
|
||||||
# #self.assertIsNotNone(commit['rev_file_id']) #allow null
|
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
||||||
# self.assertIsNotNone(commit['ctime'])
|
self.assertIsNotNone(commit['ctime'])
|
||||||
# self.assertIsNotNone(commit['creator_name'])
|
self.assertIsNotNone(commit['creator_name'])
|
||||||
# self.assertIsNotNone(commit['creator'])
|
self.assertIsNotNone(commit['creator'])
|
||||||
# self.assertIsNotNone(commit['root_id'])
|
self.assertIsNotNone(commit['root_id'])
|
||||||
# #self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
||||||
# #self.assertIsNotNone(commit['parent_id']) #allow null
|
#self.assertIsNotNone(commit['parent_id']) #allow null
|
||||||
# self.assertIsNotNone(commit['new_merge'])
|
self.assertIsNotNone(commit['new_merge'])
|
||||||
# self.assertIsNotNone(commit['repo_id'])
|
self.assertIsNotNone(commit['repo_id'])
|
||||||
# self.assertIsNotNone(commit['desc'])
|
self.assertIsNotNone(commit['desc'])
|
||||||
# self.assertIsNotNone(commit['id'])
|
self.assertIsNotNone(commit['id'])
|
||||||
# self.assertIsNotNone(commit['conflict'])
|
self.assertIsNotNone(commit['conflict'])
|
||||||
# #self.assertIsNotNone(commit['second_parent_id']) #allow null
|
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
||||||
|
|
||||||
# def test_get_upload_link(self):
|
def test_get_upload_link(self):
|
||||||
# upload_url = self.repo_url + u'upload-link/'
|
upload_url = self.test_repo_url + u'upload-link/'
|
||||||
# res = self.get(upload_url)
|
res = self.get(upload_url)
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"')
|
self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"')
|
||||||
|
|
||||||
# def test_get_update_link(self):
|
def test_get_update_link(self):
|
||||||
# update_url = self.repo_url + u'update-link/'
|
update_url = self.test_repo_url + u'update-link/'
|
||||||
# res = self.get(update_url)
|
res = self.get(update_url)
|
||||||
# self.assertEqual(res.status_code, 200)
|
self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"')
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"')
|
|
||||||
|
|
||||||
# def test_upload(self):
|
# def test_upload_file(self):
|
||||||
# furl = self.file_url + u'?p=/test_upload.c'
|
# # XXX: requests has problems when post a file whose name contains
|
||||||
# res = self.delete(furl)
|
# # non-ascii data
|
||||||
# upload_url = self.repo_url + u'upload-link/'
|
# fname = 'file-upload-test %s.txt' % randstring()
|
||||||
|
# furl = self.test_file_url + '?p=/%s' % quote(fname)
|
||||||
|
# self.delete(furl)
|
||||||
|
# upload_url = self.test_repo_url + u'upload-link/'
|
||||||
# res = self.get(upload_url)
|
# res = self.get(upload_url)
|
||||||
# upload_api_url = re.match(r'"(.*)"', res.text).group(1)
|
# upload_api_url = re.match(r'"(.*)"', res.text).group(1)
|
||||||
# #target_file must contains its parent dir path
|
# files = {
|
||||||
# files = { 'file': ('test_upload'+'.c', 'int main(){return0;}\n'), \
|
# 'file': (fname, 'Some lines in this file'),
|
||||||
# 'parent_dir': '/' }
|
# 'parent_dir': '/',
|
||||||
|
# }
|
||||||
# res = self.post(upload_api_url, files=files)
|
# res = self.post(upload_api_url, files=files)
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertRegexpMatches(res.text, r'\w{40,40}')
|
# self.assertRegexpMatches(res.text, r'\w{40,40}')
|
||||||
|
|
||||||
# def test_update(self):
|
# def test_update_file(self):
|
||||||
# data = { 'operation': 'create' }
|
# fname = 'file-update-test %s.txt' % randstring()
|
||||||
# furl = self.file_url + u'?p=/test_update.c'
|
# _, furl = self.create_file(fname=fname)
|
||||||
# res = self.post(furl, data=data)
|
# update_url = self.test_repo_url + u'update-link/'
|
||||||
# # call update-link
|
|
||||||
# update_url = self.repo_url + u'update-link/'
|
|
||||||
# res = self.get(update_url)
|
# res = self.get(update_url)
|
||||||
# update_api_url = re.match(r'"(.*)"', res.text).group(1)
|
# update_api_url = re.match(r'"(.*)"', res.text).group(1)
|
||||||
# #target_file must contains its parent dir path
|
# files = {
|
||||||
# files = { 'file': ('test_update.c', 'int main(){return0;}\n'), \
|
# 'file': ('filename', 'Updated content of this file'),
|
||||||
# 'target_file': '/test_update.c' }
|
# 'target_file': '/test_update.c'
|
||||||
|
# }
|
||||||
# res = self.post(update_api_url, files=files)
|
# res = self.post(update_api_url, files=files)
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertRegexpMatches(res.text, r'\w{40,40}')
|
# self.assertRegexpMatches(res.text, r'\w{40,40}')
|
||||||
|
|
||||||
# def test_get_upload_blocks_link(self):
|
def test_get_upload_blocks_link(self):
|
||||||
# upload_blks_url = self.repo_url + u'upload-blks-link/'
|
upload_blks_url = self.test_repo_url + u'upload-blks-link/'
|
||||||
# res = self.get(upload_blks_url)
|
res = self.get(upload_blks_url)
|
||||||
# self.assertEqual(res.status_code, 200)
|
self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"')
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"')
|
|
||||||
|
|
||||||
# def test_get_update_blocks_link(self):
|
def test_get_update_blocks_link(self):
|
||||||
# update_blks_url = self.repo_url + u'update-blks-link/'
|
update_blks_url = self.test_repo_url + u'update-blks-link/'
|
||||||
# res = self.get(update_blks_url)
|
res = self.get(update_blks_url)
|
||||||
# self.assertEqual(res.status_code, 200)
|
self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"')
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"')
|
|
||||||
|
def test_list_dir(self):
|
||||||
|
self.create_file()
|
||||||
|
self.create_dir()
|
||||||
|
dirents = self.get(self.test_dir_url).json()
|
||||||
|
self.assertHasLen(dirents, 2)
|
||||||
|
for dirent in dirents:
|
||||||
|
self.assertIsNotNone(dirent['id'])
|
||||||
|
self.assertIsNotNone(dirent['name'])
|
||||||
|
self.assertIn(dirent['type'], ('file', 'dir'))
|
||||||
|
if dirent['type'] == 'file':
|
||||||
|
self.assertIsNotNone(dirent['size'])
|
||||||
|
|
||||||
|
def test_create_dir(self):
|
||||||
|
self.create_dir()
|
||||||
|
|
||||||
|
def test_remove_dir(self):
|
||||||
|
_, durl = self.create_dir()
|
||||||
|
res = self.delete(durl)
|
||||||
|
self.assertEqual(res.text, u'"success"')
|
||||||
|
self.get(durl, expected=404)
|
||||||
|
|
||||||
|
def test_download_dir(self):
|
||||||
|
dpath, _ = self.create_dir()
|
||||||
|
query = urlencode({'p': dpath})
|
||||||
|
ddurl = self.test_dir_url + 'download/' + '?' + query
|
||||||
|
res = self.get(ddurl)
|
||||||
|
self.assertRegexpMatches(res.text,
|
||||||
|
r'"http(.*)/files/\w{8,8}/%s"' % quote(dpath[1:]))
|
||||||
|
|
||||||
|
def test_share_dir(self):
|
||||||
|
dpath, _ = self.create_dir()
|
||||||
|
query = urlencode({'p': dpath})
|
||||||
|
share_dir_url = self.test_dir_url + u'share/' + '?' + query
|
||||||
|
# TODO: share to another user
|
||||||
|
data = {
|
||||||
|
'emails': USERNAME,
|
||||||
|
's_type': 'd',
|
||||||
|
'path': '/',
|
||||||
|
'perm': 'r'
|
||||||
|
}
|
||||||
|
res = self.post(share_dir_url, data=data)
|
||||||
|
self.assertEqual(res.text, u'{}')
|
||||||
|
@@ -1,37 +0,0 @@
|
|||||||
from apitestbase import ACCOUNTS_URL, GROUPS_URL, get_authed_instance
|
|
||||||
from common.utils import randomword
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class GroupMemeberApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
self.name = randomword(4) + u'@test.com'
|
|
||||||
data = {'password': 'testtest'}
|
|
||||||
self.requests.put(ACCOUNTS_URL + self.name + u'/', data=data)
|
|
||||||
self.gname = randomword(16)
|
|
||||||
data = { 'group_name': self.gname }
|
|
||||||
res = self.requests.put(GROUPS_URL, data=data)
|
|
||||||
self.gid = res.json()['group_id']
|
|
||||||
self.gurl = GROUPS_URL + str(self.gid) + u'/members/'
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.requests.delete(ACCOUNTS_URL + self.name + u'/')
|
|
||||||
|
|
||||||
def test_add_group_member_api(self):
|
|
||||||
data = { 'user_name': self.name }
|
|
||||||
res = self.requests.put(self.gurl, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.json()['success'], True)
|
|
||||||
self.requests.delete(self.gurl, data=data)
|
|
||||||
|
|
||||||
def test_remove_group_member_api(self):
|
|
||||||
data = { 'user_name': self.name }
|
|
||||||
self.requests.put(self.gurl, data=data)
|
|
||||||
res = self.requests.delete(self.gurl, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.json()['success'], True)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@@ -1,24 +1,31 @@
|
|||||||
from apitestbase import GROUPS_URL, get_authed_instance
|
#coding: UTF-8
|
||||||
from common.utils import randomword
|
"""
|
||||||
|
Test groups api.
|
||||||
|
"""
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
class GroupsApiTestCase(unittest.TestCase):
|
from tests.api.apitestbase import ApiTestBase
|
||||||
|
from tests.api.urls import GROUPS_URL
|
||||||
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
|
|
||||||
def setUp(self):
|
class GroupsApiTest(ApiTestBase):
|
||||||
self.requests = get_authed_instance()
|
use_test_user = True
|
||||||
self.assertIsNotNone(self.requests)
|
use_test_group = True
|
||||||
|
|
||||||
def test_list_groups_api(self):
|
def test_add_remove_group_member(self):
|
||||||
data = { 'group_name': 'demo group' }
|
test_group_members_url = urljoin(self.test_group_url, '/members/')
|
||||||
self.requests.put(GROUPS_URL, data=data)
|
data = {'user_name': self.test_user_name}
|
||||||
res = self.requests.get(GROUPS_URL)
|
res = self.put(test_group_members_url, data=data).json()
|
||||||
self.assertEqual(res.status_code, 200)
|
self.assertTrue(res['success'])
|
||||||
json = res.json()
|
res = self.delete(test_group_members_url, data=data).json()
|
||||||
self.assertIsNotNone(json)
|
self.assertTrue(res['success'])
|
||||||
self.assertIsNotNone(json['replynum'])
|
|
||||||
self.assertIsNotNone(json['groups'])
|
def test_list_groups(self):
|
||||||
self.assertGreater(len(json['groups']), 0)
|
groups = self.get(GROUPS_URL).json()
|
||||||
for group in json['groups']:
|
self.assertGreaterEqual(groups['replynum'], 0)
|
||||||
|
self.assertNotEmpty(groups['groups'])
|
||||||
|
for group in groups['groups']:
|
||||||
self.assertIsNotNone(group['ctime'])
|
self.assertIsNotNone(group['ctime'])
|
||||||
self.assertIsNotNone(group['creator'])
|
self.assertIsNotNone(group['creator'])
|
||||||
self.assertIsNotNone(group['msgnum'])
|
self.assertIsNotNone(group['msgnum'])
|
||||||
@@ -26,16 +33,16 @@ class GroupsApiTestCase(unittest.TestCase):
|
|||||||
self.assertIsNotNone(group['id'])
|
self.assertIsNotNone(group['id'])
|
||||||
self.assertIsNotNone(group['name'])
|
self.assertIsNotNone(group['name'])
|
||||||
|
|
||||||
def test_add_group_api(self):
|
def test_add_group(self):
|
||||||
# We cannot create two groups which have the same group name or delete group
|
data = {'group_name': randstring(16)}
|
||||||
# Hack it by creating group with a random name, hope it won't break ci
|
info = self.put(GROUPS_URL, data=data).json()
|
||||||
data = { 'group_name': randomword(16) }
|
self.assertTrue(info['success'])
|
||||||
res = self.requests.put(GROUPS_URL, data=data)
|
group_id = info['group_id']
|
||||||
self.assertEqual(res.status_code, 200)
|
self.assertGreater(group_id, 0)
|
||||||
json = res.json()
|
url = urljoin(GROUPS_URL, str(group_id))
|
||||||
self.assertIsNotNone(json)
|
self.delete(url)
|
||||||
self.assertIsNotNone(json['group_id'])
|
|
||||||
self.assertEqual(json['success'], True)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
# check group is really removed
|
||||||
unittest.main(verbosity=2)
|
groups = self.get(GROUPS_URL).json()['groups']
|
||||||
|
for group in groups:
|
||||||
|
self.assertNotEqual(group['id'], group_id)
|
||||||
|
@@ -1,225 +0,0 @@
|
|||||||
from apitestbase import LIBRARIES_URL, DEFAULT_LIBRARY_URL
|
|
||||||
from apitestbase import VIRTUAL_LIBRARIES_URL, get_authed_instance
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class LibrariesApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
|
|
||||||
def test_get_default_library_api(self):
|
|
||||||
res = self.requests.get(DEFAULT_LIBRARY_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['repo_id'])
|
|
||||||
self.assertIsNotNone(json['exists'])
|
|
||||||
|
|
||||||
def test_create_default_library_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['repo_id'])
|
|
||||||
self.assertEqual(res.json()['exists'], True)
|
|
||||||
res = self.requests.get(DEFAULT_LIBRARY_URL)
|
|
||||||
self.assertEqual(res.json()['exists'], True)
|
|
||||||
|
|
||||||
def test_list_libraries_api(self):
|
|
||||||
res = self.requests.get(LIBRARIES_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
for repo in json:
|
|
||||||
self.assertIsNotNone(repo['permission'])
|
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
|
||||||
self.assertIsNotNone(repo['mtime'])
|
|
||||||
self.assertIsNotNone(repo['owner'])
|
|
||||||
self.assertIsNotNone(repo['id'])
|
|
||||||
self.assertIsNotNone(repo['size'])
|
|
||||||
self.assertIsNotNone(repo['name'])
|
|
||||||
self.assertIsNotNone(repo['type'])
|
|
||||||
# self.assertIsNotNone(repo['virtual']) #allow null for pub-repo
|
|
||||||
self.assertIsNotNone(repo['desc'])
|
|
||||||
self.assertIsNotNone(repo['root'])
|
|
||||||
|
|
||||||
def test_get_library_info_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
repo_url = LIBRARIES_URL + repo_id + u'/'
|
|
||||||
res = self.requests.get(repo_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
repo = res.json()
|
|
||||||
self.assertIsNotNone(repo)
|
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
|
||||||
self.assertIsNotNone(repo['mtime'])
|
|
||||||
self.assertIsNotNone(repo['owner'])
|
|
||||||
self.assertIsNotNone(repo['id'])
|
|
||||||
self.assertIsNotNone(repo['size'])
|
|
||||||
self.assertIsNotNone(repo['name'])
|
|
||||||
self.assertIsNotNone(repo['root'])
|
|
||||||
self.assertIsNotNone(repo['desc'])
|
|
||||||
self.assertIsNotNone(repo['type'])
|
|
||||||
#self.assertIsNotNone(repo['password_need']) #allow null here
|
|
||||||
|
|
||||||
def test_get_library_owner_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
repo_owner_url = LIBRARIES_URL + repo_id + u'/owner/'
|
|
||||||
res = self.requests.get(repo_owner_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['owner'])
|
|
||||||
|
|
||||||
def test_get_library_history_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
repo_history_url = LIBRARIES_URL + repo_id + u'/history/'
|
|
||||||
res = self.requests.get(repo_history_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['commits'])
|
|
||||||
for commit in json['commits']:
|
|
||||||
self.assertIsNotNone(commit['rev_file_size'])
|
|
||||||
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
|
||||||
self.assertIsNotNone(commit['ctime'])
|
|
||||||
self.assertIsNotNone(commit['creator_name'])
|
|
||||||
self.assertIsNotNone(commit['creator'])
|
|
||||||
self.assertIsNotNone(commit['root_id'])
|
|
||||||
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
|
||||||
#self.assertIsNotNone(commit['parent_id']) #allow null
|
|
||||||
self.assertIsNotNone(commit['new_merge'])
|
|
||||||
self.assertIsNotNone(commit['repo_id'])
|
|
||||||
self.assertIsNotNone(commit['desc'])
|
|
||||||
self.assertIsNotNone(commit['id'])
|
|
||||||
self.assertIsNotNone(commit['conflict'])
|
|
||||||
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
|
||||||
|
|
||||||
def test_create_library_api(self):
|
|
||||||
data = { 'name': 'test' }
|
|
||||||
res = self.requests.post(LIBRARIES_URL, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['encrypted'])
|
|
||||||
self.assertIsNotNone(json['enc_version'])
|
|
||||||
self.assertIsNotNone(json['repo_id'])
|
|
||||||
self.assertIsNotNone(json['magic'])
|
|
||||||
self.assertIsNotNone(json['relay_id'])
|
|
||||||
self.assertIsNotNone(json['repo_version'])
|
|
||||||
self.assertIsNotNone(json['relay_addr'])
|
|
||||||
self.assertIsNotNone(json['token'])
|
|
||||||
self.assertIsNotNone(json['relay_port'])
|
|
||||||
self.assertIsNotNone(json['random_key'])
|
|
||||||
self.assertIsNotNone(json['email'])
|
|
||||||
self.assertIsNotNone(json['repo_name'])
|
|
||||||
|
|
||||||
def test_remove_library_api(self):
|
|
||||||
data = { 'name': 'test' }
|
|
||||||
res = self.requests.post(LIBRARIES_URL, data=data)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
repo_url = LIBRARIES_URL + repo_id + u'/'
|
|
||||||
res = self.requests.delete(repo_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.text, '"success"')
|
|
||||||
|
|
||||||
def test_check_or_create_sub_library_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
params = { 'p': '/', 'name': 'sub_lib' }
|
|
||||||
sub_repo_url = LIBRARIES_URL + repo_id + u'/dir/sub_repo/'
|
|
||||||
res = self.requests.get(sub_repo_url, params=params)
|
|
||||||
json = res.json()
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['sub_repo_id'])
|
|
||||||
|
|
||||||
def test_encrpty_or_decrypy_library_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
repo_url = LIBRARIES_URL + repo_id + u'/'
|
|
||||||
data = { 'password': 'test' }
|
|
||||||
res = self.requests.post(repo_url, data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.text, '"success"')
|
|
||||||
|
|
||||||
def test_publicize_library_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
publicize_repo_url = LIBRARIES_URL + repo_id + u'/public/'
|
|
||||||
res = self.requests.post(publicize_repo_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(res.json())
|
|
||||||
self.assertEqual(res.json()['success'], True)
|
|
||||||
|
|
||||||
def test_depublicize_library_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
publicize_repo_url = LIBRARIES_URL + repo_id + u'/public/'
|
|
||||||
self.requests.post(publicize_repo_url)
|
|
||||||
res = self.requests.delete(publicize_repo_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(res.json())
|
|
||||||
self.assertEqual(res.json()['success'], True)
|
|
||||||
|
|
||||||
def test_fetch_library_download_info_api(self):
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
repo_id = res.json()['repo_id']
|
|
||||||
download_info_repo_url = LIBRARIES_URL + repo_id + u'/download-info/'
|
|
||||||
res = self.requests.get(download_info_repo_url)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
#self.assertIsNotNone(json['applet_root']) #does it exist?
|
|
||||||
self.assertIsNotNone(json['relay_addr'])
|
|
||||||
self.assertIsNotNone(json['token'])
|
|
||||||
self.assertIsNotNone(json['repo_id'])
|
|
||||||
self.assertIsNotNone(json['relay_port'])
|
|
||||||
self.assertIsNotNone(json['encrypted'])
|
|
||||||
self.assertIsNotNone(json['repo_name'])
|
|
||||||
self.assertIsNotNone(json['relay_id'])
|
|
||||||
self.assertIsNotNone(json['email'])
|
|
||||||
|
|
||||||
def test_list_virtual_libraries_api(self):
|
|
||||||
res = self.requests.get(VIRTUAL_LIBRARIES_URL)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['virtual-repos'])
|
|
||||||
for repo in json['virtual-repos']:
|
|
||||||
self.assertIsNotNone(repo['virtual_perm'])
|
|
||||||
#self.assertIsNotNone(repo['store_id'])
|
|
||||||
self.assertIsNotNone(repo['worktree_invalid'])
|
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
|
||||||
self.assertIsNotNone(repo['origin_repo_name'])
|
|
||||||
self.assertIsNotNone(repo['last_modify'])
|
|
||||||
self.assertIsNotNone(repo['no_local_history'])
|
|
||||||
#self.assertIsNotNone(repo['head_branch'])
|
|
||||||
self.assertIsNotNone(repo['last_sync_time'])
|
|
||||||
self.assertIsNotNone(repo['id'])
|
|
||||||
self.assertIsNotNone(repo['size'])
|
|
||||||
#self.assertIsNotNone(repo['share_permission'])
|
|
||||||
self.assertIsNotNone(repo['worktree_changed'])
|
|
||||||
self.assertIsNotNone(repo['worktree_checktime'])
|
|
||||||
self.assertIsNotNone(repo['origin_path'])
|
|
||||||
self.assertEqual(repo['is_virtual'], True)
|
|
||||||
self.assertIsNotNone(repo['origin_repo_id'])
|
|
||||||
self.assertIsNotNone(repo['version'])
|
|
||||||
#self.assertIsNotNone(repo['random_key'])
|
|
||||||
self.assertIsNotNone(repo['is_original_owner'])
|
|
||||||
#self.assertIsNotNone(repo['shared_email'])
|
|
||||||
self.assertIsNotNone(repo['enc_version'])
|
|
||||||
self.assertIsNotNone(repo['head_cmmt_id'])
|
|
||||||
#self.assertIsNotNone(repo['desc'])
|
|
||||||
self.assertIsNotNone(repo['index_corrupted'])
|
|
||||||
#self.assertIsNotNone(repo['magic'])
|
|
||||||
self.assertIsNotNone(repo['name'])
|
|
||||||
#self.assertIsNotNone(repo['worktree'])
|
|
||||||
self.assertIsNotNone(repo['auto_sync'])
|
|
||||||
#self.assertIsNotNone(repo['relay_id'])
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@@ -1,63 +1,14 @@
|
|||||||
from apitestbase import MISC_LIST_EVENTS_URL, MISC_LIST_GROUP_AND_CONTACTS_URL
|
|
||||||
from apitestbase import get_authed_instance, IS_PRO, MISC_SEARCH_URL
|
|
||||||
import unittest
|
import unittest
|
||||||
|
from tests.common.utils import apiurl
|
||||||
|
from tests.api.apitestbase import ApiTestBase
|
||||||
|
|
||||||
class MiscApiTestCase(unittest.TestCase):
|
class MiscApiTest(ApiTestBase):
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
|
|
||||||
def test_list_group_and_contacts_api(self):
|
def test_list_group_and_contacts_api(self):
|
||||||
res = self.requests.get(MISC_LIST_GROUP_AND_CONTACTS_URL)
|
res = self.get(LIST_GROUP_AND_CONTACTS_URL).json()
|
||||||
self.assertEqual(res.status_code, 200)
|
self.assertIsNotNone(res)
|
||||||
json = res.json()
|
self.assertIsInstance(res['contacts'], list)
|
||||||
self.assertIsNotNone(json)
|
self.assertIsNotNone(res['umsgnum'])
|
||||||
self.assertIsNotNone(json['contacts'])
|
self.assertIsNotNone(res['replynum'])
|
||||||
self.assertIsNotNone(json['umsgnum'])
|
self.assertIsInstance(res['groups'], list)
|
||||||
self.assertIsNotNone(json['replynum'])
|
self.assertIsNotNone(res['gmsgnum'])
|
||||||
self.assertIsNotNone(json['groups'])
|
self.assertIsNotNone(res['newreplies'])
|
||||||
self.assertIsNotNone(json['gmsgnum'])
|
|
||||||
self.assertIsNotNone(json['newreplies'])
|
|
||||||
|
|
||||||
def test_search_api(self):
|
|
||||||
# if not pro, skip this
|
|
||||||
if (IS_PRO == False):
|
|
||||||
return
|
|
||||||
res = self.requests.get(MISC_SEARCH_URL + u'?q=*')
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['has_more'])
|
|
||||||
self.assertIsNotNone(json['total'])
|
|
||||||
self.assertIsNotNone(json['results'])
|
|
||||||
for result in json['results']:
|
|
||||||
self.assertIsNotNone(result['repo_id'])
|
|
||||||
self.assertIsNotNone(result['name'])
|
|
||||||
self.assertIsNotNone(result['old'])
|
|
||||||
self.assertIsNotNone(result['last_modified'])
|
|
||||||
self.assertIsNotNone(result['fullpath'])
|
|
||||||
self.assertIsNotNone(result['size'])
|
|
||||||
|
|
||||||
def test_list_events_api(self):
|
|
||||||
# if not pro, skip this
|
|
||||||
if (IS_PRO == False):
|
|
||||||
return
|
|
||||||
res = self.requests.get(MISC_LIST_EVENTS_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['more_offset'])
|
|
||||||
self.assertIsNotNone(json['events'])
|
|
||||||
self.assertIsNotNone(json['more'])
|
|
||||||
for repo in json['events']:
|
|
||||||
self.assertIsNotNone(repo['repo_id'])
|
|
||||||
self.assertIsNotNone(repo['author'])
|
|
||||||
self.assertIsNotNone(repo['nick'])
|
|
||||||
self.assertIsNotNone(repo['time'])
|
|
||||||
self.assertIsNotNone(repo['etype'])
|
|
||||||
self.assertIsNotNone(repo['repo_name'])
|
|
||||||
self.assertIsNotNone(repo['desc'])
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
||||||
|
@@ -1,14 +0,0 @@
|
|||||||
from apitestbase import PING_URL
|
|
||||||
import requests, unittest
|
|
||||||
|
|
||||||
class PingApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.res = requests.get(PING_URL)
|
|
||||||
|
|
||||||
def test_ping_api(self):
|
|
||||||
self.assertEqual(self.res.status_code, 200)
|
|
||||||
self.assertRegexpMatches(self.res.text, u'"pong"')
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
164
tests/api/test_repos.py
Normal file
164
tests/api/test_repos.py
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
#coding: UTF-8
|
||||||
|
"""
|
||||||
|
Test repos api.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from tests.api.apitestbase import ApiTestBase, USERNAME
|
||||||
|
from tests.api.urls import REPOS_URL, DEFAULT_REPO_URL, VIRTUAL_REPOS_URL
|
||||||
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
|
|
||||||
|
|
||||||
|
class ReposApiTest(ApiTestBase):
|
||||||
|
use_test_repo = True
|
||||||
|
|
||||||
|
def remove_repo(cls, repo_id):
|
||||||
|
cls.delete(urljoin(REPOS_URL, repo_id))
|
||||||
|
|
||||||
|
def test_get_default_repo(self):
|
||||||
|
repo = self.get(DEFAULT_REPO_URL).json()
|
||||||
|
self.assertIsNotNone(repo['exists'])
|
||||||
|
|
||||||
|
def test_create_default_repo(self):
|
||||||
|
repo = self.post(DEFAULT_REPO_URL).json()
|
||||||
|
self.assertEqual(len(repo['repo_id']), 36)
|
||||||
|
self.assertEqual(repo['exists'], True)
|
||||||
|
|
||||||
|
def test_list_repos(self):
|
||||||
|
repos = self.get(REPOS_URL).json()
|
||||||
|
self.assertHasLen(repos, 1)
|
||||||
|
for repo in repos:
|
||||||
|
self.assertIsNotNone(repo['permission'])
|
||||||
|
self.assertIsNotNone(repo['encrypted'])
|
||||||
|
self.assertIsNotNone(repo['mtime'])
|
||||||
|
self.assertIsNotNone(repo['owner'])
|
||||||
|
self.assertIsNotNone(repo['id'])
|
||||||
|
self.assertIsNotNone(repo['size'])
|
||||||
|
self.assertIsNotNone(repo['name'])
|
||||||
|
self.assertIsNotNone(repo['type'])
|
||||||
|
# self.assertIsNotNone(repo['virtual']) #allow null for pub-repo
|
||||||
|
self.assertIsNotNone(repo['desc'])
|
||||||
|
self.assertIsNotNone(repo['root'])
|
||||||
|
|
||||||
|
def test_get_repo_info(self):
|
||||||
|
repo = self.get(test_repo_url).json()
|
||||||
|
self.assertFalse(repo['encrypted'])
|
||||||
|
self.assertIsNotNone(repo['mtime'])
|
||||||
|
self.assertIsNotNone(repo['owner'])
|
||||||
|
self.assertIsNotNone(repo['id'])
|
||||||
|
self.assertIsNotNone(repo['size'])
|
||||||
|
self.assertIsNotNone(repo['name'])
|
||||||
|
self.assertIsNotNone(repo['root'])
|
||||||
|
self.assertIsNotNone(repo['desc'])
|
||||||
|
self.assertIsNotNone(repo['type'])
|
||||||
|
# self.assertIsNotNone(repo['password_need']) #allow null here
|
||||||
|
|
||||||
|
def test_get_repo_owner(self):
|
||||||
|
repo_owner_url = urljoin(self.test_repo_url, '/owner/')
|
||||||
|
info = self.get(repo_owner_url).json()
|
||||||
|
self.assertEqual(info['owner'], self.test_user_name)
|
||||||
|
|
||||||
|
def test_get_repo_history(self):
|
||||||
|
repo_history_url = urljoin(REPOS_URL, self.test_repo_id, '/history/')
|
||||||
|
history = self.get(repo_history_url).json()
|
||||||
|
for commit in history['commits']:
|
||||||
|
self.assertIsNotNone(commit['rev_file_size'])
|
||||||
|
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
||||||
|
self.assertIsNotNone(commit['ctime'])
|
||||||
|
self.assertIsNotNone(commit['creator_name'])
|
||||||
|
self.assertIsNotNone(commit['creator'])
|
||||||
|
self.assertIsNotNone(commit['root_id'])
|
||||||
|
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
||||||
|
#self.assertIsNotNone(commit['parent_id']) #allow null
|
||||||
|
self.assertIsNotNone(commit['new_merge'])
|
||||||
|
self.assertIsNotNone(commit['repo_id'])
|
||||||
|
self.assertIsNotNone(commit['desc'])
|
||||||
|
self.assertIsNotNone(commit['id'])
|
||||||
|
self.assertIsNotNone(commit['conflict'])
|
||||||
|
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
||||||
|
|
||||||
|
def test_create_repo(self):
|
||||||
|
data = {'name': 'test'}
|
||||||
|
res = self.post(REPOS_URL, data=data)
|
||||||
|
repo = res.json()
|
||||||
|
self.assertIsNotNone(repo['encrypted'])
|
||||||
|
self.assertIsNotNone(repo['enc_version'])
|
||||||
|
self.assertIsNotNone(repo['repo_id'])
|
||||||
|
self.assertIsNotNone(repo['magic'])
|
||||||
|
self.assertIsNotNone(repo['relay_id'])
|
||||||
|
self.assertIsNotNone(repo['repo_version'])
|
||||||
|
self.assertIsNotNone(repo['relay_addr'])
|
||||||
|
self.assertIsNotNone(repo['token'])
|
||||||
|
self.assertIsNotNone(repo['relay_port'])
|
||||||
|
self.assertIsNotNone(repo['random_key'])
|
||||||
|
self.assertIsNotNone(repo['email'])
|
||||||
|
self.assertIsNotNone(repo['repo_name'])
|
||||||
|
|
||||||
|
repo_id = repo['repo_id']
|
||||||
|
self.remove_repo(repo_id)
|
||||||
|
# Check the repo is really removed
|
||||||
|
self.get(urljoin(REPOS_URL, repo_id), expected=404)
|
||||||
|
|
||||||
|
# TODO: create a sub folder and use it as a sub repo
|
||||||
|
# def test_check_or_create_sub_repo(self):
|
||||||
|
# sub_repo_url = urljoin(REPOS_URL, self.test_repo_id, '/dir/sub_repo/')
|
||||||
|
# params = {'p': '/', 'name': 'sub_lib'}
|
||||||
|
# info = self.get(sub_repo_url, params=params).json()
|
||||||
|
# self.assertHasLen(info['sub_repo_id'], 36)
|
||||||
|
# self.remove_repo(info['sub_repo_id'])
|
||||||
|
|
||||||
|
def test_encrpty_or_decrypy_repo(self):
|
||||||
|
# TODO: create a encrypted library
|
||||||
|
pass
|
||||||
|
# repo_url = urljoin(REPOS_URL, repo_id)
|
||||||
|
# data = {'password': 'test'}
|
||||||
|
# res = self.post(repo_url, data)
|
||||||
|
# self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
|
def test_fetch_repo_download_info(self):
|
||||||
|
download_info_repo_url = urljoin(REPOS_URL, self.test_repo_id, '/download-info/')
|
||||||
|
info = self.get(download_info_repo_url).json()
|
||||||
|
self.assertIsNotNone(info['relay_addr'])
|
||||||
|
self.assertIsNotNone(info['token'])
|
||||||
|
self.assertIsNotNone(info['repo_id'])
|
||||||
|
self.assertIsNotNone(info['relay_port'])
|
||||||
|
self.assertIsNotNone(info['encrypted'])
|
||||||
|
self.assertIsNotNone(info['repo_name'])
|
||||||
|
self.assertIsNotNone(info['relay_id'])
|
||||||
|
self.assertIsNotNone(info['email'])
|
||||||
|
|
||||||
|
def test_list_virtual_repos(self):
|
||||||
|
# TODO: we need to create at least on virtual repo first
|
||||||
|
vrepos = self.get(VIRTUAL_REPOS_URL).json()['virtual-repos']
|
||||||
|
for repo in vrepos:
|
||||||
|
self.assertIsNotNone(repo['virtual_perm'])
|
||||||
|
#self.assertIsNotNone(repo['store_id'])
|
||||||
|
self.assertIsNotNone(repo['worktree_invalid'])
|
||||||
|
self.assertIsNotNone(repo['encrypted'])
|
||||||
|
self.assertIsNotNone(repo['origin_repo_name'])
|
||||||
|
self.assertIsNotNone(repo['last_modify'])
|
||||||
|
self.assertIsNotNone(repo['no_local_history'])
|
||||||
|
#self.assertIsNotNone(repo['head_branch'])
|
||||||
|
self.assertIsNotNone(repo['last_sync_time'])
|
||||||
|
self.assertIsNotNone(repo['id'])
|
||||||
|
self.assertIsNotNone(repo['size'])
|
||||||
|
#self.assertIsNotNone(repo['share_permission'])
|
||||||
|
self.assertIsNotNone(repo['worktree_changed'])
|
||||||
|
self.assertIsNotNone(repo['worktree_checktime'])
|
||||||
|
self.assertIsNotNone(repo['origin_path'])
|
||||||
|
self.assertEqual(repo['is_virtual'], True)
|
||||||
|
self.assertIsNotNone(repo['origin_repo_id'])
|
||||||
|
self.assertIsNotNone(repo['version'])
|
||||||
|
#self.assertIsNotNone(repo['random_key'])
|
||||||
|
self.assertIsNotNone(repo['is_original_owner'])
|
||||||
|
#self.assertIsNotNone(repo['shared_email'])
|
||||||
|
self.assertIsNotNone(repo['enc_version'])
|
||||||
|
self.assertIsNotNone(repo['head_cmmt_id'])
|
||||||
|
#self.assertIsNotNone(repo['desc'])
|
||||||
|
self.assertIsNotNone(repo['index_corrupted'])
|
||||||
|
#self.assertIsNotNone(repo['magic'])
|
||||||
|
self.assertIsNotNone(repo['name'])
|
||||||
|
#self.assertIsNotNone(repo['worktree'])
|
||||||
|
self.assertIsNotNone(repo['auto_sync'])
|
||||||
|
#self.assertIsNotNone(repo['relay_id'])
|
@@ -1,32 +1,40 @@
|
|||||||
from common.utils import randomword
|
#coding: UTF-8
|
||||||
from apitestbase import SHARED_LINKS_URL, LIBRARIES_URL, \
|
|
||||||
DEFAULT_LIBRARY_URL, SHARED_LIBRARIES_URL, BESHARED_LIBRARIES_URL, \
|
|
||||||
USERNAME, SHARED_FILES_URL, GROUPS_URL, F_URL, S_F_URL, \
|
|
||||||
get_authed_instance, get_anonymous_instance
|
|
||||||
import unittest
|
import unittest
|
||||||
|
from urllib import urlencode, quote
|
||||||
|
from tests.common.utils import apiurl, randstring, urljoin
|
||||||
|
from tests.api.apitestbase import ApiTestBase
|
||||||
|
from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \
|
||||||
|
BESHARED_LIBRARIES_URL, SHARED_FILES_URL, F_URL, S_F_URL
|
||||||
|
|
||||||
class SharesApiTestCase(unittest.TestCase):
|
class SharesApiTest(ApiTestBase):
|
||||||
|
use_test_group = True
|
||||||
|
use_test_repo = True
|
||||||
|
|
||||||
def setUp(self):
|
def create_file(self, fname=None):
|
||||||
self.requests = get_authed_instance()
|
data = {'operation': 'create'}
|
||||||
self.arequests = get_anonymous_instance()
|
fname = fname or ('文件 %s.txt' % randstring(10))
|
||||||
self.assertIsNotNone(self.requests)
|
fpath = '/' + fname
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
query = urlencode(dict(p=fpath))
|
||||||
self.rid = res.json()['repo_id']
|
furl = self.test_file_url + '?' + query
|
||||||
self.rurl = LIBRARIES_URL + str(self.rid) + u'/'
|
res = self.post(furl, data=data, expected=201)
|
||||||
self.furl = self.rurl + u'file/'
|
self.assertEqual(res.text, '"success"')
|
||||||
self.durl = self.rurl + u'dir/'
|
return fname, furl
|
||||||
self.gname = randomword(16)
|
|
||||||
data = { 'group_name': self.gname }
|
|
||||||
res = self.requests.put(GROUPS_URL, data=data)
|
|
||||||
self.gid = res.json()['group_id']
|
|
||||||
|
|
||||||
def test_list_file_shared_links_api(self):
|
def test_create_file_shared_link(self):
|
||||||
res = self.requests.get(SHARED_LINKS_URL)
|
fname, _ = self.create_file()
|
||||||
self.assertEqual(res.status_code, 200)
|
fsurl = urljoin(self.test_file_url, 'shared-link')
|
||||||
json = res.json()
|
data = {
|
||||||
self.assertIsNotNone(json)
|
'type': 'f',
|
||||||
for fileshare in json['fileshares']:
|
'p': '/' + fname,
|
||||||
|
}
|
||||||
|
res = self.put(fsurl, data=data, expected=201)
|
||||||
|
self.assertRegexpMatches(res.headers['Location'], \
|
||||||
|
r'http(.*)/f/(\w{10,10})/')
|
||||||
|
|
||||||
|
res = self.get(SHARED_LINKS_URL).json()
|
||||||
|
self.assertNotEmpty(res)
|
||||||
|
for fileshare in res['fileshares']:
|
||||||
self.assertIsNotNone(fileshare['username'])
|
self.assertIsNotNone(fileshare['username'])
|
||||||
self.assertIsNotNone(fileshare['repo_id'])
|
self.assertIsNotNone(fileshare['repo_id'])
|
||||||
#self.assertIsNotNone(fileshare['ctime'])
|
#self.assertIsNotNone(fileshare['ctime'])
|
||||||
@@ -35,203 +43,190 @@ class SharesApiTestCase(unittest.TestCase):
|
|||||||
self.assertIsNotNone(fileshare['view_cnt'])
|
self.assertIsNotNone(fileshare['view_cnt'])
|
||||||
self.assertIsNotNone(fileshare['path'])
|
self.assertIsNotNone(fileshare['path'])
|
||||||
|
|
||||||
def test_create_file_shared_link_api(self):
|
|
||||||
data = { 'operation': 'create' }
|
|
||||||
furl = self.furl + u'?p=/test_create_shared_link_f'
|
|
||||||
self.requests.post(furl, data=data)
|
|
||||||
fsurl = self.furl + u'shared-link/'
|
|
||||||
data = { 'type': 'f', 'p': '/test_create_shared_link_f' }
|
|
||||||
res = self.requests.put(fsurl, data=data)
|
|
||||||
self.assertEqual(res.status_code, 201)
|
|
||||||
self.assertRegexpMatches(res.headers['Location'], \
|
|
||||||
r'http(.*)/f/(\w{10,10})/')
|
|
||||||
|
|
||||||
def test_create_directory_shared_link_api(self):
|
# def test_create_directory_shared_link(self):
|
||||||
data = { 'operation': 'mkdir' }
|
# data = { 'operation': 'mkdir' }
|
||||||
durl = self.durl + u'?p=/test_create_shared_link_d'
|
# durl = self.test_dir_url + u'?p=/test_create_shared_link_d'
|
||||||
self.requests.post(durl, data=data)
|
# self.post(durl, data=data)
|
||||||
self.requests.post(durl, data=data)
|
# self.post(durl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'd', 'p': '/test_create_shared_link_d' }
|
# data = { 'type': 'd', 'p': '/test_create_shared_link_d' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
self.assertEqual(res.status_code, 201)
|
# self.assertEqual(res.status_code, 201)
|
||||||
self.assertRegexpMatches(res.headers['Location'], \
|
# self.assertRegexpMatches(res.headers['Location'], \
|
||||||
r'http(.*)/d/(\w{10,10})/')
|
# r'http(.*)/d/(\w{10,10})/')
|
||||||
|
|
||||||
def test_remove_shared_link_api(self):
|
# def test_remove_shared_link(self):
|
||||||
data = { 'operation': 'create' }
|
# data = { 'operation': 'create' }
|
||||||
furl = self.furl + u'?p=/test_remove_shared_link_f'
|
# furl = self.test_file_url + u'?p=/test_remove_shared_link_f'
|
||||||
self.requests.post(furl, data=data)
|
# self.post(furl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'f', 'p': '/test_remove_shared_link_f' }
|
# data = { 'type': 'f', 'p': '/test_remove_shared_link_f' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
import re
|
# import re
|
||||||
t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
||||||
fturl = SHARED_LINKS_URL + u'?t=' + t
|
# fturl = SHARED_LINKS_URL + u'?t=' + t
|
||||||
res = self.requests.delete(fturl)
|
# res = self.delete(fturl)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
self.assertEqual(res.text, u'{}')
|
# self.assertEqual(res.text, u'{}')
|
||||||
|
|
||||||
def test_get_shared_file_url_api(self):
|
# def test_get_shared_file_url(self):
|
||||||
data = { 'operation': 'create' }
|
# data = { 'operation': 'create' }
|
||||||
furl = self.furl + u'?p=/test_visit_shared_link_f'
|
# furl = self.test_file_url + u'?p=/test_visit_shared_link_f'
|
||||||
self.requests.post(furl, data=data)
|
# self.post(furl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'f', 'p': '/test_visit_shared_link_f' }
|
# data = { 'type': 'f', 'p': '/test_visit_shared_link_f' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
import re
|
# import re
|
||||||
t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
||||||
fdurl = F_URL + t + u'/'
|
# fdurl = F_URL + t + u'/'
|
||||||
res = self.requests.get(fdurl)
|
# res = self.get(fdurl)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"')
|
# self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"')
|
||||||
|
|
||||||
def test_get_shared_file_detail_api(self):
|
# def test_get_shared_file_detail(self):
|
||||||
data = { 'operation': 'create' }
|
# data = { 'operation': 'create' }
|
||||||
furl = self.furl + u'?p=/test_visitd_shared_link_f'
|
# furl = self.test_file_url + u'?p=/test_visitd_shared_link_f'
|
||||||
self.requests.post(furl, data=data)
|
# self.post(furl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'f', 'p': '/test_visitd_shared_link_f' }
|
# data = { 'type': 'f', 'p': '/test_visitd_shared_link_f' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
import re
|
# import re
|
||||||
t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
||||||
fdurl = F_URL + t + u'/detail/'
|
# fdurl = F_URL + t + u'/detail/'
|
||||||
res = self.requests.get(fdurl)
|
# res = self.get(fdurl)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
json = res.json()
|
# json = res.json()
|
||||||
self.assertIsNotNone(json)
|
# self.assertIsNotNone(json)
|
||||||
self.assertIsNotNone(json['repo_id'])
|
# self.assertIsNotNone(json['repo_id'])
|
||||||
self.assertIsNotNone(json['name'])
|
# self.assertIsNotNone(json['name'])
|
||||||
self.assertIsNotNone(json['size'])
|
# self.assertIsNotNone(json['size'])
|
||||||
self.assertIsNotNone(json['path'])
|
# self.assertIsNotNone(json['path'])
|
||||||
self.assertIsNotNone(json['type'])
|
# self.assertIsNotNone(json['type'])
|
||||||
self.assertIsNotNone(json['mtime'])
|
# self.assertIsNotNone(json['mtime'])
|
||||||
self.assertIsNotNone(json['id'])
|
# self.assertIsNotNone(json['id'])
|
||||||
|
|
||||||
def test_get_private_shared_file_url_api(self):
|
# def test_get_private_shared_file_url(self):
|
||||||
if True: #todo: override this
|
# if True: #todo: override this
|
||||||
return
|
# return
|
||||||
data = { 'operation': 'create' }
|
# data = { 'operation': 'create' }
|
||||||
furl = self.furl + u'?p=/test_visit_shared_link_sf'
|
# furl = self.test_file_url + u'?p=/test_visit_shared_link_sf'
|
||||||
self.requests.post(furl, data=data)
|
# self.post(furl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'f', 'p': '/test_visit_shared_link_sf' }
|
# data = { 'type': 'f', 'p': '/test_visit_shared_link_sf' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
import re
|
# import re
|
||||||
t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
||||||
fdurl = S_F_URL + t + u'/'
|
# fdurl = S_F_URL + t + u'/'
|
||||||
res = self.requests.get(fdurl)
|
# res = self.get(fdurl)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"')
|
# self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"')
|
||||||
|
|
||||||
def test_get_private_shared_file_detail_api(self):
|
# def test_get_private_shared_file_detail(self):
|
||||||
if True: #todo: override this
|
# if True: #todo: override this
|
||||||
return
|
# return
|
||||||
data = { 'operation': 'create' }
|
# data = { 'operation': 'create' }
|
||||||
furl = self.furl + u'?p=/test_visitd_shared_link_sf'
|
# furl = self.test_file_url + u'?p=/test_visitd_shared_link_sf'
|
||||||
self.requests.post(furl, data=data)
|
# self.post(furl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'f', 'p': '/test_visitd_shared_link_sf' }
|
# data = { 'type': 'f', 'p': '/test_visitd_shared_link_sf' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
import re
|
# import re
|
||||||
t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
||||||
fdurl = S_F_URL + t + u'/detail/'
|
# fdurl = S_F_URL + t + u'/detail/'
|
||||||
res = self.requests.get(fdurl)
|
# res = self.get(fdurl)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
json = res.json()
|
# json = res.json()
|
||||||
self.assertIsNotNone(json)
|
# self.assertIsNotNone(json)
|
||||||
self.assertIsNotNone(json['repo_id'])
|
# self.assertIsNotNone(json['repo_id'])
|
||||||
self.assertIsNotNone(json['name'])
|
# self.assertIsNotNone(json['name'])
|
||||||
self.assertIsNotNone(json['size'])
|
# self.assertIsNotNone(json['size'])
|
||||||
self.assertIsNotNone(json['path'])
|
# self.assertIsNotNone(json['path'])
|
||||||
self.assertIsNotNone(json['type'])
|
# self.assertIsNotNone(json['type'])
|
||||||
self.assertIsNotNone(json['mtime'])
|
# self.assertIsNotNone(json['mtime'])
|
||||||
self.assertIsNotNone(json['id'])
|
# self.assertIsNotNone(json['id'])
|
||||||
|
|
||||||
def test_remove_shared_file_api(self):
|
# def test_remove_shared_file(self):
|
||||||
if True: #todo: override this
|
# if True: #todo: override this
|
||||||
return
|
# return
|
||||||
data = { 'operation': 'create' }
|
# data = { 'operation': 'create' }
|
||||||
furl = self.furl + u'?p=/test_remove_shared_file'
|
# furl = self.test_file_url + u'?p=/test_remove_shared_file'
|
||||||
self.requests.post(furl, data=data)
|
# self.post(furl, data=data)
|
||||||
fsurl = self.furl + u'shared-link/'
|
# fsurl = self.test_file_url + u'shared-link/'
|
||||||
data = { 'type': 'f', 'p': '/test_remove_shared_file' }
|
# data = { 'type': 'f', 'p': '/test_remove_shared_file' }
|
||||||
res = self.requests.put(fsurl, data=data)
|
# res = self.put(fsurl, data=data)
|
||||||
import re
|
# import re
|
||||||
t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
||||||
fturl = SHARED_FILES_URL + u'?t=' + t
|
# fturl = SHARED_FILES_URL + u'?t=' + t
|
||||||
res = self.requests.delete(fturl)
|
# res = self.delete(fturl)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
self.assertEqual(res.text, u'{}')
|
# self.assertEqual(res.text, u'{}')
|
||||||
|
|
||||||
def test_list_shared_libraries_api(self):
|
# def test_list_shared_libraries(self):
|
||||||
res = self.requests.get(SHARED_LIBRARIES_URL)
|
# res = self.get(SHARED_LIBRARIES_URL)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
json = res.json()
|
# json = res.json()
|
||||||
self.assertIsNotNone(json)
|
# self.assertIsNotNone(json)
|
||||||
for repo in json:
|
# for repo in json:
|
||||||
self.assertIsNotNone(repo['repo_id'])
|
# self.assertIsNotNone(repo['repo_id'])
|
||||||
self.assertIsNotNone(repo['share_type'])
|
# self.assertIsNotNone(repo['share_type'])
|
||||||
self.assertIsNotNone(repo['permission'])
|
# self.assertIsNotNone(repo['permission'])
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
# self.assertIsNotNone(repo['encrypted'])
|
||||||
self.assertIsNotNone(repo['user'])
|
# self.assertIsNotNone(repo['user'])
|
||||||
self.assertIsNotNone(repo['last_modified'])
|
# self.assertIsNotNone(repo['last_modified'])
|
||||||
self.assertIsNotNone(repo['repo_desc'])
|
# self.assertIsNotNone(repo['repo_desc'])
|
||||||
self.assertIsNotNone(repo['group_id'])
|
# self.assertIsNotNone(repo['group_id'])
|
||||||
self.assertIsNotNone(repo['repo_name'])
|
# self.assertIsNotNone(repo['repo_name'])
|
||||||
|
|
||||||
def test_list_beshared_libraries_api(self):
|
# def test_list_beshared_libraries(self):
|
||||||
res = self.requests.get(BESHARED_LIBRARIES_URL)
|
# res = self.get(BESHARED_LIBRARIES_URL)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
json = res.json()
|
# json = res.json()
|
||||||
self.assertIsNotNone(json)
|
# self.assertIsNotNone(json)
|
||||||
for repo in json:
|
# for repo in json:
|
||||||
self.assertIsNotNone(repo['user'])
|
# self.assertIsNotNone(repo['user'])
|
||||||
self.assertIsNotNone(repo['repo_id'])
|
# self.assertIsNotNone(repo['repo_id'])
|
||||||
self.assertIsNotNone(repo['share_type'])
|
# self.assertIsNotNone(repo['share_type'])
|
||||||
self.assertIsNotNone(repo['permission'])
|
# self.assertIsNotNone(repo['permission'])
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
# self.assertIsNotNone(repo['encrypted'])
|
||||||
self.assertIsNotNone(repo['last_modified'])
|
# self.assertIsNotNone(repo['last_modified'])
|
||||||
self.assertIsNotNone(repo['repo_desc'])
|
# self.assertIsNotNone(repo['repo_desc'])
|
||||||
self.assertIsNotNone(repo['group_id'])
|
# self.assertIsNotNone(repo['group_id'])
|
||||||
self.assertIsNotNone(repo['repo_name'])
|
# self.assertIsNotNone(repo['repo_name'])
|
||||||
self.assertIsNotNone(repo['is_virtual'])
|
# self.assertIsNotNone(repo['is_virtual'])
|
||||||
|
|
||||||
def test_share_library_api(self):
|
# def test_share_library(self):
|
||||||
data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \
|
# data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \
|
||||||
'permission': 'rw' }
|
# 'permission': 'rw' }
|
||||||
slurl = SHARED_LIBRARIES_URL + str(self.rid) + u'/'
|
# slurl = SHARED_LIBRARIES_URL + str(self.test_repo_id) + u'/'
|
||||||
res = self.requests.put(slurl, params=data)
|
# res = self.put(slurl, params=data)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
self.assertEqual(res.text, u'"success"')
|
# self.assertEqual(res.text, u'"success"')
|
||||||
|
|
||||||
def test_un_share_library_api(self):
|
# def test_un_share_library(self):
|
||||||
data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \
|
# data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \
|
||||||
'permission': 'rw' }
|
# 'permission': 'rw' }
|
||||||
slurl = SHARED_LIBRARIES_URL + str(self.rid) + u'/'
|
# slurl = SHARED_LIBRARIES_URL + str(self.test_repo_id) + u'/'
|
||||||
data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid }
|
# data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid }
|
||||||
self.requests.put(slurl, params=data)
|
# self.put(slurl, params=data)
|
||||||
res = self.requests.delete(slurl, params=data)
|
# res = self.delete(slurl, params=data)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
self.assertEqual(res.text, u'"success"')
|
# self.assertEqual(res.text, u'"success"')
|
||||||
|
|
||||||
def test_list_shared_files_api(self):
|
# def test_list_shared_files(self):
|
||||||
res = self.requests.get(SHARED_FILES_URL)
|
# res = self.get(SHARED_FILES_URL)
|
||||||
self.assertEqual(res.status_code, 200)
|
# self.assertEqual(res.status_code, 200)
|
||||||
json = res.json()
|
# json = res.json()
|
||||||
self.assertIsNotNone(json)
|
# self.assertIsNotNone(json)
|
||||||
self.assertIsNotNone(json['priv_share_in'])
|
# self.assertIsNotNone(json['priv_share_in'])
|
||||||
self.assertIsNotNone(json['priv_share_out'])
|
# self.assertIsNotNone(json['priv_share_out'])
|
||||||
|
|
||||||
for sfiles in zip(json['priv_share_in'], json['priv_share_out']):
|
# for sfiles in zip(json['priv_share_in'], json['priv_share_out']):
|
||||||
for sfile in sfiles:
|
# for sfile in sfiles:
|
||||||
self.assertIsNotNone(sfile['s_type'])
|
# self.assertIsNotNone(sfile['s_type'])
|
||||||
self.assertIsNotNone(sfile['repo_id'])
|
# self.assertIsNotNone(sfile['repo_id'])
|
||||||
self.assertIsNotNone(sfile['permission'])
|
# self.assertIsNotNone(sfile['permission'])
|
||||||
self.assertIsNotNone(sfile['to_user'])
|
# self.assertIsNotNone(sfile['to_user'])
|
||||||
self.assertIsNotNone(sfile['token'])
|
# self.assertIsNotNone(sfile['token'])
|
||||||
self.assertIsNotNone(sfile['from_user'])
|
# self.assertIsNotNone(sfile['from_user'])
|
||||||
self.assertIsNotNone(sfile['path'])
|
# self.assertIsNotNone(sfile['path'])
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
||||||
|
32
tests/api/urls.py
Normal file
32
tests/api/urls.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
from tests.common.common import USERNAME
|
||||||
|
from tests.common.utils import apiurl
|
||||||
|
|
||||||
|
PING_URL = apiurl('/api2/ping/')
|
||||||
|
TOKEN_URL = apiurl('/api2/auth-token/')
|
||||||
|
AUTH_PING_URL = apiurl('/api2/auth/ping/')
|
||||||
|
|
||||||
|
ACCOUNTS_URL = apiurl('/api2/accounts/')
|
||||||
|
ACCOUNT_INFO_URL = apiurl('/api2/account/info/')
|
||||||
|
AVATAR_BASE_URL = apiurl(u'/api2/avatars/')
|
||||||
|
|
||||||
|
REPOS_URL = apiurl('/api2/repos/')
|
||||||
|
DEFAULT_REPO_URL = apiurl('/api2/default-repo/')
|
||||||
|
VIRTUAL_REPOS_URL = apiurl('/api2/virtual-repos/')
|
||||||
|
|
||||||
|
GROUPS_URL = apiurl(u'/api2/groups/')
|
||||||
|
|
||||||
|
USERMSGS_URL = apiurl('/api2/user/msgs/', USERNAME)
|
||||||
|
USERMSGS_COUNT_URL = apiurl('/api2/unseen_messages/')
|
||||||
|
GROUPMSGS_URL = apiurl('/api2/group/msgs/')
|
||||||
|
GROUPMSGS_NREPLY_URL = apiurl('/api2/new_replies/')
|
||||||
|
|
||||||
|
STARREDFILES_URL = apiurl('/api2/starredfiles/')
|
||||||
|
SHARED_LINKS_URL = apiurl('/api2/shared-links/')
|
||||||
|
SHARED_LIBRARIES_URL = apiurl('/api2/shared-repos/')
|
||||||
|
BESHARED_LIBRARIES_URL = apiurl('/api2/beshared-repos/')
|
||||||
|
SHARED_FILES_URL = apiurl('/api2/shared-files/')
|
||||||
|
F_URL = apiurl('/api2/f/')
|
||||||
|
S_F_URL = apiurl('/api2/s/f/')
|
||||||
|
|
||||||
|
LIST_GROUP_AND_CONTACTS_URL = apiurl('/api2/groupandcontacts/')
|
||||||
|
|
@@ -3,7 +3,9 @@ import random
|
|||||||
|
|
||||||
from .common import BASE_URL
|
from .common import BASE_URL
|
||||||
|
|
||||||
def randomword(length):
|
def randstring(length=0):
|
||||||
|
if length == 0:
|
||||||
|
length = random.randint(1, 30)
|
||||||
return ''.join(random.choice(string.lowercase) for i in range(length))
|
return ''.join(random.choice(string.lowercase) for i in range(length))
|
||||||
|
|
||||||
def urljoin(base, *args):
|
def urljoin(base, *args):
|
||||||
|
Reference in New Issue
Block a user