From f65c783fb668be17bb0de8bb0ec4643e16f2592b Mon Sep 17 00:00:00 2001 From: lins05 Date: Sat, 6 Sep 2014 11:38:20 +0800 Subject: [PATCH] improved tests --- tests/__init__.py | 14 -- tests/api/apitestbase.py | 207 +++++++++++++++--------- tests/api/test_accounts.py | 4 +- tests/api/test_avatar.py | 4 +- tests/api/test_files.py | 278 ++++++++++++++++----------------- tests/api/test_groupmsgs.py | 74 --------- tests/api/test_groups.py | 40 ++--- tests/api/test_misc.py | 4 +- tests/api/test_repos.py | 150 +++++++++--------- tests/api/test_shares.py | 244 +++-------------------------- tests/api/test_starredfiles.py | 41 ----- tests/api/test_usermsgs.py | 38 ----- tests/common/common.py | 16 +- tests/common/utils.py | 2 +- tests/seahubtests.sh | 25 ++- tests/ui/test_login.py | 48 +++--- 16 files changed, 434 insertions(+), 755 deletions(-) delete mode 100644 tests/api/test_groupmsgs.py delete mode 100644 tests/api/test_starredfiles.py delete mode 100644 tests/api/test_usermsgs.py diff --git a/tests/__init__.py b/tests/__init__.py index 324d47746c..e69de29bb2 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,14 +0,0 @@ -# Copyright 2014 seahub authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from common import common diff --git a/tests/api/apitestbase.py b/tests/api/apitestbase.py index 04ef698418..93a5bd93c8 100644 --- a/tests/api/apitestbase.py +++ b/tests/api/apitestbase.py @@ -1,11 +1,12 @@ #coding: UTF-8 -import re import requests import unittest +from contextlib import contextmanager from nose.tools import assert_equal, assert_in # pylint: disable=E0611 +from urllib import quote -from tests.common.common import USERNAME, PASSWORD, IS_PRO, \ +from tests.common.common import USERNAME, PASSWORD, \ ADMIN_USERNAME, ADMIN_PASSWORD from tests.common.utils import apiurl, urljoin, randstring @@ -14,75 +15,11 @@ from tests.api.urls import TOKEN_URL, GROUPS_URL, ACCOUNTS_URL, REPOS_URL class ApiTestBase(unittest.TestCase): _token = None _admin_token = None - - use_test_user = False - use_test_group = False - use_test_repo = False - - test_user_name = None - test_user_url = None - - test_group_name = None - test_group_id = None - test_group_url = None - - test_repo_id = None - test_repo_url = None - test_file_url = None - test_dir_url = None - - def setUp(self): - if self.use_test_user: - self.create_tmp_user() - if self.use_test_group: - self.create_tmp_group() - if self.use_test_repo: - self.create_tmp_repo() - - def tearDown(self): - if self.use_test_user: - self.remove_tmp_user() - if self.use_test_group: - self.remove_tmp_group() - if self.use_test_repo: - self.remove_tmp_repo() - - def create_tmp_repo(self): - repo_name = '测试-test-repo-%s' % randstring(6) - data = { - 'name': repo_name, - 'desc': 'just for test - 测试用资料库', - } - repo = self.post(REPOS_URL, data=data).json() - self.test_repo_id = repo['repo_id'] - self.test_repo_url = urljoin(REPOS_URL, self.test_repo_id) - self.test_file_url = urljoin(self.test_repo_url, 'file') - self.test_dir_url = urljoin(self.test_repo_url, 'dir') - - def remove_tmp_repo(self): - if self.test_repo_id: - self.delete(self.test_repo_url) - - def create_tmp_group(self): - self.test_group_name = '测试群组-%s' % randstring(16) - data = {'group_name': self.test_group_name} - self.test_group_id = self.put(GROUPS_URL, data=data).json()['group_id'] - self.test_group_url = urljoin(GROUPS_URL, str(self.test_group_id)) - - def remove_tmp_group(self): - if self.test_group_id: - self.delete(self.test_group_url) - - def create_tmp_user(self): - data = {'password': 'testtest'} - username = '%s@test.com' % randstring(20) - self.put(urljoin(ACCOUNTS_URL, username), data=data, expected=201) - self.test_user_name = username - self.test_user_url = urljoin(ACCOUNTS_URL, username) - - def remove_tmp_user(self): - if self.test_user_name: - self.delete(self.test_user_url) + + username = USERNAME + password = PASSWORD + admin_username = ADMIN_USERNAME + admin_password = ADMIN_PASSWORD @classmethod def get(cls, *args, **kwargs): @@ -165,10 +102,134 @@ class ApiTestBase(unittest.TestCase): msg = 'Expected not empty, but it is' self.assertGreater(len(lst), 0, msg) + @contextmanager + def get_tmp_repo(self): + """ + Context manager to create a tmp repo, and automatically delete it after use + + with self.tmp_repo() as repo: + self.get(repo.file_url + '?p=/') + """ + repo = self.create_repo() + try: + yield repo + finally: + self.remove_repo(repo.repo_id) + + @contextmanager + def get_tmp_group(self): + """ + Context manager to create a tmp group, and automatically delete it after use + + with self.tmp_repo() as repo: + self.get(repo.file_url + '?p=/') + """ + group = self.create_group() + try: + yield group + finally: + self.remove_group(group.group_id) + + @contextmanager + def get_tmp_user(self): + """ + Context manager to create a tmp user, and automatically delete it after use + + with self.tmp_repo() as repo: + self.get(repo.file_url + '?p=/') + """ + user = self.create_user() + try: + yield user + finally: + self.remove_user(user.user_name) + + def create_repo(self): + repo_name = '测试-test-repo-%s' % randstring(6) + data = { + 'name': repo_name, + 'desc': 'just for test - 测试用资料库', + } + repo = self.post(REPOS_URL, data=data).json() + repo_id = repo['repo_id'] + return _Repo(repo_id) + + def remove_repo(self, repo_id): + repo_url = urljoin(REPOS_URL, repo_id) + self.delete(repo_url) + + def create_group(self): + group_name = '测试群组-%s' % randstring(16) + data = {'group_name': group_name} + group_id = self.put(GROUPS_URL, data=data).json()['group_id'] + return _Group(group_name, group_id) + + def remove_group(self, group_id): + group_url = urljoin(GROUPS_URL, str(group_id)) + self.delete(group_url) + + def create_user(self): + username = '%s@test.com' % randstring(20) + password = randstring(20) + data = {'password': password} + self.admin_put(urljoin(ACCOUNTS_URL, username), data=data, expected=201) + return _User(username, password) + + def remove_user(self, username): + user_url = urljoin(ACCOUNTS_URL, username) + self.admin_delete(user_url) + + def create_file(self, repo, fname=None): + fname = fname or ('文件 %s.txt' % randstring()) + furl = repo.get_filepath_url('/' + fname) + data = {'operation': 'create'} + res = self.post(furl, data=data, expected=201) + self.assertEqual(res.text, '"success"') + return fname, furl + + def create_dir(self, repo): + data = {'operation': 'mkdir'} + dpath = '/目录 %s' % randstring() + durl = repo.get_dirpath_url(dpath) + res = self.post(durl, data=data, expected=201) + self.assertEqual(res.text, u'"success"') + return dpath, durl + + def get_auth_token(username, password): - res = requests.post(TOKEN_URL, - data=dict(username=username, password=password)) + data = { + 'username': username, + 'password': password, + } + res = requests.post(TOKEN_URL, data=data) assert_equal(res.status_code, 200) token = res.json()['token'] assert_equal(len(token), 40) return token + +class _Repo(object): + def __init__(self, repo_id): + self.repo_id = repo_id + self.repo_url = urljoin(REPOS_URL, self.repo_id) + self.file_url = urljoin(self.repo_url, 'file') + self.dir_url = urljoin(self.repo_url, 'dir') + + def get_filepath_url(self, path): + query = '?p=%s' % quote(path) + return self.file_url + query + + def get_dirpath_url(self, path): + query = '?p=%s' % quote(path) + return self.dir_url + query + +class _Group(object): + def __init__(self, group_name, group_id): + self.group_name = group_name + self.group_id = group_id + self.group_url = urljoin(GROUPS_URL, str(self.group_id)) + +class _User(object): + def __init__(self, username, password): + self.user_name = username + self.password = password + self.user_url = urljoin(ACCOUNTS_URL, username) diff --git a/tests/api/test_accounts.py b/tests/api/test_accounts.py index 9901c7d387..9858fde9b5 100644 --- a/tests/api/test_accounts.py +++ b/tests/api/test_accounts.py @@ -2,7 +2,7 @@ import requests import unittest from tests.common.utils import apiurl, urljoin, randstring -from tests.api.apitestbase import USERNAME, ApiTestBase +from tests.api.apitestbase import ApiTestBase from tests.api.urls import ACCOUNTS_URL, ACCOUNT_INFO_URL, PING_URL, \ AUTH_PING_URL @@ -15,7 +15,7 @@ class AccountsApiTest(ApiTestBase): def test_check_account_info(self): info = self.get(ACCOUNT_INFO_URL).json() self.assertIsNotNone(info) - self.assertEqual(info['email'], USERNAME) + self.assertEqual(info['email'], self.username) self.assertIsNotNone(info['total']) self.assertIsNotNone(info['usage']) diff --git a/tests/api/test_avatar.py b/tests/api/test_avatar.py index 646c9ca0b7..4cda2996e5 100644 --- a/tests/api/test_avatar.py +++ b/tests/api/test_avatar.py @@ -1,12 +1,12 @@ import unittest -from tests.api.apitestbase import ApiTestBase, USERNAME +from tests.api.apitestbase import ApiTestBase from tests.api.urls import AVATAR_BASE_URL, GROUPS_URL from tests.common.utils import randstring, apiurl, urljoin class AvatarApiTest(ApiTestBase): def test_user_avatar(self): - avatar_url = urljoin(AVATAR_BASE_URL, 'user', USERNAME, '/resized/80/') + avatar_url = urljoin(AVATAR_BASE_URL, 'user', self.username, '/resized/80/') info = self.get(avatar_url).json() self.assertIsNotNone(info['url']) self.assertIsNotNone(info['is_default']) diff --git a/tests/api/test_files.py b/tests/api/test_files.py index 35b520972c..8979be63e5 100644 --- a/tests/api/test_files.py +++ b/tests/api/test_files.py @@ -12,129 +12,114 @@ from tests.api.urls import DEFAULT_REPO_URL, REPOS_URL from tests.api.apitestbase import ApiTestBase, USERNAME class FilesApiTest(ApiTestBase): - use_test_repo = True - use_test_user = True - - def create_file(self, fname=None): - data = {'operation': 'create'} - fname = fname or ('文件 %s.txt' % randstring()) - fpath = '/' + fname - query = urlencode(dict(p=fpath)) - furl = self.test_file_url + '?' + query - res = self.post(furl, data=data, expected=201) - self.assertEqual(res.text, '"success"') - return fname, furl - - def create_dir(self): - data = {'operation': 'mkdir'} - dpath = '/目录 %s' % randstring() - query = urlencode(dict(p=dpath)) - durl = self.test_dir_url + '?' + query - res = self.post(durl, data=data, expected=201) - self.assertEqual(res.text, u'"success"') - return dpath, durl - - def test_create_file(self): - self.create_file() - def test_rename_file(self): - name, furl = self.create_file() - data = { - 'operation': 'rename', - 'newname': name + randstring(), - } - res = self.post(furl, data=data) - self.assertRegexpMatches(res.text, r'"http(.*)"') + with self.get_tmp_repo() as repo: + name, furl = self.create_file(repo) + data = { + 'operation': 'rename', + 'newname': name + randstring(), + } + res = self.post(furl, data=data) + self.assertRegexpMatches(res.text, r'"http(.*)"') def test_remove_file(self): - _, furl = self.create_file() - res = self.delete(furl) - self.assertEqual(res.text, '"success"') + with self.get_tmp_repo() as repo: + _, furl = self.create_file(repo) + res = self.delete(furl) + self.assertEqual(res.text, '"success"') def test_move_file(self): - _, furl = self.create_file() - # TODO: create another repo here, and use it as dst_repo - data = { - 'operation': 'move', - 'dst_repo': self.test_repo_id, - 'dst_dir': '/' - } - res = self.post(furl, data=data) - self.assertEqual(res.text, '"success"') + with self.get_tmp_repo() as repo: + _, furl = self.create_file(repo) + # TODO: create another repo here, and use it as dst_repo + data = { + 'operation': 'move', + 'dst_repo': repo.repo_id, + 'dst_dir': '/', + } + res = self.post(furl, data=data) + self.assertEqual(res.text, '"success"') def test_copy_file(self): - fname, _ = self.create_file() - # TODO: create another repo here, and use it as dst_repo - dpath, _ = self.create_dir() - fopurl = self.test_repo_url + u'fileops/copy/?p=/' - data = { - 'file_names': fname, - 'dst_repo': self.test_repo_id, - 'dst_dir': dpath, - } - res = self.post(fopurl, data=data) - self.assertEqual(res.text, '"success"') + with self.get_tmp_repo() as repo: + fname, _ = self.create_file(repo) + # TODO: create another repo here, and use it as dst_repo + dpath, _ = self.create_dir(repo) + fopurl = urljoin(repo.repo_url, 'fileops/copy/') + '?p=/' + data = { + 'file_names': fname, + 'dst_repo': repo.repo_id, + 'dst_dir': dpath, + } + res = self.post(fopurl, data=data) + self.assertEqual(res.text, '"success"') def test_download_file(self): - fname, furl = self.create_file() - res = self.get(furl) - self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname)) + with self.get_tmp_repo() as repo: + fname, furl = self.create_file(repo) + res = self.get(furl) + self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname)) def test_download_file_from_history(self): - fname, _ = self.create_file() - file_history_url = self.test_file_url + u'history/?p=/%s' % quote(fname) - res = self.get(file_history_url).json() - commit_id = res['commits'][0]['id'] - self.assertEqual(len(commit_id), 40) - data = { - 'p': fname, - 'commit_id': commit_id, - } - query = '?' + urlencode(data) - res = self.get(self.test_file_url + query) - self.assertEqual(res.status_code, 200) - self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname)) + with self.get_tmp_repo() as repo: + fname, _ = self.create_file(repo) + file_history_url = urljoin(repo.repo_url, 'history/') + \ + '?p=/%s' % quote(fname) + res = self.get(file_history_url).json() + commit_id = res['commits'][0]['id'] + self.assertEqual(len(commit_id), 40) + data = { + 'p': fname, + 'commit_id': commit_id, + } + query = '?' + urlencode(data) + res = self.get(repo.file_url + query) + self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname)) def test_get_file_detail(self): - fname, _ = self.create_file() - fdurl = self.test_file_url + u'detail/?p=/%s' % quote(fname) - detail = self.get(fdurl).json() - self.assertIsNotNone(detail) - self.assertIsNotNone(detail['id']) - self.assertIsNotNone(detail['mtime']) - self.assertIsNotNone(detail['type']) - self.assertIsNotNone(detail['name']) - self.assertIsNotNone(detail['size']) + with self.get_tmp_repo() as repo: + fname, _ = self.create_file(repo) + fdurl = repo.file_url + u'detail/?p=/%s' % quote(fname) + detail = self.get(fdurl).json() + self.assertIsNotNone(detail) + self.assertIsNotNone(detail['id']) + self.assertIsNotNone(detail['mtime']) + self.assertIsNotNone(detail['type']) + self.assertIsNotNone(detail['name']) + self.assertIsNotNone(detail['size']) def test_get_file_history(self): - fname, _ = self.create_file() - fhurl = self.test_file_url + u'history/?p=%s' % quote(fname) - history = self.get(fhurl).json() - for commit in history['commits']: - self.assertIsNotNone(commit['rev_file_size']) - #self.assertIsNotNone(commit['rev_file_id']) #allow null - self.assertIsNotNone(commit['ctime']) - self.assertIsNotNone(commit['creator_name']) - self.assertIsNotNone(commit['creator']) - self.assertIsNotNone(commit['root_id']) - #self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null - #self.assertIsNotNone(commit['parent_id']) #allow null - self.assertIsNotNone(commit['new_merge']) - self.assertIsNotNone(commit['repo_id']) - self.assertIsNotNone(commit['desc']) - self.assertIsNotNone(commit['id']) - self.assertIsNotNone(commit['conflict']) - #self.assertIsNotNone(commit['second_parent_id']) #allow null + with self.get_tmp_repo() as repo: + fname, _ = self.create_file(repo) + fhurl = repo.file_url + u'history/?p=%s' % quote(fname) + history = self.get(fhurl).json() + for commit in history['commits']: + self.assertIsNotNone(commit['rev_file_size']) + #self.assertIsNotNone(commit['rev_file_id']) #allow null + self.assertIsNotNone(commit['ctime']) + self.assertIsNotNone(commit['creator_name']) + self.assertIsNotNone(commit['creator']) + self.assertIsNotNone(commit['root_id']) + #self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null + #self.assertIsNotNone(commit['parent_id']) #allow null + self.assertIsNotNone(commit['new_merge']) + self.assertIsNotNone(commit['repo_id']) + self.assertIsNotNone(commit['desc']) + self.assertIsNotNone(commit['id']) + self.assertIsNotNone(commit['conflict']) + #self.assertIsNotNone(commit['second_parent_id']) #allow null def test_get_upload_link(self): - upload_url = self.test_repo_url + u'upload-link/' - res = self.get(upload_url) - self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"') + with self.get_tmp_repo() as repo: + upload_url = urljoin(repo.repo_url, 'upload-link') + res = self.get(upload_url) + self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"') def test_get_update_link(self): - update_url = self.test_repo_url + u'update-link/' - res = self.get(update_url) - self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"') + with self.get_tmp_repo() as repo: + update_url = urljoin(repo.repo_url, 'update-link') + res = self.get(update_url) + self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"') # def test_upload_file(self): # # XXX: requests has problems when post a file whose name contains @@ -166,54 +151,57 @@ class FilesApiTest(ApiTestBase): # self.assertRegexpMatches(res.text, r'\w{40,40}') def test_get_upload_blocks_link(self): - upload_blks_url = self.test_repo_url + u'upload-blks-link/' - res = self.get(upload_blks_url) - self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"') + with self.get_tmp_repo() as repo: + upload_blks_url = urljoin(repo.repo_url, 'upload-blks-link') + res = self.get(upload_blks_url) + self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"') def test_get_update_blocks_link(self): - update_blks_url = self.test_repo_url + u'update-blks-link/' - res = self.get(update_blks_url) - self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"') + with self.get_tmp_repo() as repo: + update_blks_url = urljoin(repo.repo_url, 'update-blks-link') + res = self.get(update_blks_url) + self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"') def test_list_dir(self): - self.create_file() - self.create_dir() - dirents = self.get(self.test_dir_url).json() - self.assertHasLen(dirents, 2) - for dirent in dirents: - self.assertIsNotNone(dirent['id']) - self.assertIsNotNone(dirent['name']) - self.assertIn(dirent['type'], ('file', 'dir')) - if dirent['type'] == 'file': - self.assertIsNotNone(dirent['size']) - - def test_create_dir(self): - self.create_dir() + with self.get_tmp_repo() as repo: + self.create_file(repo) + self.create_dir(repo) + dirents = self.get(repo.dir_url).json() + self.assertHasLen(dirents, 2) + for dirent in dirents: + self.assertIsNotNone(dirent['id']) + self.assertIsNotNone(dirent['name']) + self.assertIn(dirent['type'], ('file', 'dir')) + if dirent['type'] == 'file': + self.assertIsNotNone(dirent['size']) def test_remove_dir(self): - _, durl = self.create_dir() - res = self.delete(durl) - self.assertEqual(res.text, u'"success"') - self.get(durl, expected=404) + with self.get_tmp_repo() as repo: + _, durl = self.create_dir(repo) + res = self.delete(durl) + self.assertEqual(res.text, u'"success"') + self.get(durl, expected=404) def test_download_dir(self): - dpath, _ = self.create_dir() - query = urlencode({'p': dpath}) - ddurl = self.test_dir_url + 'download/' + '?' + query - res = self.get(ddurl) - self.assertRegexpMatches(res.text, - r'"http(.*)/files/\w{8,8}/%s"' % quote(dpath[1:])) + with self.get_tmp_repo() as repo: + dpath, _ = self.create_dir(repo) + query = '?p=%s' % quote(dpath) + ddurl = urljoin(repo.dir_url, 'download') + query + res = self.get(ddurl) + self.assertRegexpMatches(res.text, + r'"http(.*)/files/\w{8,8}/%s"' % quote(dpath[1:])) def test_share_dir(self): - dpath, _ = self.create_dir() - query = urlencode({'p': dpath}) - share_dir_url = self.test_dir_url + u'share/' + '?' + query - # TODO: share to another user - data = { - 'emails': USERNAME, - 's_type': 'd', - 'path': '/', - 'perm': 'r' - } - res = self.post(share_dir_url, data=data) - self.assertEqual(res.text, u'{}') + with self.get_tmp_repo() as repo: + dpath, _ = self.create_dir(repo) + query = '?p=%s' % quote(dpath) + share_dir_url = urljoin(repo.dir_url, 'share/') + query + with self.get_tmp_user() as user: + data = { + 'emails': user.user_name, + 's_type': 'd', + 'path': '/', + 'perm': 'r' + } + res = self.post(share_dir_url, data=data) + self.assertEqual(res.text, u'{}') diff --git a/tests/api/test_groupmsgs.py b/tests/api/test_groupmsgs.py deleted file mode 100644 index 5b29b5d3fa..0000000000 --- a/tests/api/test_groupmsgs.py +++ /dev/null @@ -1,74 +0,0 @@ -from apitestbase import GROUPS_URL, GROUPMSGS_URL, GROUPMSGS_NREPLY_URL -from apitestbase import GROUP_URL, get_authed_instance -from common.utils import randomword -import unittest - -class GroupMsgsApiTestCase(unittest.TestCase): - - def setUp(self): - self.requests = get_authed_instance() - self.assertIsNotNone(self.requests) - self.gname = randomword(16) - data = { 'group_name': self.gname } - res = self.requests.put(GROUPS_URL, data=data) - self.gid = res.json()['group_id'] - self.gmurl = GROUPMSGS_URL + str(self.gid) + u'/' - self.gmurl_ = GROUP_URL + str(self.gid) + u'/msg/' - - def test_list_group_msgs_api(self): - res = self.requests.get(self.gmurl) - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - self.assertIsNotNone(json['next_page']) - self.assertIsNotNone(json['msgs']) - - def test_post_group_msg_api(self): - #repo_id and path is not tested - data = { 'message': 'test message' } - res = self.requests.post(self.gmurl, data=data) - self.assertEqual(res.status_code, 200) - self.assertIsNotNone(res.json()['msgid']) - - def test_reply_group_msg_api(self): - data = { 'message': 'test message' } - res = self.requests.post(self.gmurl, data=data) - msgid = res.json()['msgid'] - res = self.requests.post(self.gmurl_ + str(msgid) + u'/', data=data) - self.assertEqual(res.status_code, 200) - self.assertIsNotNone(res.json()['msgid']) - - def test_get_group_msg_detail_api(self): - data = { 'message': 'test message' } - res = self.requests.post(self.gmurl, data=data) - msgid = res.json()['msgid'] - res = self.requests.get(self.gmurl_ + str(msgid) + u'/') - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - self.assertIsNotNone(json['reply_cnt']) - self.assertIsNotNone(json['timestamp']) - self.assertIsNotNone(json['replies']) - self.assertIsNotNone(json['from_email']) - self.assertIsNotNone(json['msgid']) - self.assertIsNotNone(json['msg']) - self.assertIsNotNone(json['nickname']) - - def test_new_replies_group_msg_api(self): - data = { 'message': 'test message' } - res = self.requests.post(self.gmurl, data=data) - res = self.requests.get(GROUPMSGS_NREPLY_URL) - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - for reply in json: - self.assertIsNotNone(reply['reply_cnt']) - self.assertIsNotNone(reply['timestamp']) - self.assertIsNotNone(reply['replies']) - self.assertIsNotNone(reply['from_email']) - self.assertIsNotNone(reply['msgid']) - self.assertIsNotNone(reply['msg']) - self.assertIsNotNone(reply['nickname']) - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/tests/api/test_groups.py b/tests/api/test_groups.py index 6d5edd1869..36d29c247c 100644 --- a/tests/api/test_groups.py +++ b/tests/api/test_groups.py @@ -10,30 +10,30 @@ from tests.api.urls import GROUPS_URL from tests.common.utils import apiurl, urljoin, randstring class GroupsApiTest(ApiTestBase): - use_test_user = True - use_test_group = True - def test_add_remove_group_member(self): - test_group_members_url = urljoin(self.test_group_url, '/members/') - data = {'user_name': self.test_user_name} - res = self.put(test_group_members_url, data=data).json() - self.assertTrue(res['success']) - res = self.delete(test_group_members_url, data=data).json() - self.assertTrue(res['success']) + with self.get_tmp_user() as user: + with self.get_tmp_group() as group: + test_group_members_url = urljoin(group.group_url, '/members/') + data = {'user_name': user.user_name} + res = self.put(test_group_members_url, data=data).json() + self.assertTrue(res['success']) + res = self.delete(test_group_members_url, data=data).json() + self.assertTrue(res['success']) def test_list_groups(self): - groups = self.get(GROUPS_URL).json() - self.assertGreaterEqual(groups['replynum'], 0) - self.assertNotEmpty(groups['groups']) - for group in groups['groups']: - self.assertIsNotNone(group['ctime']) - self.assertIsNotNone(group['creator']) - self.assertIsNotNone(group['msgnum']) - self.assertIsNotNone(group['mtime']) - self.assertIsNotNone(group['id']) - self.assertIsNotNone(group['name']) + with self.get_tmp_group() as group: + groups = self.get(GROUPS_URL).json() + self.assertGreaterEqual(groups['replynum'], 0) + self.assertNotEmpty(groups['groups']) + for group in groups['groups']: + self.assertIsNotNone(group['ctime']) + self.assertIsNotNone(group['creator']) + self.assertIsNotNone(group['msgnum']) + self.assertIsNotNone(group['mtime']) + self.assertIsNotNone(group['id']) + self.assertIsNotNone(group['name']) - def test_add_group(self): + def test_add_remove_group(self): data = {'group_name': randstring(16)} info = self.put(GROUPS_URL, data=data).json() self.assertTrue(info['success']) diff --git a/tests/api/test_misc.py b/tests/api/test_misc.py index 9480e90010..8416d38fa8 100644 --- a/tests/api/test_misc.py +++ b/tests/api/test_misc.py @@ -1,9 +1,9 @@ import unittest -from tests.common.utils import apiurl from tests.api.apitestbase import ApiTestBase +from tests.api.urls import LIST_GROUP_AND_CONTACTS_URL class MiscApiTest(ApiTestBase): - def test_list_group_and_contacts_api(self): + def test_list_group_and_contacts(self): res = self.get(LIST_GROUP_AND_CONTACTS_URL).json() self.assertIsNotNone(res) self.assertIsInstance(res['contacts'], list) diff --git a/tests/api/test_repos.py b/tests/api/test_repos.py index 849875b26a..2cb0791278 100644 --- a/tests/api/test_repos.py +++ b/tests/api/test_repos.py @@ -5,17 +5,12 @@ Test repos api. import unittest -from tests.api.apitestbase import ApiTestBase, USERNAME +from tests.api.apitestbase import ApiTestBase from tests.api.urls import REPOS_URL, DEFAULT_REPO_URL, VIRTUAL_REPOS_URL from tests.common.utils import apiurl, urljoin, randstring - +# TODO: all tests should be run on an encrypted repo class ReposApiTest(ApiTestBase): - use_test_repo = True - - def remove_repo(cls, repo_id): - cls.delete(urljoin(REPOS_URL, repo_id)) - def test_get_default_repo(self): repo = self.get(DEFAULT_REPO_URL).json() self.assertIsNotNone(repo['exists']) @@ -28,6 +23,7 @@ class ReposApiTest(ApiTestBase): def test_list_repos(self): repos = self.get(REPOS_URL).json() self.assertHasLen(repos, 1) + for repo in repos: self.assertIsNotNone(repo['permission']) self.assertIsNotNone(repo['encrypted']) @@ -42,91 +38,89 @@ class ReposApiTest(ApiTestBase): self.assertIsNotNone(repo['root']) def test_get_repo_info(self): - repo = self.get(test_repo_url).json() - self.assertFalse(repo['encrypted']) - self.assertIsNotNone(repo['mtime']) - self.assertIsNotNone(repo['owner']) - self.assertIsNotNone(repo['id']) - self.assertIsNotNone(repo['size']) - self.assertIsNotNone(repo['name']) - self.assertIsNotNone(repo['root']) - self.assertIsNotNone(repo['desc']) - self.assertIsNotNone(repo['type']) - # self.assertIsNotNone(repo['password_need']) #allow null here + with self.get_tmp_repo() as repo: + rinfo = self.get(repo.repo_url).json() + self.assertFalse(rinfo['encrypted']) + self.assertIsNotNone(rinfo['mtime']) + self.assertIsNotNone(rinfo['owner']) + self.assertIsNotNone(rinfo['id']) + self.assertIsNotNone(rinfo['size']) + self.assertIsNotNone(rinfo['name']) + self.assertIsNotNone(rinfo['root']) + self.assertIsNotNone(rinfo['desc']) + self.assertIsNotNone(rinfo['type']) + # elf.assertIsNotNone(rinfo['password_need']) # allow null here def test_get_repo_owner(self): - repo_owner_url = urljoin(self.test_repo_url, '/owner/') - info = self.get(repo_owner_url).json() - self.assertEqual(info['owner'], self.test_user_name) + with self.get_tmp_repo() as repo: + repo_owner_url = urljoin(repo.repo_url, '/owner/') + # XXX: why only admin can get the owner of a repo? + info = self.admin_get(repo_owner_url).json() + self.assertEqual(info['owner'], self.username) def test_get_repo_history(self): - repo_history_url = urljoin(REPOS_URL, self.test_repo_id, '/history/') - history = self.get(repo_history_url).json() - for commit in history['commits']: - self.assertIsNotNone(commit['rev_file_size']) - #self.assertIsNotNone(commit['rev_file_id']) #allow null - self.assertIsNotNone(commit['ctime']) - self.assertIsNotNone(commit['creator_name']) - self.assertIsNotNone(commit['creator']) - self.assertIsNotNone(commit['root_id']) - #self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null - #self.assertIsNotNone(commit['parent_id']) #allow null - self.assertIsNotNone(commit['new_merge']) - self.assertIsNotNone(commit['repo_id']) - self.assertIsNotNone(commit['desc']) - self.assertIsNotNone(commit['id']) - self.assertIsNotNone(commit['conflict']) - #self.assertIsNotNone(commit['second_parent_id']) #allow null + with self.get_tmp_repo() as repo: + self.create_file(repo) + self.create_dir(repo) + repo_history_url = urljoin(repo.repo_url, '/history/') + history = self.get(repo_history_url).json() + commits = history['commits'] + self.assertHasLen(commits, 3) + for commit in commits: + self.assertIsNotNone(commit['rev_file_size']) + #self.assertIsNotNone(commit['rev_file_id']) #allow null + self.assertIsNotNone(commit['ctime']) + self.assertIsNotNone(commit['creator_name']) + self.assertIsNotNone(commit['creator']) + self.assertIsNotNone(commit['root_id']) + #self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null + #self.assertIsNotNone(commit['parent_id']) #allow null + self.assertIsNotNone(commit['new_merge']) + self.assertIsNotNone(commit['repo_id']) + self.assertIsNotNone(commit['desc']) + self.assertIsNotNone(commit['id']) + self.assertIsNotNone(commit['conflict']) + #self.assertIsNotNone(commit['second_parent_id']) #allow null def test_create_repo(self): data = {'name': 'test'} res = self.post(REPOS_URL, data=data) repo = res.json() - self.assertIsNotNone(repo['encrypted']) - self.assertIsNotNone(repo['enc_version']) - self.assertIsNotNone(repo['repo_id']) - self.assertIsNotNone(repo['magic']) - self.assertIsNotNone(repo['relay_id']) - self.assertIsNotNone(repo['repo_version']) - self.assertIsNotNone(repo['relay_addr']) - self.assertIsNotNone(repo['token']) - self.assertIsNotNone(repo['relay_port']) - self.assertIsNotNone(repo['random_key']) - self.assertIsNotNone(repo['email']) - self.assertIsNotNone(repo['repo_name']) - repo_id = repo['repo_id'] - self.remove_repo(repo_id) - # Check the repo is really removed - self.get(urljoin(REPOS_URL, repo_id), expected=404) + try: + self.assertIsNotNone(repo['encrypted']) + self.assertIsNotNone(repo['enc_version']) + self.assertIsNotNone(repo['repo_id']) + self.assertIsNotNone(repo['magic']) + self.assertIsNotNone(repo['relay_id']) + self.assertIsNotNone(repo['repo_version']) + self.assertIsNotNone(repo['relay_addr']) + self.assertIsNotNone(repo['token']) + self.assertIsNotNone(repo['relay_port']) + self.assertIsNotNone(repo['random_key']) + self.assertIsNotNone(repo['email']) + self.assertIsNotNone(repo['repo_name']) + finally: + self.remove_repo(repo_id) + # Check the repo is really removed + self.get(urljoin(REPOS_URL, repo_id), expected=404) - # TODO: create a sub folder and use it as a sub repo - # def test_check_or_create_sub_repo(self): - # sub_repo_url = urljoin(REPOS_URL, self.test_repo_id, '/dir/sub_repo/') - # params = {'p': '/', 'name': 'sub_lib'} - # info = self.get(sub_repo_url, params=params).json() - # self.assertHasLen(info['sub_repo_id'], 36) - # self.remove_repo(info['sub_repo_id']) - - def test_encrpty_or_decrypy_repo(self): - # TODO: create a encrypted library + def test_check_or_create_sub_repo(self): + # TODO: create a sub folder and use it as a sub repo pass - # repo_url = urljoin(REPOS_URL, repo_id) - # data = {'password': 'test'} - # res = self.post(repo_url, data) - # self.assertEqual(res.text, '"success"') def test_fetch_repo_download_info(self): - download_info_repo_url = urljoin(REPOS_URL, self.test_repo_id, '/download-info/') - info = self.get(download_info_repo_url).json() - self.assertIsNotNone(info['relay_addr']) - self.assertIsNotNone(info['token']) - self.assertIsNotNone(info['repo_id']) - self.assertIsNotNone(info['relay_port']) - self.assertIsNotNone(info['encrypted']) - self.assertIsNotNone(info['repo_name']) - self.assertIsNotNone(info['relay_id']) - self.assertIsNotNone(info['email']) + with self.get_tmp_repo() as repo: + download_info_repo_url = urljoin(repo.repo_url, '/download-info/') + info = self.get(download_info_repo_url).json() + self.assertIsNotNone(info['relay_addr']) + self.assertIsNotNone(info['token']) + self.assertIsNotNone(info['repo_id']) + self.assertIsNotNone(info['relay_port']) + self.assertIsNotNone(info['encrypted']) + self.assertIsNotNone(info['repo_name']) + self.assertIsNotNone(info['relay_id']) + self.assertIsNotNone(info['email']) def test_list_virtual_repos(self): # TODO: we need to create at least on virtual repo first diff --git a/tests/api/test_shares.py b/tests/api/test_shares.py index e46a0f9458..267cfceb6f 100644 --- a/tests/api/test_shares.py +++ b/tests/api/test_shares.py @@ -1,232 +1,30 @@ #coding: UTF-8 -import unittest -from urllib import urlencode, quote -from tests.common.utils import apiurl, randstring, urljoin +from tests.common.utils import urljoin from tests.api.apitestbase import ApiTestBase from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \ BESHARED_LIBRARIES_URL, SHARED_FILES_URL, F_URL, S_F_URL class SharesApiTest(ApiTestBase): - use_test_group = True - use_test_repo = True - - def create_file(self, fname=None): - data = {'operation': 'create'} - fname = fname or ('文件 %s.txt' % randstring(10)) - fpath = '/' + fname - query = urlencode(dict(p=fpath)) - furl = self.test_file_url + '?' + query - res = self.post(furl, data=data, expected=201) - self.assertEqual(res.text, '"success"') - return fname, furl - def test_create_file_shared_link(self): - fname, _ = self.create_file() - fsurl = urljoin(self.test_file_url, 'shared-link') - data = { - 'type': 'f', - 'p': '/' + fname, - } - res = self.put(fsurl, data=data, expected=201) - self.assertRegexpMatches(res.headers['Location'], \ - r'http(.*)/f/(\w{10,10})/') + with self.get_tmp_repo() as repo: + fname, _ = self.create_file(repo) + fsurl = urljoin(repo.file_url, 'shared-link') + data = { + 'type': 'f', + 'p': '/' + fname, + } + res = self.put(fsurl, data=data, expected=201) + self.assertRegexpMatches(res.headers['Location'], \ + r'http(.*)/f/(\w{10,10})/') - res = self.get(SHARED_LINKS_URL).json() - self.assertNotEmpty(res) - for fileshare in res['fileshares']: - self.assertIsNotNone(fileshare['username']) - self.assertIsNotNone(fileshare['repo_id']) - #self.assertIsNotNone(fileshare['ctime']) - self.assertIsNotNone(fileshare['s_type']) - self.assertIsNotNone(fileshare['token']) - self.assertIsNotNone(fileshare['view_cnt']) - self.assertIsNotNone(fileshare['path']) - - - # def test_create_directory_shared_link(self): - # data = { 'operation': 'mkdir' } - # durl = self.test_dir_url + u'?p=/test_create_shared_link_d' - # self.post(durl, data=data) - # self.post(durl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'd', 'p': '/test_create_shared_link_d' } - # res = self.put(fsurl, data=data) - # self.assertEqual(res.status_code, 201) - # self.assertRegexpMatches(res.headers['Location'], \ - # r'http(.*)/d/(\w{10,10})/') - - # def test_remove_shared_link(self): - # data = { 'operation': 'create' } - # furl = self.test_file_url + u'?p=/test_remove_shared_link_f' - # self.post(furl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'f', 'p': '/test_remove_shared_link_f' } - # res = self.put(fsurl, data=data) - # import re - # t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2) - # fturl = SHARED_LINKS_URL + u'?t=' + t - # res = self.delete(fturl) - # self.assertEqual(res.status_code, 200) - # self.assertEqual(res.text, u'{}') - - # def test_get_shared_file_url(self): - # data = { 'operation': 'create' } - # furl = self.test_file_url + u'?p=/test_visit_shared_link_f' - # self.post(furl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'f', 'p': '/test_visit_shared_link_f' } - # res = self.put(fsurl, data=data) - # import re - # t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2) - # fdurl = F_URL + t + u'/' - # res = self.get(fdurl) - # self.assertEqual(res.status_code, 200) - # self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"') - - # def test_get_shared_file_detail(self): - # data = { 'operation': 'create' } - # furl = self.test_file_url + u'?p=/test_visitd_shared_link_f' - # self.post(furl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'f', 'p': '/test_visitd_shared_link_f' } - # res = self.put(fsurl, data=data) - # import re - # t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2) - # fdurl = F_URL + t + u'/detail/' - # res = self.get(fdurl) - # self.assertEqual(res.status_code, 200) - # json = res.json() - # self.assertIsNotNone(json) - # self.assertIsNotNone(json['repo_id']) - # self.assertIsNotNone(json['name']) - # self.assertIsNotNone(json['size']) - # self.assertIsNotNone(json['path']) - # self.assertIsNotNone(json['type']) - # self.assertIsNotNone(json['mtime']) - # self.assertIsNotNone(json['id']) - - # def test_get_private_shared_file_url(self): - # if True: #todo: override this - # return - # data = { 'operation': 'create' } - # furl = self.test_file_url + u'?p=/test_visit_shared_link_sf' - # self.post(furl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'f', 'p': '/test_visit_shared_link_sf' } - # res = self.put(fsurl, data=data) - # import re - # t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2) - # fdurl = S_F_URL + t + u'/' - # res = self.get(fdurl) - # self.assertEqual(res.status_code, 200) - # self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"') - - # def test_get_private_shared_file_detail(self): - # if True: #todo: override this - # return - # data = { 'operation': 'create' } - # furl = self.test_file_url + u'?p=/test_visitd_shared_link_sf' - # self.post(furl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'f', 'p': '/test_visitd_shared_link_sf' } - # res = self.put(fsurl, data=data) - # import re - # t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2) - # fdurl = S_F_URL + t + u'/detail/' - # res = self.get(fdurl) - # self.assertEqual(res.status_code, 200) - # json = res.json() - # self.assertIsNotNone(json) - # self.assertIsNotNone(json['repo_id']) - # self.assertIsNotNone(json['name']) - # self.assertIsNotNone(json['size']) - # self.assertIsNotNone(json['path']) - # self.assertIsNotNone(json['type']) - # self.assertIsNotNone(json['mtime']) - # self.assertIsNotNone(json['id']) - - # def test_remove_shared_file(self): - # if True: #todo: override this - # return - # data = { 'operation': 'create' } - # furl = self.test_file_url + u'?p=/test_remove_shared_file' - # self.post(furl, data=data) - # fsurl = self.test_file_url + u'shared-link/' - # data = { 'type': 'f', 'p': '/test_remove_shared_file' } - # res = self.put(fsurl, data=data) - # import re - # t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2) - # fturl = SHARED_FILES_URL + u'?t=' + t - # res = self.delete(fturl) - # self.assertEqual(res.status_code, 200) - # self.assertEqual(res.text, u'{}') - - # def test_list_shared_libraries(self): - # res = self.get(SHARED_LIBRARIES_URL) - # self.assertEqual(res.status_code, 200) - # json = res.json() - # self.assertIsNotNone(json) - # for repo in json: - # self.assertIsNotNone(repo['repo_id']) - # self.assertIsNotNone(repo['share_type']) - # self.assertIsNotNone(repo['permission']) - # self.assertIsNotNone(repo['encrypted']) - # self.assertIsNotNone(repo['user']) - # self.assertIsNotNone(repo['last_modified']) - # self.assertIsNotNone(repo['repo_desc']) - # self.assertIsNotNone(repo['group_id']) - # self.assertIsNotNone(repo['repo_name']) - - # def test_list_beshared_libraries(self): - # res = self.get(BESHARED_LIBRARIES_URL) - # self.assertEqual(res.status_code, 200) - # json = res.json() - # self.assertIsNotNone(json) - # for repo in json: - # self.assertIsNotNone(repo['user']) - # self.assertIsNotNone(repo['repo_id']) - # self.assertIsNotNone(repo['share_type']) - # self.assertIsNotNone(repo['permission']) - # self.assertIsNotNone(repo['encrypted']) - # self.assertIsNotNone(repo['last_modified']) - # self.assertIsNotNone(repo['repo_desc']) - # self.assertIsNotNone(repo['group_id']) - # self.assertIsNotNone(repo['repo_name']) - # self.assertIsNotNone(repo['is_virtual']) - - # def test_share_library(self): - # data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \ - # 'permission': 'rw' } - # slurl = SHARED_LIBRARIES_URL + str(self.test_repo_id) + u'/' - # res = self.put(slurl, params=data) - # self.assertEqual(res.status_code, 200) - # self.assertEqual(res.text, u'"success"') - - # def test_un_share_library(self): - # data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \ - # 'permission': 'rw' } - # slurl = SHARED_LIBRARIES_URL + str(self.test_repo_id) + u'/' - # data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid } - # self.put(slurl, params=data) - # res = self.delete(slurl, params=data) - # self.assertEqual(res.status_code, 200) - # self.assertEqual(res.text, u'"success"') - - # def test_list_shared_files(self): - # res = self.get(SHARED_FILES_URL) - # self.assertEqual(res.status_code, 200) - # json = res.json() - # self.assertIsNotNone(json) - # self.assertIsNotNone(json['priv_share_in']) - # self.assertIsNotNone(json['priv_share_out']) - - # for sfiles in zip(json['priv_share_in'], json['priv_share_out']): - # for sfile in sfiles: - # self.assertIsNotNone(sfile['s_type']) - # self.assertIsNotNone(sfile['repo_id']) - # self.assertIsNotNone(sfile['permission']) - # self.assertIsNotNone(sfile['to_user']) - # self.assertIsNotNone(sfile['token']) - # self.assertIsNotNone(sfile['from_user']) - # self.assertIsNotNone(sfile['path']) + res = self.get(SHARED_LINKS_URL).json() + self.assertNotEmpty(res) + for fileshare in res['fileshares']: + self.assertIsNotNone(fileshare['username']) + self.assertIsNotNone(fileshare['repo_id']) + #self.assertIsNotNone(fileshare['ctime']) + self.assertIsNotNone(fileshare['s_type']) + self.assertIsNotNone(fileshare['token']) + self.assertIsNotNone(fileshare['view_cnt']) + self.assertIsNotNone(fileshare['path']) diff --git a/tests/api/test_starredfiles.py b/tests/api/test_starredfiles.py deleted file mode 100644 index 0cf6335119..0000000000 --- a/tests/api/test_starredfiles.py +++ /dev/null @@ -1,41 +0,0 @@ -from apitestbase import STARREDFILES_URL, get_authed_instance -from apitestbase import DEFAULT_LIBRARY_URL, LIBRARIES_URL -import unittest - -class StarredFilesApiTestCase(unittest.TestCase): - - def setUp(self): - self.requests = get_authed_instance() - self.assertIsNotNone(self.requests) - res = self.requests.post(DEFAULT_LIBRARY_URL) - self.rid = res.json()['repo_id'] - self.rurl = LIBRARIES_URL + str(self.rid) + u'/' - self.furl = self.rurl + u'file/' - - def test_list_starred_files_api(self): - res = self.requests.get(STARREDFILES_URL) - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - - def test_star_file_api(self): - data = { 'operation': 'create' } - furl = self.furl + '?p=/test.c' - self.requests.post(furl, data=data) - data = { 'repo_id': self.rid, 'p': '/test.c' } - res = self.requests.post(STARREDFILES_URL, data=data) - self.assertEqual(res.status_code, 201) - self.assertEqual(res.text, u'"success"') - - def test_un_star_file_api(self): - data = { 'operation': 'create' } - furl = self.furl + '?p=/test.c' - self.requests.post(furl, data=data) - data = { 'repo_id': self.rid, 'p': '/test.c' } - res = self.requests.post(STARREDFILES_URL, data=data) - res = self.requests.delete(STARREDFILES_URL, params=data) - self.assertEqual(res.status_code, 200) - self.assertEqual(res.text, u'"success"') - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/tests/api/test_usermsgs.py b/tests/api/test_usermsgs.py deleted file mode 100644 index b754b16be8..0000000000 --- a/tests/api/test_usermsgs.py +++ /dev/null @@ -1,38 +0,0 @@ -from apitestbase import USERNAME, get_authed_instance -from apitestbase import USERMSGS_URL, USERMSGS_COUNT_URL -import unittest - -class UserMsgsApiTestCase(unittest.TestCase): - - def setUp(self): - self.requests = get_authed_instance() - self.assertIsNotNone(self.requests) - - def test_list_user_msgs_api(self): - res = self.requests.get(USERMSGS_URL) - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - self.assertEqual(json['to_email'], USERNAME) - self.assertIsNotNone(json['next_page']) - self.assertIsNotNone(json['msgs']) - - def test_reply_user_msg_api(self): - data = { 'id': '0', 'message': 'test' } - res = self.requests.post(USERMSGS_URL, data=data) - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - self.assertIsNotNone(json['msgid']) - - def test_count_unseen_msgs_api(self): - data = { 'id': '0', 'message': 'test' } - self.requests.post(USERMSGS_URL, data=data) - res = self.requests.get(USERMSGS_COUNT_URL, data=data) - self.assertEqual(res.status_code, 200) - json = res.json() - self.assertIsNotNone(json) - self.assertGreater(json['count'], 0) - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/tests/common/common.py b/tests/common/common.py index 49fa23c5e6..a127b13be1 100644 --- a/tests/common/common.py +++ b/tests/common/common.py @@ -1,16 +1,12 @@ import os -BASE_URL = os.getenv('TEST_BASE_URL', u'http://127.0.0.1:8000') -USERNAME = os.getenv('TEST_USERNAME', u'test@seahubtest.com') -PASSWORD = os.getenv('TEST_PASSWORD', u'testtest') +BASE_URL = os.getenv('SEAHUB_TEST_BASE_URL', u'http://127.0.0.1:8000') +USERNAME = os.getenv('SEAHUB_TEST_USERNAME', u'test@seafiletest.com') +PASSWORD = os.getenv('SEAHUB_TEST_PASSWORD', u'testtest') +ADMIN_USERNAME = os.getenv('SEAHUB_TEST_ADMIN_USERNAME', u'admin@seafiletest.com') +ADMIN_PASSWORD = os.getenv('SEAHUB_TEST_ADMIN_PASSWORD', u'adminadmin') -ADMIN_USERNAME = os.getenv('TEST_ADMIN_USERNAME', u'admin@seahubtest.com') -ADMIN_PASSWORD = os.getenv('TEST_ADMIN_PASSWORD', u'adminadmin') - -if BASE_URL[-1] != '/': - BASE_URL += '/' - -if os.getenv('TEST_IS_PRO', u'') == u'': +if os.getenv('SEAHUB_TEST_IS_PRO', u'') == u'': IS_PRO = False else: S_PRO = True diff --git a/tests/common/utils.py b/tests/common/utils.py index d744e2d617..ab9e73e515 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -18,4 +18,4 @@ def urljoin(base, *args): return url def apiurl(*parts): - return urljoin(BASE_URL, *parts) \ No newline at end of file + return urljoin(BASE_URL, *parts) diff --git a/tests/seahubtests.sh b/tests/seahubtests.sh index 0157afe83d..dc6b553a18 100755 --- a/tests/seahubtests.sh +++ b/tests/seahubtests.sh @@ -1,9 +1,15 @@ #!/bin/bash : ${PYTHON=python} -export TEST_USERNAME="test@seahubtest.com" -export TEST_PASSWORD="testtest" -export TEST_ADMIN_USERNAME="admin@seahubtest.com" -export TEST_ADMIN_PASSWORD="adminadmin" + +: ${SEAHUB_TEST_USERNAME="test@seafiletest.com"} +: ${SEAHUB_TEST_PASSWORD="testtest"} +: ${SEAHUB_TEST_ADMIN_USERNAME="admin@seafiletest.com"} +: ${SEAHUB_TEST_ADMIN_PASSWORD="adminadmin"} + +export SEAHUB_TEST_USERNAME +export SEAHUB_TEST_PASSWORD +export SEAHUB_TEST_ADMIN_USERNAME +export SEAHUB_TEST_ADMIN_PASSWORD # If you run this script on your local machine, you must set CCNET_CONF_DIR # and SEAFILE_CONF_DIR like this: @@ -31,19 +37,20 @@ function init() { $PYTHON ./manage.py syncdb # create normal user - $PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${TEST_USERNAME}', '${TEST_PASSWORD}', 0, 1);" + $PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${SEAHUB_TEST_USERNAME}', '${SEAHUB_TEST_PASSWORD}', 0, 1);" # create admin - $PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${TEST_ADMIN_USERNAME}', '${TEST_ADMIN_PASSWORD}', 1, 1);" + $PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${SEAHUB_TEST_ADMIN_USERNAME}', '${SEAHUB_TEST_ADMIN_PASSWORD}', 1, 1);" } function start_seahub() { - $PYTHON ./manage.py runserver 1>/dev/null 2>&1 & + # $PYTHON ./manage.py runserver 1>/dev/null 2>&1 & + $PYTHON ./manage.py runserver 2>&1 & sleep 5 } function run_tests() { pushd tests - nosetests + nosetests "$nose_opts" popd } @@ -55,6 +62,8 @@ case $1 in start_seahub ;; "test") + shift + nose_opts=$@ run_tests ;; *) diff --git a/tests/ui/test_login.py b/tests/ui/test_login.py index 99466f8da1..95baa2c2b9 100644 --- a/tests/ui/test_login.py +++ b/tests/ui/test_login.py @@ -1,5 +1,5 @@ import unittest -from common.common import BASE_URL, USERNAME, PASSWORD +from tests.common.common import BASE_URL, USERNAME, PASSWORD from selenium import webdriver from selenium.webdriver.common.keys import Keys @@ -8,31 +8,31 @@ HOME_URL = BASE_URL + u'/home/my/' LOGOUT_URL = BASE_URL + u'/accounts/logout/' def get_logged_instance(): - browser = webdriver.PhantomJS() - browser.get(LOGIN_URL) - username_input = browser.find_element_by_name('username') - password_input = browser.find_element_by_name('password') - username_input.send_keys(USERNAME) - password_input.send_keys(PASSWORD) - password_input.send_keys(Keys.RETURN) - if (browser.current_url != HOME_URL): - browser.quit() - return None - return browser + browser = webdriver.PhantomJS() + browser.get(LOGIN_URL) + username_input = browser.find_element_by_name('username') + password_input = browser.find_element_by_name('password') + username_input.send_keys(USERNAME) + password_input.send_keys(PASSWORD) + password_input.send_keys(Keys.RETURN) + if browser.current_url != HOME_URL: + browser.quit() + return None + return browser class LoginTestCase(unittest.TestCase): - def setUp(self): - self.browser = get_logged_instance() - self.assertIsNotNone(self.browser) - self.addCleanup(self.browser.quit) + def setUp(self): + self.browser = get_logged_instance() + self.assertIsNotNone(self.browser) + self.addCleanup(self.browser.quit) - def test_login(self): - self.assertRegexpMatches(self.browser.current_url, HOME_URL) + def test_login(self): + self.assertRegexpMatches(self.browser.current_url, HOME_URL) - def test_logout(self): - myinfo_bar = self.browser.find_element_by_css_selector('#my-info') - logout_input = self.browser.find_element_by_css_selector('a#logout') - myinfo_bar.click() - logout_input.click() - self.assertRegexpMatches(self.browser.current_url, LOGOUT_URL) + def test_logout(self): + myinfo_bar = self.browser.find_element_by_css_selector('#my-info') + logout_input = self.browser.find_element_by_css_selector('a#logout') + myinfo_bar.click() + logout_input.click() + self.assertRegexpMatches(self.browser.current_url, LOGOUT_URL)