1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-24 21:07:17 +00:00

Merge pull request #284 from haiwen/ci

Added seahub api functional tests
This commit is contained in:
lins05
2014-09-22 16:33:11 +08:00
23 changed files with 1004 additions and 1 deletions

4
.gitignore vendored
View File

@@ -29,4 +29,6 @@ notification_email.sh
send_user_notifications.sh
shutdown.sh
cscope*
docs/_build/*
docs/_build/*
deps
ghostdriver.log

16
.travis.yml Normal file
View File

@@ -0,0 +1,16 @@
language: python
before_install:
# build/init/start ccnet-server/seafile-server
- git clone --depth=1 --branch=master git://github.com/haiwen/seafile-test-deploy /tmp/seafile-test-deploy
- cd /tmp/seafile-test-deploy && ./bootstrap.sh && cd -
# install phantomjs
- ./tests/install-deps.sh
script:
- ./tests/seahubtests.sh init && ./tests/seahubtests.sh runserver && ./tests/seahubtests.sh test
env:
- CCNET_CONF_DIR=/tmp/ccnet SEAFILE_CONF_DIR=/tmp/seafile-data
install:
- pip install -r requirements.txt --allow-all-external --allow-unverified Djblets --allow-unverified PIL
- pip install -r test-requirements.txt
notifications:
email: false

View File

@@ -1,3 +1,5 @@
[![Build Status](https://secure.travis-ci.org/haiwen/seahub.svg?branch=master)](http://travis-ci.org/haiwen/seahub)
Introduction
==========

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
python-dateutil
chardet
six
Image
Django==1.5.8
Djblets==0.6.14

3
test-requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
selenium==2.42.1
requests==2.3.0
nose

0
tests/__init__.py Normal file
View File

0
tests/api/__init__.py Normal file
View File

235
tests/api/apitestbase.py Normal file
View File

@@ -0,0 +1,235 @@
#coding: UTF-8
import requests
import unittest
from contextlib import contextmanager
from nose.tools import assert_equal, assert_in # pylint: disable=E0611
from urllib import quote
from tests.common.common import USERNAME, PASSWORD, \
ADMIN_USERNAME, ADMIN_PASSWORD
from tests.common.utils import apiurl, urljoin, randstring
from tests.api.urls import TOKEN_URL, GROUPS_URL, ACCOUNTS_URL, REPOS_URL
class ApiTestBase(unittest.TestCase):
_token = None
_admin_token = None
username = USERNAME
password = PASSWORD
admin_username = ADMIN_USERNAME
admin_password = ADMIN_PASSWORD
@classmethod
def get(cls, *args, **kwargs):
return cls._req('GET', *args, **kwargs)
@classmethod
def post(cls, *args, **kwargs):
return cls._req('POST', *args, **kwargs)
@classmethod
def put(cls, *args, **kwargs):
return cls._req('PUT', *args, **kwargs)
@classmethod
def delete(cls, *args, **kwargs):
return cls._req('DELETE', *args, **kwargs)
@classmethod
def admin_get(cls, *args, **kwargs):
kwargs['admin'] = True
return cls.get(*args, **kwargs)
@classmethod
def admin_post(cls, *args, **kwargs):
kwargs['admin'] = True
return cls.post(*args, **kwargs)
@classmethod
def admin_put(cls, *args, **kwargs):
kwargs['admin'] = True
return cls.put(*args, **kwargs)
@classmethod
def admin_delete(cls, *args, **kwargs):
kwargs['admin'] = True
return cls.delete(*args, **kwargs)
@classmethod
def _req(cls, method, *args, **kwargs):
admin = kwargs.pop('admin', False)
if admin:
if cls._admin_token is None:
cls._admin_token = get_auth_token(ADMIN_USERNAME,
ADMIN_PASSWORD)
token = cls._admin_token
else:
if cls._token is None:
cls._token = get_auth_token(USERNAME, PASSWORD)
token = cls._token
headers = kwargs.get('headers', {})
headers.setdefault('Authorization', 'Token ' + token)
kwargs['headers'] = headers
expected = kwargs.pop('expected', 200)
resp = requests.request(method, *args, **kwargs)
if expected is not None:
if hasattr(expected, '__iter__'):
assert_in(resp.status_code, expected,
"Expected http status in %s, received %s" % (expected,
resp.status_code))
else:
assert_equal(resp.status_code, expected,
"Expected http status %s, received %s" % (expected,
resp.status_code))
return resp
def assertHasLen(self, lst, length):
"""
Assert a list/tuple/string has exact `length`
"""
msg = 'Expected to have length %s, but length is %s' \
% (length, len(lst))
self.assertEqual(len(lst), length, msg)
def assertNotEmpty(self, lst):
"""
Assert a list/tuple/string is not empty
"""
msg = 'Expected not empty, but it is'
self.assertGreater(len(lst), 0, msg)
@contextmanager
def get_tmp_repo(self):
"""
Context manager to create a tmp repo, and automatically delete it after use
with self.tmp_repo() as repo:
self.get(repo.file_url + '?p=/')
"""
repo = self.create_repo()
try:
yield repo
finally:
self.remove_repo(repo.repo_id)
@contextmanager
def get_tmp_group(self):
"""
Context manager to create a tmp group, and automatically delete it after use
with self.tmp_repo() as repo:
self.get(repo.file_url + '?p=/')
"""
group = self.create_group()
try:
yield group
finally:
self.remove_group(group.group_id)
@contextmanager
def get_tmp_user(self):
"""
Context manager to create a tmp user, and automatically delete it after use
with self.tmp_repo() as repo:
self.get(repo.file_url + '?p=/')
"""
user = self.create_user()
try:
yield user
finally:
self.remove_user(user.user_name)
def create_repo(self):
repo_name = '测试-test-repo-%s' % randstring(6)
data = {
'name': repo_name,
'desc': 'just for test - 测试用资料库',
}
repo = self.post(REPOS_URL, data=data).json()
repo_id = repo['repo_id']
return _Repo(repo_id)
def remove_repo(self, repo_id):
repo_url = urljoin(REPOS_URL, repo_id)
self.delete(repo_url)
def create_group(self):
group_name = '测试群组-%s' % randstring(16)
data = {'group_name': group_name}
group_id = self.put(GROUPS_URL, data=data).json()['group_id']
return _Group(group_name, group_id)
def remove_group(self, group_id):
group_url = urljoin(GROUPS_URL, str(group_id))
self.delete(group_url)
def create_user(self):
username = '%s@test.com' % randstring(20)
password = randstring(20)
data = {'password': password}
self.admin_put(urljoin(ACCOUNTS_URL, username), data=data, expected=201)
return _User(username, password)
def remove_user(self, username):
user_url = urljoin(ACCOUNTS_URL, username)
self.admin_delete(user_url)
def create_file(self, repo, fname=None):
fname = fname or ('文件 %s.txt' % randstring())
furl = repo.get_filepath_url('/' + fname)
data = {'operation': 'create'}
res = self.post(furl, data=data, expected=201)
self.assertEqual(res.text, '"success"')
return fname, furl
def create_dir(self, repo):
data = {'operation': 'mkdir'}
dpath = '/目录 %s' % randstring()
durl = repo.get_dirpath_url(dpath)
res = self.post(durl, data=data, expected=201)
self.assertEqual(res.text, u'"success"')
return dpath, durl
def get_auth_token(username, password):
data = {
'username': username,
'password': password,
}
res = requests.post(TOKEN_URL, data=data)
assert_equal(res.status_code, 200)
token = res.json()['token']
assert_equal(len(token), 40)
return token
class _Repo(object):
def __init__(self, repo_id):
self.repo_id = repo_id
self.repo_url = urljoin(REPOS_URL, self.repo_id)
self.file_url = urljoin(self.repo_url, 'file')
self.dir_url = urljoin(self.repo_url, 'dir')
def get_filepath_url(self, path):
query = '?p=%s' % quote(path)
return self.file_url + query
def get_dirpath_url(self, path):
query = '?p=%s' % quote(path)
return self.dir_url + query
class _Group(object):
def __init__(self, group_name, group_id):
self.group_name = group_name
self.group_id = group_id
self.group_url = urljoin(GROUPS_URL, str(self.group_id))
class _User(object):
def __init__(self, username, password):
self.user_name = username
self.password = password
self.user_url = urljoin(ACCOUNTS_URL, username)

View File

@@ -0,0 +1,53 @@
import requests
import unittest
from tests.common.utils import apiurl, urljoin, randstring
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import ACCOUNTS_URL, ACCOUNT_INFO_URL, PING_URL, \
AUTH_PING_URL
test_account_username = 'test_%s@test.com' % randstring(10)
test_account_password = randstring(20)
test_account_password2 = randstring(20)
test_account_url = urljoin(ACCOUNTS_URL, test_account_username)
class AccountsApiTest(ApiTestBase):
def test_check_account_info(self):
info = self.get(ACCOUNT_INFO_URL).json()
self.assertIsNotNone(info)
self.assertEqual(info['email'], self.username)
self.assertIsNotNone(info['total'])
self.assertIsNotNone(info['usage'])
def test_list_accounts(self):
# Normal user can not list accounts
self.get(ACCOUNTS_URL, expected=403)
accounts = self.admin_get(ACCOUNTS_URL).json()
self.assertGreaterEqual(accounts, 2)
# TODO: check returned json, test start/limit param
def test_create_delete_account(self):
data = {'password': test_account_password}
# non-admin user can not create new user
self.put(test_account_url, data=data, expected=403)
res = self.admin_put(test_account_url, data=data, expected=201)
self.assertEqual(res.text, u'"success"')
# non-admin user can not delete a user
self.delete(test_account_url, expected=403)
self.admin_delete(test_account_url)
# check the user is really deleted
self.admin_get(test_account_url, expected=404)
def test_auth_ping(self):
res = self.get(AUTH_PING_URL)
self.assertRegexpMatches(res.text, u'"pong"')
res = requests.get(AUTH_PING_URL)
self.assertEqual(res.status_code, 403)
def test_ping(self):
res = requests.get(PING_URL)
self.assertRegexpMatches(res.text, u'"pong"')
self.assertEqual(res.status_code, 200)

25
tests/api/test_avatar.py Normal file
View File

@@ -0,0 +1,25 @@
import unittest
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import AVATAR_BASE_URL, GROUPS_URL
from tests.common.utils import randstring, apiurl, urljoin
class AvatarApiTest(ApiTestBase):
def test_user_avatar(self):
avatar_url = urljoin(AVATAR_BASE_URL, 'user', self.username, '/resized/80/')
info = self.get(avatar_url).json()
self.assertIsNotNone(info['url'])
self.assertIsNotNone(info['is_default'])
self.assertIsNotNone(info['mtime'])
def test_group_avatar(self):
gname = randstring(16)
data = {'group_name': gname}
res = self.put(GROUPS_URL, data=data)
gid = res.json()['group_id']
avatar_url = urljoin(AVATAR_BASE_URL, 'group', str(gid), '/resized/80/')
info = self.get(avatar_url).json()
self.assertIsNotNone(info)
self.assertIsNotNone(info['url'])
self.assertIsNotNone(info['is_default'])
self.assertIsNotNone(info['mtime'])

207
tests/api/test_files.py Normal file
View File

@@ -0,0 +1,207 @@
#coding: UTF-8
"""
Test file/dir operations.
"""
import random
import re
from urllib import urlencode, quote
from tests.common.utils import randstring, urljoin
from tests.api.urls import DEFAULT_REPO_URL, REPOS_URL
from tests.api.apitestbase import ApiTestBase, USERNAME
class FilesApiTest(ApiTestBase):
def test_rename_file(self):
with self.get_tmp_repo() as repo:
name, furl = self.create_file(repo)
data = {
'operation': 'rename',
'newname': name + randstring(),
}
res = self.post(furl, data=data)
self.assertRegexpMatches(res.text, r'"http(.*)"')
def test_remove_file(self):
with self.get_tmp_repo() as repo:
_, furl = self.create_file(repo)
res = self.delete(furl)
self.assertEqual(res.text, '"success"')
def test_move_file(self):
with self.get_tmp_repo() as repo:
_, furl = self.create_file(repo)
# TODO: create another repo here, and use it as dst_repo
data = {
'operation': 'move',
'dst_repo': repo.repo_id,
'dst_dir': '/',
}
res = self.post(furl, data=data)
self.assertEqual(res.text, '"success"')
def test_copy_file(self):
with self.get_tmp_repo() as repo:
fname, _ = self.create_file(repo)
# TODO: create another repo here, and use it as dst_repo
dpath, _ = self.create_dir(repo)
fopurl = urljoin(repo.repo_url, 'fileops/copy/') + '?p=/'
data = {
'file_names': fname,
'dst_repo': repo.repo_id,
'dst_dir': dpath,
}
res = self.post(fopurl, data=data)
self.assertEqual(res.text, '"success"')
def test_download_file(self):
with self.get_tmp_repo() as repo:
fname, furl = self.create_file(repo)
res = self.get(furl)
self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname))
def test_download_file_from_history(self):
with self.get_tmp_repo() as repo:
fname, _ = self.create_file(repo)
file_history_url = urljoin(repo.repo_url, 'history/') + \
'?p=/%s' % quote(fname)
res = self.get(file_history_url).json()
commit_id = res['commits'][0]['id']
self.assertEqual(len(commit_id), 40)
data = {
'p': fname,
'commit_id': commit_id,
}
query = '?' + urlencode(data)
res = self.get(repo.file_url + query)
self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname))
def test_get_file_detail(self):
with self.get_tmp_repo() as repo:
fname, _ = self.create_file(repo)
fdurl = repo.file_url + u'detail/?p=/%s' % quote(fname)
detail = self.get(fdurl).json()
self.assertIsNotNone(detail)
self.assertIsNotNone(detail['id'])
self.assertIsNotNone(detail['mtime'])
self.assertIsNotNone(detail['type'])
self.assertIsNotNone(detail['name'])
self.assertIsNotNone(detail['size'])
def test_get_file_history(self):
with self.get_tmp_repo() as repo:
fname, _ = self.create_file(repo)
fhurl = repo.file_url + u'history/?p=%s' % quote(fname)
history = self.get(fhurl).json()
for commit in history['commits']:
self.assertIsNotNone(commit['rev_file_size'])
#self.assertIsNotNone(commit['rev_file_id']) #allow null
self.assertIsNotNone(commit['ctime'])
self.assertIsNotNone(commit['creator_name'])
self.assertIsNotNone(commit['creator'])
self.assertIsNotNone(commit['root_id'])
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
#self.assertIsNotNone(commit['parent_id']) #allow null
self.assertIsNotNone(commit['new_merge'])
self.assertIsNotNone(commit['repo_id'])
self.assertIsNotNone(commit['desc'])
self.assertIsNotNone(commit['id'])
self.assertIsNotNone(commit['conflict'])
#self.assertIsNotNone(commit['second_parent_id']) #allow null
def test_get_upload_link(self):
with self.get_tmp_repo() as repo:
upload_url = urljoin(repo.repo_url, 'upload-link')
res = self.get(upload_url)
self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"')
def test_get_update_link(self):
with self.get_tmp_repo() as repo:
update_url = urljoin(repo.repo_url, 'update-link')
res = self.get(update_url)
self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"')
# def test_upload_file(self):
# # XXX: requests has problems when post a file whose name contains
# # non-ascii data
# fname = 'file-upload-test %s.txt' % randstring()
# furl = self.test_file_url + '?p=/%s' % quote(fname)
# self.delete(furl)
# upload_url = self.test_repo_url + u'upload-link/'
# res = self.get(upload_url)
# upload_api_url = re.match(r'"(.*)"', res.text).group(1)
# files = {
# 'file': (fname, 'Some lines in this file'),
# 'parent_dir': '/',
# }
# res = self.post(upload_api_url, files=files)
# self.assertRegexpMatches(res.text, r'\w{40,40}')
# def test_update_file(self):
# fname = 'file-update-test %s.txt' % randstring()
# _, furl = self.create_file(fname=fname)
# update_url = self.test_repo_url + u'update-link/'
# res = self.get(update_url)
# update_api_url = re.match(r'"(.*)"', res.text).group(1)
# files = {
# 'file': ('filename', 'Updated content of this file'),
# 'target_file': '/test_update.c'
# }
# res = self.post(update_api_url, files=files)
# self.assertRegexpMatches(res.text, r'\w{40,40}')
def test_get_upload_blocks_link(self):
with self.get_tmp_repo() as repo:
upload_blks_url = urljoin(repo.repo_url, 'upload-blks-link')
res = self.get(upload_blks_url)
self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"')
def test_get_update_blocks_link(self):
with self.get_tmp_repo() as repo:
update_blks_url = urljoin(repo.repo_url, 'update-blks-link')
res = self.get(update_blks_url)
self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"')
def test_list_dir(self):
with self.get_tmp_repo() as repo:
self.create_file(repo)
self.create_dir(repo)
dirents = self.get(repo.dir_url).json()
self.assertHasLen(dirents, 2)
for dirent in dirents:
self.assertIsNotNone(dirent['id'])
self.assertIsNotNone(dirent['name'])
self.assertIn(dirent['type'], ('file', 'dir'))
if dirent['type'] == 'file':
self.assertIsNotNone(dirent['size'])
def test_remove_dir(self):
with self.get_tmp_repo() as repo:
_, durl = self.create_dir(repo)
res = self.delete(durl)
self.assertEqual(res.text, u'"success"')
self.get(durl, expected=404)
def test_download_dir(self):
with self.get_tmp_repo() as repo:
dpath, _ = self.create_dir(repo)
query = '?p=%s' % quote(dpath)
ddurl = urljoin(repo.dir_url, 'download') + query
res = self.get(ddurl)
self.assertRegexpMatches(res.text,
r'"http(.*)/files/\w{8,8}/%s"' % quote(dpath[1:]))
def test_share_dir(self):
with self.get_tmp_repo() as repo:
dpath, _ = self.create_dir(repo)
query = '?p=%s' % quote(dpath)
share_dir_url = urljoin(repo.dir_url, 'share/') + query
with self.get_tmp_user() as user:
data = {
'emails': user.user_name,
's_type': 'd',
'path': '/',
'perm': 'r'
}
res = self.post(share_dir_url, data=data)
self.assertEqual(res.text, u'{}')

48
tests/api/test_groups.py Normal file
View File

@@ -0,0 +1,48 @@
#coding: UTF-8
"""
Test groups api.
"""
import unittest
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import GROUPS_URL
from tests.common.utils import apiurl, urljoin, randstring
class GroupsApiTest(ApiTestBase):
def test_add_remove_group_member(self):
with self.get_tmp_user() as user:
with self.get_tmp_group() as group:
test_group_members_url = urljoin(group.group_url, '/members/')
data = {'user_name': user.user_name}
res = self.put(test_group_members_url, data=data).json()
self.assertTrue(res['success'])
res = self.delete(test_group_members_url, data=data).json()
self.assertTrue(res['success'])
def test_list_groups(self):
with self.get_tmp_group() as group:
groups = self.get(GROUPS_URL).json()
self.assertGreaterEqual(groups['replynum'], 0)
self.assertNotEmpty(groups['groups'])
for group in groups['groups']:
self.assertIsNotNone(group['ctime'])
self.assertIsNotNone(group['creator'])
self.assertIsNotNone(group['msgnum'])
self.assertIsNotNone(group['mtime'])
self.assertIsNotNone(group['id'])
self.assertIsNotNone(group['name'])
def test_add_remove_group(self):
data = {'group_name': randstring(16)}
info = self.put(GROUPS_URL, data=data).json()
self.assertTrue(info['success'])
group_id = info['group_id']
self.assertGreater(group_id, 0)
url = urljoin(GROUPS_URL, str(group_id))
self.delete(url)
# check group is really removed
groups = self.get(GROUPS_URL).json()['groups']
for group in groups:
self.assertNotEqual(group['id'], group_id)

14
tests/api/test_misc.py Normal file
View File

@@ -0,0 +1,14 @@
import unittest
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import LIST_GROUP_AND_CONTACTS_URL
class MiscApiTest(ApiTestBase):
def test_list_group_and_contacts(self):
res = self.get(LIST_GROUP_AND_CONTACTS_URL).json()
self.assertIsNotNone(res)
self.assertIsInstance(res['contacts'], list)
self.assertIsNotNone(res['umsgnum'])
self.assertIsNotNone(res['replynum'])
self.assertIsInstance(res['groups'], list)
self.assertIsNotNone(res['gmsgnum'])
self.assertIsNotNone(res['newreplies'])

158
tests/api/test_repos.py Normal file
View File

@@ -0,0 +1,158 @@
#coding: UTF-8
"""
Test repos api.
"""
import unittest
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import REPOS_URL, DEFAULT_REPO_URL, VIRTUAL_REPOS_URL
from tests.common.utils import apiurl, urljoin, randstring
# TODO: all tests should be run on an encrypted repo
class ReposApiTest(ApiTestBase):
def test_get_default_repo(self):
repo = self.get(DEFAULT_REPO_URL).json()
self.assertIsNotNone(repo['exists'])
def test_create_default_repo(self):
repo = self.post(DEFAULT_REPO_URL).json()
self.assertEqual(len(repo['repo_id']), 36)
self.assertEqual(repo['exists'], True)
def test_list_repos(self):
repos = self.get(REPOS_URL).json()
self.assertHasLen(repos, 1)
for repo in repos:
self.assertIsNotNone(repo['permission'])
self.assertIsNotNone(repo['encrypted'])
self.assertIsNotNone(repo['mtime'])
self.assertIsNotNone(repo['owner'])
self.assertIsNotNone(repo['id'])
self.assertIsNotNone(repo['size'])
self.assertIsNotNone(repo['name'])
self.assertIsNotNone(repo['type'])
# self.assertIsNotNone(repo['virtual']) #allow null for pub-repo
self.assertIsNotNone(repo['desc'])
self.assertIsNotNone(repo['root'])
def test_get_repo_info(self):
with self.get_tmp_repo() as repo:
rinfo = self.get(repo.repo_url).json()
self.assertFalse(rinfo['encrypted'])
self.assertIsNotNone(rinfo['mtime'])
self.assertIsNotNone(rinfo['owner'])
self.assertIsNotNone(rinfo['id'])
self.assertIsNotNone(rinfo['size'])
self.assertIsNotNone(rinfo['name'])
self.assertIsNotNone(rinfo['root'])
self.assertIsNotNone(rinfo['desc'])
self.assertIsNotNone(rinfo['type'])
# elf.assertIsNotNone(rinfo['password_need']) # allow null here
def test_get_repo_owner(self):
with self.get_tmp_repo() as repo:
repo_owner_url = urljoin(repo.repo_url, '/owner/')
# XXX: why only admin can get the owner of a repo?
info = self.admin_get(repo_owner_url).json()
self.assertEqual(info['owner'], self.username)
def test_get_repo_history(self):
with self.get_tmp_repo() as repo:
self.create_file(repo)
self.create_dir(repo)
repo_history_url = urljoin(repo.repo_url, '/history/')
history = self.get(repo_history_url).json()
commits = history['commits']
self.assertHasLen(commits, 3)
for commit in commits:
self.assertIsNotNone(commit['rev_file_size'])
#self.assertIsNotNone(commit['rev_file_id']) #allow null
self.assertIsNotNone(commit['ctime'])
self.assertIsNotNone(commit['creator_name'])
self.assertIsNotNone(commit['creator'])
self.assertIsNotNone(commit['root_id'])
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
#self.assertIsNotNone(commit['parent_id']) #allow null
self.assertIsNotNone(commit['new_merge'])
self.assertIsNotNone(commit['repo_id'])
self.assertIsNotNone(commit['desc'])
self.assertIsNotNone(commit['id'])
self.assertIsNotNone(commit['conflict'])
#self.assertIsNotNone(commit['second_parent_id']) #allow null
def test_create_repo(self):
data = {'name': 'test'}
res = self.post(REPOS_URL, data=data)
repo = res.json()
repo_id = repo['repo_id']
try:
self.assertIsNotNone(repo['encrypted'])
self.assertIsNotNone(repo['enc_version'])
self.assertIsNotNone(repo['repo_id'])
self.assertIsNotNone(repo['magic'])
self.assertIsNotNone(repo['relay_id'])
self.assertIsNotNone(repo['repo_version'])
self.assertIsNotNone(repo['relay_addr'])
self.assertIsNotNone(repo['token'])
self.assertIsNotNone(repo['relay_port'])
self.assertIsNotNone(repo['random_key'])
self.assertIsNotNone(repo['email'])
self.assertIsNotNone(repo['repo_name'])
finally:
self.remove_repo(repo_id)
# Check the repo is really removed
self.get(urljoin(REPOS_URL, repo_id), expected=404)
def test_check_or_create_sub_repo(self):
# TODO: create a sub folder and use it as a sub repo
pass
def test_fetch_repo_download_info(self):
with self.get_tmp_repo() as repo:
download_info_repo_url = urljoin(repo.repo_url, '/download-info/')
info = self.get(download_info_repo_url).json()
self.assertIsNotNone(info['relay_addr'])
self.assertIsNotNone(info['token'])
self.assertIsNotNone(info['repo_id'])
self.assertIsNotNone(info['relay_port'])
self.assertIsNotNone(info['encrypted'])
self.assertIsNotNone(info['repo_name'])
self.assertIsNotNone(info['relay_id'])
self.assertIsNotNone(info['email'])
def test_list_virtual_repos(self):
# TODO: we need to create at least on virtual repo first
vrepos = self.get(VIRTUAL_REPOS_URL).json()['virtual-repos']
for repo in vrepos:
self.assertIsNotNone(repo['virtual_perm'])
#self.assertIsNotNone(repo['store_id'])
self.assertIsNotNone(repo['worktree_invalid'])
self.assertIsNotNone(repo['encrypted'])
self.assertIsNotNone(repo['origin_repo_name'])
self.assertIsNotNone(repo['last_modify'])
self.assertIsNotNone(repo['no_local_history'])
#self.assertIsNotNone(repo['head_branch'])
self.assertIsNotNone(repo['last_sync_time'])
self.assertIsNotNone(repo['id'])
self.assertIsNotNone(repo['size'])
#self.assertIsNotNone(repo['share_permission'])
self.assertIsNotNone(repo['worktree_changed'])
self.assertIsNotNone(repo['worktree_checktime'])
self.assertIsNotNone(repo['origin_path'])
self.assertEqual(repo['is_virtual'], True)
self.assertIsNotNone(repo['origin_repo_id'])
self.assertIsNotNone(repo['version'])
#self.assertIsNotNone(repo['random_key'])
self.assertIsNotNone(repo['is_original_owner'])
#self.assertIsNotNone(repo['shared_email'])
self.assertIsNotNone(repo['enc_version'])
self.assertIsNotNone(repo['head_cmmt_id'])
#self.assertIsNotNone(repo['desc'])
self.assertIsNotNone(repo['index_corrupted'])
#self.assertIsNotNone(repo['magic'])
self.assertIsNotNone(repo['name'])
#self.assertIsNotNone(repo['worktree'])
self.assertIsNotNone(repo['auto_sync'])
#self.assertIsNotNone(repo['relay_id'])

30
tests/api/test_shares.py Normal file
View File

@@ -0,0 +1,30 @@
#coding: UTF-8
from tests.common.utils import urljoin
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \
BESHARED_LIBRARIES_URL, SHARED_FILES_URL, F_URL, S_F_URL
class SharesApiTest(ApiTestBase):
def test_create_file_shared_link(self):
with self.get_tmp_repo() as repo:
fname, _ = self.create_file(repo)
fsurl = urljoin(repo.file_url, 'shared-link')
data = {
'type': 'f',
'p': '/' + fname,
}
res = self.put(fsurl, data=data, expected=201)
self.assertRegexpMatches(res.headers['Location'], \
r'http(.*)/f/(\w{10,10})/')
res = self.get(SHARED_LINKS_URL).json()
self.assertNotEmpty(res)
for fileshare in res['fileshares']:
self.assertIsNotNone(fileshare['username'])
self.assertIsNotNone(fileshare['repo_id'])
#self.assertIsNotNone(fileshare['ctime'])
self.assertIsNotNone(fileshare['s_type'])
self.assertIsNotNone(fileshare['token'])
self.assertIsNotNone(fileshare['view_cnt'])
self.assertIsNotNone(fileshare['path'])

32
tests/api/urls.py Normal file
View File

@@ -0,0 +1,32 @@
from tests.common.common import USERNAME
from tests.common.utils import apiurl
PING_URL = apiurl('/api2/ping/')
TOKEN_URL = apiurl('/api2/auth-token/')
AUTH_PING_URL = apiurl('/api2/auth/ping/')
ACCOUNTS_URL = apiurl('/api2/accounts/')
ACCOUNT_INFO_URL = apiurl('/api2/account/info/')
AVATAR_BASE_URL = apiurl(u'/api2/avatars/')
REPOS_URL = apiurl('/api2/repos/')
DEFAULT_REPO_URL = apiurl('/api2/default-repo/')
VIRTUAL_REPOS_URL = apiurl('/api2/virtual-repos/')
GROUPS_URL = apiurl(u'/api2/groups/')
USERMSGS_URL = apiurl('/api2/user/msgs/', USERNAME)
USERMSGS_COUNT_URL = apiurl('/api2/unseen_messages/')
GROUPMSGS_URL = apiurl('/api2/group/msgs/')
GROUPMSGS_NREPLY_URL = apiurl('/api2/new_replies/')
STARREDFILES_URL = apiurl('/api2/starredfiles/')
SHARED_LINKS_URL = apiurl('/api2/shared-links/')
SHARED_LIBRARIES_URL = apiurl('/api2/shared-repos/')
BESHARED_LIBRARIES_URL = apiurl('/api2/beshared-repos/')
SHARED_FILES_URL = apiurl('/api2/shared-files/')
F_URL = apiurl('/api2/f/')
S_F_URL = apiurl('/api2/s/f/')
LIST_GROUP_AND_CONTACTS_URL = apiurl('/api2/groupandcontacts/')

0
tests/common/__init__.py Normal file
View File

12
tests/common/common.py Normal file
View File

@@ -0,0 +1,12 @@
import os
BASE_URL = os.getenv('SEAHUB_TEST_BASE_URL', u'http://127.0.0.1:8000')
USERNAME = os.getenv('SEAHUB_TEST_USERNAME', u'test@seafiletest.com')
PASSWORD = os.getenv('SEAHUB_TEST_PASSWORD', u'testtest')
ADMIN_USERNAME = os.getenv('SEAHUB_TEST_ADMIN_USERNAME', u'admin@seafiletest.com')
ADMIN_PASSWORD = os.getenv('SEAHUB_TEST_ADMIN_PASSWORD', u'adminadmin')
if os.getenv('SEAHUB_TEST_IS_PRO', u'') == u'':
IS_PRO = False
else:
S_PRO = True

21
tests/common/utils.py Normal file
View File

@@ -0,0 +1,21 @@
import string
import random
from .common import BASE_URL
def randstring(length=0):
if length == 0:
length = random.randint(1, 30)
return ''.join(random.choice(string.lowercase) for i in range(length))
def urljoin(base, *args):
url = base
if url[-1] != '/':
url += '/'
for arg in args:
arg = arg.strip('/')
url += arg + '/'
return url
def apiurl(*parts):
return urljoin(BASE_URL, *parts)

15
tests/install-deps.sh Executable file
View File

@@ -0,0 +1,15 @@
#!/bin/bash
set -e
set -x
SCRIPT=$(readlink -f "$0")
SEAHUB_TESTSDIR=$(dirname "${SCRIPT}")
SEAHUB_SRCDIR=$(dirname "${SEAHUB_TESTSDIR}")
cd "$SEAHUB_SRCDIR"
# install phantomjs
curl -L -o /tmp/phantomjs.tar.bz2 https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-1.9.7-linux-x86_64.tar.bz2
tar -C /tmp -xf /tmp/phantomjs.tar.bz2
sudo install -m 755 /tmp/phantomjs-1.9.7-linux-x86_64/bin/phantomjs /usr/bin/phantomjs

86
tests/seahubtests.sh Executable file
View File

@@ -0,0 +1,86 @@
#!/bin/bash
: ${PYTHON=python}
: ${SEAHUB_TEST_USERNAME="test@seafiletest.com"}
: ${SEAHUB_TEST_PASSWORD="testtest"}
: ${SEAHUB_TEST_ADMIN_USERNAME="admin@seafiletest.com"}
: ${SEAHUB_TEST_ADMIN_PASSWORD="adminadmin"}
export SEAHUB_TEST_USERNAME
export SEAHUB_TEST_PASSWORD
export SEAHUB_TEST_ADMIN_USERNAME
export SEAHUB_TEST_ADMIN_PASSWORD
# If you run this script on your local machine, you must set CCNET_CONF_DIR
# and SEAFILE_CONF_DIR like this:
#
# export CCNET_CONF_DIR=/your/path/to/ccnet
# export SEAFILE_CONF_DIR=/your/path/to/seafile-data
#
set -e
if [[ ${TRAVIS} != "" ]]; then
set -x
fi
SCRIPT=$(readlink -f "$0")
SEAHUB_TESTSDIR=$(dirname "${SCRIPT}")
SEAHUB_SRCDIR=$(dirname "${SEAHUB_TESTSDIR}")
local_settings_py=${SEAHUB_SRCDIR}/seahub/local_settings.py
export PYTHONPATH="/usr/local/lib/python2.7/site-packages:/usr/lib/python2.7/site-packages:${SEAHUB_SRCDIR}/thirdpart:${PYTHONPATH}"
cd "$SEAHUB_SRCDIR"
function init() {
###############################
# create database and two new users: an admin, and a normal user
###############################
$PYTHON ./manage.py syncdb
# create normal user
$PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${SEAHUB_TEST_USERNAME}', '${SEAHUB_TEST_PASSWORD}', 0, 1);"
# create admin
$PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${SEAHUB_TEST_ADMIN_USERNAME}', '${SEAHUB_TEST_ADMIN_PASSWORD}', 1, 1);"
# overwrite api throttling settings in settings.py
echo "REST_FRAMEWORK = {}" >> "${local_settings_py}"
}
function start_seahub() {
$PYTHON ./manage.py runserver 1>/tmp/seahub.access.log 2>&1 &
sleep 5
}
function run_tests() {
set +e
cd tests
nosetests $nose_opts
rvalue=$?
cd -
if [[ ${TRAVIS} != "" ]]; then
# On travis-ci, dump seahub logs when test finished
for logfile in /tmp/seahub*.log; do
echo -e "\nLog file $logfile:\n"
cat "${logfile}"
echo
done
fi
exit $rvalue
}
case $1 in
"init")
init
;;
"runserver")
start_seahub
;;
"test")
shift
nose_opts=$*
run_tests
;;
*)
echo "unknow command \"$1\""
;;
esac

0
tests/ui/__init__.py Normal file
View File

38
tests/ui/test_login.py Normal file
View File

@@ -0,0 +1,38 @@
import unittest
from tests.common.common import BASE_URL, USERNAME, PASSWORD
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
LOGIN_URL = BASE_URL + u'/accounts/login/'
HOME_URL = BASE_URL + u'/home/my/'
LOGOUT_URL = BASE_URL + u'/accounts/logout/'
def get_logged_instance():
browser = webdriver.PhantomJS()
browser.get(LOGIN_URL)
username_input = browser.find_element_by_name('username')
password_input = browser.find_element_by_name('password')
username_input.send_keys(USERNAME)
password_input.send_keys(PASSWORD)
password_input.send_keys(Keys.RETURN)
if browser.current_url != HOME_URL:
browser.quit()
return None
return browser
class LoginTestCase(unittest.TestCase):
def setUp(self):
self.browser = get_logged_instance()
self.assertIsNotNone(self.browser)
self.addCleanup(self.browser.quit)
def test_login(self):
self.assertRegexpMatches(self.browser.current_url, HOME_URL)
def test_logout(self):
myinfo_bar = self.browser.find_element_by_css_selector('#my-info')
logout_input = self.browser.find_element_by_css_selector('a#logout')
myinfo_bar.click()
logout_input.click()
self.assertRegexpMatches(self.browser.current_url, LOGOUT_URL)