mirror of
https://github.com/haiwen/seahub.git
synced 2025-08-10 19:32:25 +00:00
improved tests
This commit is contained in:
parent
2687bdbea4
commit
f65c783fb6
@ -1,14 +0,0 @@
|
|||||||
# Copyright 2014 seahub authors
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
from common import common
|
|
@ -1,11 +1,12 @@
|
|||||||
#coding: UTF-8
|
#coding: UTF-8
|
||||||
|
|
||||||
import re
|
|
||||||
import requests
|
import requests
|
||||||
import unittest
|
import unittest
|
||||||
|
from contextlib import contextmanager
|
||||||
from nose.tools import assert_equal, assert_in # pylint: disable=E0611
|
from nose.tools import assert_equal, assert_in # pylint: disable=E0611
|
||||||
|
from urllib import quote
|
||||||
|
|
||||||
from tests.common.common import USERNAME, PASSWORD, IS_PRO, \
|
from tests.common.common import USERNAME, PASSWORD, \
|
||||||
ADMIN_USERNAME, ADMIN_PASSWORD
|
ADMIN_USERNAME, ADMIN_PASSWORD
|
||||||
|
|
||||||
from tests.common.utils import apiurl, urljoin, randstring
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
@ -14,75 +15,11 @@ from tests.api.urls import TOKEN_URL, GROUPS_URL, ACCOUNTS_URL, REPOS_URL
|
|||||||
class ApiTestBase(unittest.TestCase):
|
class ApiTestBase(unittest.TestCase):
|
||||||
_token = None
|
_token = None
|
||||||
_admin_token = None
|
_admin_token = None
|
||||||
|
|
||||||
use_test_user = False
|
username = USERNAME
|
||||||
use_test_group = False
|
password = PASSWORD
|
||||||
use_test_repo = False
|
admin_username = ADMIN_USERNAME
|
||||||
|
admin_password = ADMIN_PASSWORD
|
||||||
test_user_name = None
|
|
||||||
test_user_url = None
|
|
||||||
|
|
||||||
test_group_name = None
|
|
||||||
test_group_id = None
|
|
||||||
test_group_url = None
|
|
||||||
|
|
||||||
test_repo_id = None
|
|
||||||
test_repo_url = None
|
|
||||||
test_file_url = None
|
|
||||||
test_dir_url = None
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
if self.use_test_user:
|
|
||||||
self.create_tmp_user()
|
|
||||||
if self.use_test_group:
|
|
||||||
self.create_tmp_group()
|
|
||||||
if self.use_test_repo:
|
|
||||||
self.create_tmp_repo()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
if self.use_test_user:
|
|
||||||
self.remove_tmp_user()
|
|
||||||
if self.use_test_group:
|
|
||||||
self.remove_tmp_group()
|
|
||||||
if self.use_test_repo:
|
|
||||||
self.remove_tmp_repo()
|
|
||||||
|
|
||||||
def create_tmp_repo(self):
|
|
||||||
repo_name = '测试-test-repo-%s' % randstring(6)
|
|
||||||
data = {
|
|
||||||
'name': repo_name,
|
|
||||||
'desc': 'just for test - 测试用资料库',
|
|
||||||
}
|
|
||||||
repo = self.post(REPOS_URL, data=data).json()
|
|
||||||
self.test_repo_id = repo['repo_id']
|
|
||||||
self.test_repo_url = urljoin(REPOS_URL, self.test_repo_id)
|
|
||||||
self.test_file_url = urljoin(self.test_repo_url, 'file')
|
|
||||||
self.test_dir_url = urljoin(self.test_repo_url, 'dir')
|
|
||||||
|
|
||||||
def remove_tmp_repo(self):
|
|
||||||
if self.test_repo_id:
|
|
||||||
self.delete(self.test_repo_url)
|
|
||||||
|
|
||||||
def create_tmp_group(self):
|
|
||||||
self.test_group_name = '测试群组-%s' % randstring(16)
|
|
||||||
data = {'group_name': self.test_group_name}
|
|
||||||
self.test_group_id = self.put(GROUPS_URL, data=data).json()['group_id']
|
|
||||||
self.test_group_url = urljoin(GROUPS_URL, str(self.test_group_id))
|
|
||||||
|
|
||||||
def remove_tmp_group(self):
|
|
||||||
if self.test_group_id:
|
|
||||||
self.delete(self.test_group_url)
|
|
||||||
|
|
||||||
def create_tmp_user(self):
|
|
||||||
data = {'password': 'testtest'}
|
|
||||||
username = '%s@test.com' % randstring(20)
|
|
||||||
self.put(urljoin(ACCOUNTS_URL, username), data=data, expected=201)
|
|
||||||
self.test_user_name = username
|
|
||||||
self.test_user_url = urljoin(ACCOUNTS_URL, username)
|
|
||||||
|
|
||||||
def remove_tmp_user(self):
|
|
||||||
if self.test_user_name:
|
|
||||||
self.delete(self.test_user_url)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get(cls, *args, **kwargs):
|
def get(cls, *args, **kwargs):
|
||||||
@ -165,10 +102,134 @@ class ApiTestBase(unittest.TestCase):
|
|||||||
msg = 'Expected not empty, but it is'
|
msg = 'Expected not empty, but it is'
|
||||||
self.assertGreater(len(lst), 0, msg)
|
self.assertGreater(len(lst), 0, msg)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_tmp_repo(self):
|
||||||
|
"""
|
||||||
|
Context manager to create a tmp repo, and automatically delete it after use
|
||||||
|
|
||||||
|
with self.tmp_repo() as repo:
|
||||||
|
self.get(repo.file_url + '?p=/')
|
||||||
|
"""
|
||||||
|
repo = self.create_repo()
|
||||||
|
try:
|
||||||
|
yield repo
|
||||||
|
finally:
|
||||||
|
self.remove_repo(repo.repo_id)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_tmp_group(self):
|
||||||
|
"""
|
||||||
|
Context manager to create a tmp group, and automatically delete it after use
|
||||||
|
|
||||||
|
with self.tmp_repo() as repo:
|
||||||
|
self.get(repo.file_url + '?p=/')
|
||||||
|
"""
|
||||||
|
group = self.create_group()
|
||||||
|
try:
|
||||||
|
yield group
|
||||||
|
finally:
|
||||||
|
self.remove_group(group.group_id)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_tmp_user(self):
|
||||||
|
"""
|
||||||
|
Context manager to create a tmp user, and automatically delete it after use
|
||||||
|
|
||||||
|
with self.tmp_repo() as repo:
|
||||||
|
self.get(repo.file_url + '?p=/')
|
||||||
|
"""
|
||||||
|
user = self.create_user()
|
||||||
|
try:
|
||||||
|
yield user
|
||||||
|
finally:
|
||||||
|
self.remove_user(user.user_name)
|
||||||
|
|
||||||
|
def create_repo(self):
|
||||||
|
repo_name = '测试-test-repo-%s' % randstring(6)
|
||||||
|
data = {
|
||||||
|
'name': repo_name,
|
||||||
|
'desc': 'just for test - 测试用资料库',
|
||||||
|
}
|
||||||
|
repo = self.post(REPOS_URL, data=data).json()
|
||||||
|
repo_id = repo['repo_id']
|
||||||
|
return _Repo(repo_id)
|
||||||
|
|
||||||
|
def remove_repo(self, repo_id):
|
||||||
|
repo_url = urljoin(REPOS_URL, repo_id)
|
||||||
|
self.delete(repo_url)
|
||||||
|
|
||||||
|
def create_group(self):
|
||||||
|
group_name = '测试群组-%s' % randstring(16)
|
||||||
|
data = {'group_name': group_name}
|
||||||
|
group_id = self.put(GROUPS_URL, data=data).json()['group_id']
|
||||||
|
return _Group(group_name, group_id)
|
||||||
|
|
||||||
|
def remove_group(self, group_id):
|
||||||
|
group_url = urljoin(GROUPS_URL, str(group_id))
|
||||||
|
self.delete(group_url)
|
||||||
|
|
||||||
|
def create_user(self):
|
||||||
|
username = '%s@test.com' % randstring(20)
|
||||||
|
password = randstring(20)
|
||||||
|
data = {'password': password}
|
||||||
|
self.admin_put(urljoin(ACCOUNTS_URL, username), data=data, expected=201)
|
||||||
|
return _User(username, password)
|
||||||
|
|
||||||
|
def remove_user(self, username):
|
||||||
|
user_url = urljoin(ACCOUNTS_URL, username)
|
||||||
|
self.admin_delete(user_url)
|
||||||
|
|
||||||
|
def create_file(self, repo, fname=None):
|
||||||
|
fname = fname or ('文件 %s.txt' % randstring())
|
||||||
|
furl = repo.get_filepath_url('/' + fname)
|
||||||
|
data = {'operation': 'create'}
|
||||||
|
res = self.post(furl, data=data, expected=201)
|
||||||
|
self.assertEqual(res.text, '"success"')
|
||||||
|
return fname, furl
|
||||||
|
|
||||||
|
def create_dir(self, repo):
|
||||||
|
data = {'operation': 'mkdir'}
|
||||||
|
dpath = '/目录 %s' % randstring()
|
||||||
|
durl = repo.get_dirpath_url(dpath)
|
||||||
|
res = self.post(durl, data=data, expected=201)
|
||||||
|
self.assertEqual(res.text, u'"success"')
|
||||||
|
return dpath, durl
|
||||||
|
|
||||||
|
|
||||||
def get_auth_token(username, password):
|
def get_auth_token(username, password):
|
||||||
res = requests.post(TOKEN_URL,
|
data = {
|
||||||
data=dict(username=username, password=password))
|
'username': username,
|
||||||
|
'password': password,
|
||||||
|
}
|
||||||
|
res = requests.post(TOKEN_URL, data=data)
|
||||||
assert_equal(res.status_code, 200)
|
assert_equal(res.status_code, 200)
|
||||||
token = res.json()['token']
|
token = res.json()['token']
|
||||||
assert_equal(len(token), 40)
|
assert_equal(len(token), 40)
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
class _Repo(object):
|
||||||
|
def __init__(self, repo_id):
|
||||||
|
self.repo_id = repo_id
|
||||||
|
self.repo_url = urljoin(REPOS_URL, self.repo_id)
|
||||||
|
self.file_url = urljoin(self.repo_url, 'file')
|
||||||
|
self.dir_url = urljoin(self.repo_url, 'dir')
|
||||||
|
|
||||||
|
def get_filepath_url(self, path):
|
||||||
|
query = '?p=%s' % quote(path)
|
||||||
|
return self.file_url + query
|
||||||
|
|
||||||
|
def get_dirpath_url(self, path):
|
||||||
|
query = '?p=%s' % quote(path)
|
||||||
|
return self.dir_url + query
|
||||||
|
|
||||||
|
class _Group(object):
|
||||||
|
def __init__(self, group_name, group_id):
|
||||||
|
self.group_name = group_name
|
||||||
|
self.group_id = group_id
|
||||||
|
self.group_url = urljoin(GROUPS_URL, str(self.group_id))
|
||||||
|
|
||||||
|
class _User(object):
|
||||||
|
def __init__(self, username, password):
|
||||||
|
self.user_name = username
|
||||||
|
self.password = password
|
||||||
|
self.user_url = urljoin(ACCOUNTS_URL, username)
|
||||||
|
@ -2,7 +2,7 @@ import requests
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from tests.common.utils import apiurl, urljoin, randstring
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
from tests.api.apitestbase import USERNAME, ApiTestBase
|
from tests.api.apitestbase import ApiTestBase
|
||||||
from tests.api.urls import ACCOUNTS_URL, ACCOUNT_INFO_URL, PING_URL, \
|
from tests.api.urls import ACCOUNTS_URL, ACCOUNT_INFO_URL, PING_URL, \
|
||||||
AUTH_PING_URL
|
AUTH_PING_URL
|
||||||
|
|
||||||
@ -15,7 +15,7 @@ class AccountsApiTest(ApiTestBase):
|
|||||||
def test_check_account_info(self):
|
def test_check_account_info(self):
|
||||||
info = self.get(ACCOUNT_INFO_URL).json()
|
info = self.get(ACCOUNT_INFO_URL).json()
|
||||||
self.assertIsNotNone(info)
|
self.assertIsNotNone(info)
|
||||||
self.assertEqual(info['email'], USERNAME)
|
self.assertEqual(info['email'], self.username)
|
||||||
self.assertIsNotNone(info['total'])
|
self.assertIsNotNone(info['total'])
|
||||||
self.assertIsNotNone(info['usage'])
|
self.assertIsNotNone(info['usage'])
|
||||||
|
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from tests.api.apitestbase import ApiTestBase, USERNAME
|
from tests.api.apitestbase import ApiTestBase
|
||||||
from tests.api.urls import AVATAR_BASE_URL, GROUPS_URL
|
from tests.api.urls import AVATAR_BASE_URL, GROUPS_URL
|
||||||
from tests.common.utils import randstring, apiurl, urljoin
|
from tests.common.utils import randstring, apiurl, urljoin
|
||||||
|
|
||||||
class AvatarApiTest(ApiTestBase):
|
class AvatarApiTest(ApiTestBase):
|
||||||
def test_user_avatar(self):
|
def test_user_avatar(self):
|
||||||
avatar_url = urljoin(AVATAR_BASE_URL, 'user', USERNAME, '/resized/80/')
|
avatar_url = urljoin(AVATAR_BASE_URL, 'user', self.username, '/resized/80/')
|
||||||
info = self.get(avatar_url).json()
|
info = self.get(avatar_url).json()
|
||||||
self.assertIsNotNone(info['url'])
|
self.assertIsNotNone(info['url'])
|
||||||
self.assertIsNotNone(info['is_default'])
|
self.assertIsNotNone(info['is_default'])
|
||||||
|
@ -12,129 +12,114 @@ from tests.api.urls import DEFAULT_REPO_URL, REPOS_URL
|
|||||||
from tests.api.apitestbase import ApiTestBase, USERNAME
|
from tests.api.apitestbase import ApiTestBase, USERNAME
|
||||||
|
|
||||||
class FilesApiTest(ApiTestBase):
|
class FilesApiTest(ApiTestBase):
|
||||||
use_test_repo = True
|
|
||||||
use_test_user = True
|
|
||||||
|
|
||||||
def create_file(self, fname=None):
|
|
||||||
data = {'operation': 'create'}
|
|
||||||
fname = fname or ('文件 %s.txt' % randstring())
|
|
||||||
fpath = '/' + fname
|
|
||||||
query = urlencode(dict(p=fpath))
|
|
||||||
furl = self.test_file_url + '?' + query
|
|
||||||
res = self.post(furl, data=data, expected=201)
|
|
||||||
self.assertEqual(res.text, '"success"')
|
|
||||||
return fname, furl
|
|
||||||
|
|
||||||
def create_dir(self):
|
|
||||||
data = {'operation': 'mkdir'}
|
|
||||||
dpath = '/目录 %s' % randstring()
|
|
||||||
query = urlencode(dict(p=dpath))
|
|
||||||
durl = self.test_dir_url + '?' + query
|
|
||||||
res = self.post(durl, data=data, expected=201)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
|
||||||
return dpath, durl
|
|
||||||
|
|
||||||
def test_create_file(self):
|
|
||||||
self.create_file()
|
|
||||||
|
|
||||||
def test_rename_file(self):
|
def test_rename_file(self):
|
||||||
name, furl = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
data = {
|
name, furl = self.create_file(repo)
|
||||||
'operation': 'rename',
|
data = {
|
||||||
'newname': name + randstring(),
|
'operation': 'rename',
|
||||||
}
|
'newname': name + randstring(),
|
||||||
res = self.post(furl, data=data)
|
}
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)"')
|
res = self.post(furl, data=data)
|
||||||
|
self.assertRegexpMatches(res.text, r'"http(.*)"')
|
||||||
|
|
||||||
def test_remove_file(self):
|
def test_remove_file(self):
|
||||||
_, furl = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.delete(furl)
|
_, furl = self.create_file(repo)
|
||||||
self.assertEqual(res.text, '"success"')
|
res = self.delete(furl)
|
||||||
|
self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
def test_move_file(self):
|
def test_move_file(self):
|
||||||
_, furl = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
# TODO: create another repo here, and use it as dst_repo
|
_, furl = self.create_file(repo)
|
||||||
data = {
|
# TODO: create another repo here, and use it as dst_repo
|
||||||
'operation': 'move',
|
data = {
|
||||||
'dst_repo': self.test_repo_id,
|
'operation': 'move',
|
||||||
'dst_dir': '/'
|
'dst_repo': repo.repo_id,
|
||||||
}
|
'dst_dir': '/',
|
||||||
res = self.post(furl, data=data)
|
}
|
||||||
self.assertEqual(res.text, '"success"')
|
res = self.post(furl, data=data)
|
||||||
|
self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
def test_copy_file(self):
|
def test_copy_file(self):
|
||||||
fname, _ = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
# TODO: create another repo here, and use it as dst_repo
|
fname, _ = self.create_file(repo)
|
||||||
dpath, _ = self.create_dir()
|
# TODO: create another repo here, and use it as dst_repo
|
||||||
fopurl = self.test_repo_url + u'fileops/copy/?p=/'
|
dpath, _ = self.create_dir(repo)
|
||||||
data = {
|
fopurl = urljoin(repo.repo_url, 'fileops/copy/') + '?p=/'
|
||||||
'file_names': fname,
|
data = {
|
||||||
'dst_repo': self.test_repo_id,
|
'file_names': fname,
|
||||||
'dst_dir': dpath,
|
'dst_repo': repo.repo_id,
|
||||||
}
|
'dst_dir': dpath,
|
||||||
res = self.post(fopurl, data=data)
|
}
|
||||||
self.assertEqual(res.text, '"success"')
|
res = self.post(fopurl, data=data)
|
||||||
|
self.assertEqual(res.text, '"success"')
|
||||||
|
|
||||||
def test_download_file(self):
|
def test_download_file(self):
|
||||||
fname, furl = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.get(furl)
|
fname, furl = self.create_file(repo)
|
||||||
self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname))
|
res = self.get(furl)
|
||||||
|
self.assertRegexpMatches(res.text, '"http(.*)/%s"' % quote(fname))
|
||||||
|
|
||||||
def test_download_file_from_history(self):
|
def test_download_file_from_history(self):
|
||||||
fname, _ = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
file_history_url = self.test_file_url + u'history/?p=/%s' % quote(fname)
|
fname, _ = self.create_file(repo)
|
||||||
res = self.get(file_history_url).json()
|
file_history_url = urljoin(repo.repo_url, 'history/') + \
|
||||||
commit_id = res['commits'][0]['id']
|
'?p=/%s' % quote(fname)
|
||||||
self.assertEqual(len(commit_id), 40)
|
res = self.get(file_history_url).json()
|
||||||
data = {
|
commit_id = res['commits'][0]['id']
|
||||||
'p': fname,
|
self.assertEqual(len(commit_id), 40)
|
||||||
'commit_id': commit_id,
|
data = {
|
||||||
}
|
'p': fname,
|
||||||
query = '?' + urlencode(data)
|
'commit_id': commit_id,
|
||||||
res = self.get(self.test_file_url + query)
|
}
|
||||||
self.assertEqual(res.status_code, 200)
|
query = '?' + urlencode(data)
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname))
|
res = self.get(repo.file_url + query)
|
||||||
|
self.assertRegexpMatches(res.text, r'"http(.*)/%s"' % quote(fname))
|
||||||
|
|
||||||
def test_get_file_detail(self):
|
def test_get_file_detail(self):
|
||||||
fname, _ = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
fdurl = self.test_file_url + u'detail/?p=/%s' % quote(fname)
|
fname, _ = self.create_file(repo)
|
||||||
detail = self.get(fdurl).json()
|
fdurl = repo.file_url + u'detail/?p=/%s' % quote(fname)
|
||||||
self.assertIsNotNone(detail)
|
detail = self.get(fdurl).json()
|
||||||
self.assertIsNotNone(detail['id'])
|
self.assertIsNotNone(detail)
|
||||||
self.assertIsNotNone(detail['mtime'])
|
self.assertIsNotNone(detail['id'])
|
||||||
self.assertIsNotNone(detail['type'])
|
self.assertIsNotNone(detail['mtime'])
|
||||||
self.assertIsNotNone(detail['name'])
|
self.assertIsNotNone(detail['type'])
|
||||||
self.assertIsNotNone(detail['size'])
|
self.assertIsNotNone(detail['name'])
|
||||||
|
self.assertIsNotNone(detail['size'])
|
||||||
|
|
||||||
def test_get_file_history(self):
|
def test_get_file_history(self):
|
||||||
fname, _ = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
fhurl = self.test_file_url + u'history/?p=%s' % quote(fname)
|
fname, _ = self.create_file(repo)
|
||||||
history = self.get(fhurl).json()
|
fhurl = repo.file_url + u'history/?p=%s' % quote(fname)
|
||||||
for commit in history['commits']:
|
history = self.get(fhurl).json()
|
||||||
self.assertIsNotNone(commit['rev_file_size'])
|
for commit in history['commits']:
|
||||||
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
self.assertIsNotNone(commit['rev_file_size'])
|
||||||
self.assertIsNotNone(commit['ctime'])
|
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
||||||
self.assertIsNotNone(commit['creator_name'])
|
self.assertIsNotNone(commit['ctime'])
|
||||||
self.assertIsNotNone(commit['creator'])
|
self.assertIsNotNone(commit['creator_name'])
|
||||||
self.assertIsNotNone(commit['root_id'])
|
self.assertIsNotNone(commit['creator'])
|
||||||
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
self.assertIsNotNone(commit['root_id'])
|
||||||
#self.assertIsNotNone(commit['parent_id']) #allow null
|
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
||||||
self.assertIsNotNone(commit['new_merge'])
|
#self.assertIsNotNone(commit['parent_id']) #allow null
|
||||||
self.assertIsNotNone(commit['repo_id'])
|
self.assertIsNotNone(commit['new_merge'])
|
||||||
self.assertIsNotNone(commit['desc'])
|
self.assertIsNotNone(commit['repo_id'])
|
||||||
self.assertIsNotNone(commit['id'])
|
self.assertIsNotNone(commit['desc'])
|
||||||
self.assertIsNotNone(commit['conflict'])
|
self.assertIsNotNone(commit['id'])
|
||||||
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
self.assertIsNotNone(commit['conflict'])
|
||||||
|
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
||||||
|
|
||||||
def test_get_upload_link(self):
|
def test_get_upload_link(self):
|
||||||
upload_url = self.test_repo_url + u'upload-link/'
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.get(upload_url)
|
upload_url = urljoin(repo.repo_url, 'upload-link')
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"')
|
res = self.get(upload_url)
|
||||||
|
self.assertRegexpMatches(res.text, r'"http(.*)/upload-api/\w{8,8}"')
|
||||||
|
|
||||||
def test_get_update_link(self):
|
def test_get_update_link(self):
|
||||||
update_url = self.test_repo_url + u'update-link/'
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.get(update_url)
|
update_url = urljoin(repo.repo_url, 'update-link')
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"')
|
res = self.get(update_url)
|
||||||
|
self.assertRegexpMatches(res.text, r'"http(.*)/update-api/\w{8,8}"')
|
||||||
|
|
||||||
# def test_upload_file(self):
|
# def test_upload_file(self):
|
||||||
# # XXX: requests has problems when post a file whose name contains
|
# # XXX: requests has problems when post a file whose name contains
|
||||||
@ -166,54 +151,57 @@ class FilesApiTest(ApiTestBase):
|
|||||||
# self.assertRegexpMatches(res.text, r'\w{40,40}')
|
# self.assertRegexpMatches(res.text, r'\w{40,40}')
|
||||||
|
|
||||||
def test_get_upload_blocks_link(self):
|
def test_get_upload_blocks_link(self):
|
||||||
upload_blks_url = self.test_repo_url + u'upload-blks-link/'
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.get(upload_blks_url)
|
upload_blks_url = urljoin(repo.repo_url, 'upload-blks-link')
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"')
|
res = self.get(upload_blks_url)
|
||||||
|
self.assertRegexpMatches(res.text, r'"http(.*)/upload-blks-api/\w{8,8}"')
|
||||||
|
|
||||||
def test_get_update_blocks_link(self):
|
def test_get_update_blocks_link(self):
|
||||||
update_blks_url = self.test_repo_url + u'update-blks-link/'
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.get(update_blks_url)
|
update_blks_url = urljoin(repo.repo_url, 'update-blks-link')
|
||||||
self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"')
|
res = self.get(update_blks_url)
|
||||||
|
self.assertRegexpMatches(res.text, r'"http(.*)/update-blks-api/\w{8,8}"')
|
||||||
|
|
||||||
def test_list_dir(self):
|
def test_list_dir(self):
|
||||||
self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
self.create_dir()
|
self.create_file(repo)
|
||||||
dirents = self.get(self.test_dir_url).json()
|
self.create_dir(repo)
|
||||||
self.assertHasLen(dirents, 2)
|
dirents = self.get(repo.dir_url).json()
|
||||||
for dirent in dirents:
|
self.assertHasLen(dirents, 2)
|
||||||
self.assertIsNotNone(dirent['id'])
|
for dirent in dirents:
|
||||||
self.assertIsNotNone(dirent['name'])
|
self.assertIsNotNone(dirent['id'])
|
||||||
self.assertIn(dirent['type'], ('file', 'dir'))
|
self.assertIsNotNone(dirent['name'])
|
||||||
if dirent['type'] == 'file':
|
self.assertIn(dirent['type'], ('file', 'dir'))
|
||||||
self.assertIsNotNone(dirent['size'])
|
if dirent['type'] == 'file':
|
||||||
|
self.assertIsNotNone(dirent['size'])
|
||||||
def test_create_dir(self):
|
|
||||||
self.create_dir()
|
|
||||||
|
|
||||||
def test_remove_dir(self):
|
def test_remove_dir(self):
|
||||||
_, durl = self.create_dir()
|
with self.get_tmp_repo() as repo:
|
||||||
res = self.delete(durl)
|
_, durl = self.create_dir(repo)
|
||||||
self.assertEqual(res.text, u'"success"')
|
res = self.delete(durl)
|
||||||
self.get(durl, expected=404)
|
self.assertEqual(res.text, u'"success"')
|
||||||
|
self.get(durl, expected=404)
|
||||||
|
|
||||||
def test_download_dir(self):
|
def test_download_dir(self):
|
||||||
dpath, _ = self.create_dir()
|
with self.get_tmp_repo() as repo:
|
||||||
query = urlencode({'p': dpath})
|
dpath, _ = self.create_dir(repo)
|
||||||
ddurl = self.test_dir_url + 'download/' + '?' + query
|
query = '?p=%s' % quote(dpath)
|
||||||
res = self.get(ddurl)
|
ddurl = urljoin(repo.dir_url, 'download') + query
|
||||||
self.assertRegexpMatches(res.text,
|
res = self.get(ddurl)
|
||||||
r'"http(.*)/files/\w{8,8}/%s"' % quote(dpath[1:]))
|
self.assertRegexpMatches(res.text,
|
||||||
|
r'"http(.*)/files/\w{8,8}/%s"' % quote(dpath[1:]))
|
||||||
|
|
||||||
def test_share_dir(self):
|
def test_share_dir(self):
|
||||||
dpath, _ = self.create_dir()
|
with self.get_tmp_repo() as repo:
|
||||||
query = urlencode({'p': dpath})
|
dpath, _ = self.create_dir(repo)
|
||||||
share_dir_url = self.test_dir_url + u'share/' + '?' + query
|
query = '?p=%s' % quote(dpath)
|
||||||
# TODO: share to another user
|
share_dir_url = urljoin(repo.dir_url, 'share/') + query
|
||||||
data = {
|
with self.get_tmp_user() as user:
|
||||||
'emails': USERNAME,
|
data = {
|
||||||
's_type': 'd',
|
'emails': user.user_name,
|
||||||
'path': '/',
|
's_type': 'd',
|
||||||
'perm': 'r'
|
'path': '/',
|
||||||
}
|
'perm': 'r'
|
||||||
res = self.post(share_dir_url, data=data)
|
}
|
||||||
self.assertEqual(res.text, u'{}')
|
res = self.post(share_dir_url, data=data)
|
||||||
|
self.assertEqual(res.text, u'{}')
|
||||||
|
@ -1,74 +0,0 @@
|
|||||||
from apitestbase import GROUPS_URL, GROUPMSGS_URL, GROUPMSGS_NREPLY_URL
|
|
||||||
from apitestbase import GROUP_URL, get_authed_instance
|
|
||||||
from common.utils import randomword
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class GroupMsgsApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
self.gname = randomword(16)
|
|
||||||
data = { 'group_name': self.gname }
|
|
||||||
res = self.requests.put(GROUPS_URL, data=data)
|
|
||||||
self.gid = res.json()['group_id']
|
|
||||||
self.gmurl = GROUPMSGS_URL + str(self.gid) + u'/'
|
|
||||||
self.gmurl_ = GROUP_URL + str(self.gid) + u'/msg/'
|
|
||||||
|
|
||||||
def test_list_group_msgs_api(self):
|
|
||||||
res = self.requests.get(self.gmurl)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['next_page'])
|
|
||||||
self.assertIsNotNone(json['msgs'])
|
|
||||||
|
|
||||||
def test_post_group_msg_api(self):
|
|
||||||
#repo_id and path is not tested
|
|
||||||
data = { 'message': 'test message' }
|
|
||||||
res = self.requests.post(self.gmurl, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(res.json()['msgid'])
|
|
||||||
|
|
||||||
def test_reply_group_msg_api(self):
|
|
||||||
data = { 'message': 'test message' }
|
|
||||||
res = self.requests.post(self.gmurl, data=data)
|
|
||||||
msgid = res.json()['msgid']
|
|
||||||
res = self.requests.post(self.gmurl_ + str(msgid) + u'/', data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertIsNotNone(res.json()['msgid'])
|
|
||||||
|
|
||||||
def test_get_group_msg_detail_api(self):
|
|
||||||
data = { 'message': 'test message' }
|
|
||||||
res = self.requests.post(self.gmurl, data=data)
|
|
||||||
msgid = res.json()['msgid']
|
|
||||||
res = self.requests.get(self.gmurl_ + str(msgid) + u'/')
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['reply_cnt'])
|
|
||||||
self.assertIsNotNone(json['timestamp'])
|
|
||||||
self.assertIsNotNone(json['replies'])
|
|
||||||
self.assertIsNotNone(json['from_email'])
|
|
||||||
self.assertIsNotNone(json['msgid'])
|
|
||||||
self.assertIsNotNone(json['msg'])
|
|
||||||
self.assertIsNotNone(json['nickname'])
|
|
||||||
|
|
||||||
def test_new_replies_group_msg_api(self):
|
|
||||||
data = { 'message': 'test message' }
|
|
||||||
res = self.requests.post(self.gmurl, data=data)
|
|
||||||
res = self.requests.get(GROUPMSGS_NREPLY_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
for reply in json:
|
|
||||||
self.assertIsNotNone(reply['reply_cnt'])
|
|
||||||
self.assertIsNotNone(reply['timestamp'])
|
|
||||||
self.assertIsNotNone(reply['replies'])
|
|
||||||
self.assertIsNotNone(reply['from_email'])
|
|
||||||
self.assertIsNotNone(reply['msgid'])
|
|
||||||
self.assertIsNotNone(reply['msg'])
|
|
||||||
self.assertIsNotNone(reply['nickname'])
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@ -10,30 +10,30 @@ from tests.api.urls import GROUPS_URL
|
|||||||
from tests.common.utils import apiurl, urljoin, randstring
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
|
|
||||||
class GroupsApiTest(ApiTestBase):
|
class GroupsApiTest(ApiTestBase):
|
||||||
use_test_user = True
|
|
||||||
use_test_group = True
|
|
||||||
|
|
||||||
def test_add_remove_group_member(self):
|
def test_add_remove_group_member(self):
|
||||||
test_group_members_url = urljoin(self.test_group_url, '/members/')
|
with self.get_tmp_user() as user:
|
||||||
data = {'user_name': self.test_user_name}
|
with self.get_tmp_group() as group:
|
||||||
res = self.put(test_group_members_url, data=data).json()
|
test_group_members_url = urljoin(group.group_url, '/members/')
|
||||||
self.assertTrue(res['success'])
|
data = {'user_name': user.user_name}
|
||||||
res = self.delete(test_group_members_url, data=data).json()
|
res = self.put(test_group_members_url, data=data).json()
|
||||||
self.assertTrue(res['success'])
|
self.assertTrue(res['success'])
|
||||||
|
res = self.delete(test_group_members_url, data=data).json()
|
||||||
|
self.assertTrue(res['success'])
|
||||||
|
|
||||||
def test_list_groups(self):
|
def test_list_groups(self):
|
||||||
groups = self.get(GROUPS_URL).json()
|
with self.get_tmp_group() as group:
|
||||||
self.assertGreaterEqual(groups['replynum'], 0)
|
groups = self.get(GROUPS_URL).json()
|
||||||
self.assertNotEmpty(groups['groups'])
|
self.assertGreaterEqual(groups['replynum'], 0)
|
||||||
for group in groups['groups']:
|
self.assertNotEmpty(groups['groups'])
|
||||||
self.assertIsNotNone(group['ctime'])
|
for group in groups['groups']:
|
||||||
self.assertIsNotNone(group['creator'])
|
self.assertIsNotNone(group['ctime'])
|
||||||
self.assertIsNotNone(group['msgnum'])
|
self.assertIsNotNone(group['creator'])
|
||||||
self.assertIsNotNone(group['mtime'])
|
self.assertIsNotNone(group['msgnum'])
|
||||||
self.assertIsNotNone(group['id'])
|
self.assertIsNotNone(group['mtime'])
|
||||||
self.assertIsNotNone(group['name'])
|
self.assertIsNotNone(group['id'])
|
||||||
|
self.assertIsNotNone(group['name'])
|
||||||
|
|
||||||
def test_add_group(self):
|
def test_add_remove_group(self):
|
||||||
data = {'group_name': randstring(16)}
|
data = {'group_name': randstring(16)}
|
||||||
info = self.put(GROUPS_URL, data=data).json()
|
info = self.put(GROUPS_URL, data=data).json()
|
||||||
self.assertTrue(info['success'])
|
self.assertTrue(info['success'])
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from tests.common.utils import apiurl
|
|
||||||
from tests.api.apitestbase import ApiTestBase
|
from tests.api.apitestbase import ApiTestBase
|
||||||
|
from tests.api.urls import LIST_GROUP_AND_CONTACTS_URL
|
||||||
|
|
||||||
class MiscApiTest(ApiTestBase):
|
class MiscApiTest(ApiTestBase):
|
||||||
def test_list_group_and_contacts_api(self):
|
def test_list_group_and_contacts(self):
|
||||||
res = self.get(LIST_GROUP_AND_CONTACTS_URL).json()
|
res = self.get(LIST_GROUP_AND_CONTACTS_URL).json()
|
||||||
self.assertIsNotNone(res)
|
self.assertIsNotNone(res)
|
||||||
self.assertIsInstance(res['contacts'], list)
|
self.assertIsInstance(res['contacts'], list)
|
||||||
|
@ -5,17 +5,12 @@ Test repos api.
|
|||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from tests.api.apitestbase import ApiTestBase, USERNAME
|
from tests.api.apitestbase import ApiTestBase
|
||||||
from tests.api.urls import REPOS_URL, DEFAULT_REPO_URL, VIRTUAL_REPOS_URL
|
from tests.api.urls import REPOS_URL, DEFAULT_REPO_URL, VIRTUAL_REPOS_URL
|
||||||
from tests.common.utils import apiurl, urljoin, randstring
|
from tests.common.utils import apiurl, urljoin, randstring
|
||||||
|
|
||||||
|
# TODO: all tests should be run on an encrypted repo
|
||||||
class ReposApiTest(ApiTestBase):
|
class ReposApiTest(ApiTestBase):
|
||||||
use_test_repo = True
|
|
||||||
|
|
||||||
def remove_repo(cls, repo_id):
|
|
||||||
cls.delete(urljoin(REPOS_URL, repo_id))
|
|
||||||
|
|
||||||
def test_get_default_repo(self):
|
def test_get_default_repo(self):
|
||||||
repo = self.get(DEFAULT_REPO_URL).json()
|
repo = self.get(DEFAULT_REPO_URL).json()
|
||||||
self.assertIsNotNone(repo['exists'])
|
self.assertIsNotNone(repo['exists'])
|
||||||
@ -28,6 +23,7 @@ class ReposApiTest(ApiTestBase):
|
|||||||
def test_list_repos(self):
|
def test_list_repos(self):
|
||||||
repos = self.get(REPOS_URL).json()
|
repos = self.get(REPOS_URL).json()
|
||||||
self.assertHasLen(repos, 1)
|
self.assertHasLen(repos, 1)
|
||||||
|
|
||||||
for repo in repos:
|
for repo in repos:
|
||||||
self.assertIsNotNone(repo['permission'])
|
self.assertIsNotNone(repo['permission'])
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
self.assertIsNotNone(repo['encrypted'])
|
||||||
@ -42,91 +38,89 @@ class ReposApiTest(ApiTestBase):
|
|||||||
self.assertIsNotNone(repo['root'])
|
self.assertIsNotNone(repo['root'])
|
||||||
|
|
||||||
def test_get_repo_info(self):
|
def test_get_repo_info(self):
|
||||||
repo = self.get(test_repo_url).json()
|
with self.get_tmp_repo() as repo:
|
||||||
self.assertFalse(repo['encrypted'])
|
rinfo = self.get(repo.repo_url).json()
|
||||||
self.assertIsNotNone(repo['mtime'])
|
self.assertFalse(rinfo['encrypted'])
|
||||||
self.assertIsNotNone(repo['owner'])
|
self.assertIsNotNone(rinfo['mtime'])
|
||||||
self.assertIsNotNone(repo['id'])
|
self.assertIsNotNone(rinfo['owner'])
|
||||||
self.assertIsNotNone(repo['size'])
|
self.assertIsNotNone(rinfo['id'])
|
||||||
self.assertIsNotNone(repo['name'])
|
self.assertIsNotNone(rinfo['size'])
|
||||||
self.assertIsNotNone(repo['root'])
|
self.assertIsNotNone(rinfo['name'])
|
||||||
self.assertIsNotNone(repo['desc'])
|
self.assertIsNotNone(rinfo['root'])
|
||||||
self.assertIsNotNone(repo['type'])
|
self.assertIsNotNone(rinfo['desc'])
|
||||||
# self.assertIsNotNone(repo['password_need']) #allow null here
|
self.assertIsNotNone(rinfo['type'])
|
||||||
|
# elf.assertIsNotNone(rinfo['password_need']) # allow null here
|
||||||
|
|
||||||
def test_get_repo_owner(self):
|
def test_get_repo_owner(self):
|
||||||
repo_owner_url = urljoin(self.test_repo_url, '/owner/')
|
with self.get_tmp_repo() as repo:
|
||||||
info = self.get(repo_owner_url).json()
|
repo_owner_url = urljoin(repo.repo_url, '/owner/')
|
||||||
self.assertEqual(info['owner'], self.test_user_name)
|
# XXX: why only admin can get the owner of a repo?
|
||||||
|
info = self.admin_get(repo_owner_url).json()
|
||||||
|
self.assertEqual(info['owner'], self.username)
|
||||||
|
|
||||||
def test_get_repo_history(self):
|
def test_get_repo_history(self):
|
||||||
repo_history_url = urljoin(REPOS_URL, self.test_repo_id, '/history/')
|
with self.get_tmp_repo() as repo:
|
||||||
history = self.get(repo_history_url).json()
|
self.create_file(repo)
|
||||||
for commit in history['commits']:
|
self.create_dir(repo)
|
||||||
self.assertIsNotNone(commit['rev_file_size'])
|
repo_history_url = urljoin(repo.repo_url, '/history/')
|
||||||
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
history = self.get(repo_history_url).json()
|
||||||
self.assertIsNotNone(commit['ctime'])
|
commits = history['commits']
|
||||||
self.assertIsNotNone(commit['creator_name'])
|
self.assertHasLen(commits, 3)
|
||||||
self.assertIsNotNone(commit['creator'])
|
for commit in commits:
|
||||||
self.assertIsNotNone(commit['root_id'])
|
self.assertIsNotNone(commit['rev_file_size'])
|
||||||
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
#self.assertIsNotNone(commit['rev_file_id']) #allow null
|
||||||
#self.assertIsNotNone(commit['parent_id']) #allow null
|
self.assertIsNotNone(commit['ctime'])
|
||||||
self.assertIsNotNone(commit['new_merge'])
|
self.assertIsNotNone(commit['creator_name'])
|
||||||
self.assertIsNotNone(commit['repo_id'])
|
self.assertIsNotNone(commit['creator'])
|
||||||
self.assertIsNotNone(commit['desc'])
|
self.assertIsNotNone(commit['root_id'])
|
||||||
self.assertIsNotNone(commit['id'])
|
#self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
|
||||||
self.assertIsNotNone(commit['conflict'])
|
#self.assertIsNotNone(commit['parent_id']) #allow null
|
||||||
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
self.assertIsNotNone(commit['new_merge'])
|
||||||
|
self.assertIsNotNone(commit['repo_id'])
|
||||||
|
self.assertIsNotNone(commit['desc'])
|
||||||
|
self.assertIsNotNone(commit['id'])
|
||||||
|
self.assertIsNotNone(commit['conflict'])
|
||||||
|
#self.assertIsNotNone(commit['second_parent_id']) #allow null
|
||||||
|
|
||||||
def test_create_repo(self):
|
def test_create_repo(self):
|
||||||
data = {'name': 'test'}
|
data = {'name': 'test'}
|
||||||
res = self.post(REPOS_URL, data=data)
|
res = self.post(REPOS_URL, data=data)
|
||||||
repo = res.json()
|
repo = res.json()
|
||||||
self.assertIsNotNone(repo['encrypted'])
|
|
||||||
self.assertIsNotNone(repo['enc_version'])
|
|
||||||
self.assertIsNotNone(repo['repo_id'])
|
|
||||||
self.assertIsNotNone(repo['magic'])
|
|
||||||
self.assertIsNotNone(repo['relay_id'])
|
|
||||||
self.assertIsNotNone(repo['repo_version'])
|
|
||||||
self.assertIsNotNone(repo['relay_addr'])
|
|
||||||
self.assertIsNotNone(repo['token'])
|
|
||||||
self.assertIsNotNone(repo['relay_port'])
|
|
||||||
self.assertIsNotNone(repo['random_key'])
|
|
||||||
self.assertIsNotNone(repo['email'])
|
|
||||||
self.assertIsNotNone(repo['repo_name'])
|
|
||||||
|
|
||||||
repo_id = repo['repo_id']
|
repo_id = repo['repo_id']
|
||||||
self.remove_repo(repo_id)
|
try:
|
||||||
# Check the repo is really removed
|
self.assertIsNotNone(repo['encrypted'])
|
||||||
self.get(urljoin(REPOS_URL, repo_id), expected=404)
|
self.assertIsNotNone(repo['enc_version'])
|
||||||
|
self.assertIsNotNone(repo['repo_id'])
|
||||||
|
self.assertIsNotNone(repo['magic'])
|
||||||
|
self.assertIsNotNone(repo['relay_id'])
|
||||||
|
self.assertIsNotNone(repo['repo_version'])
|
||||||
|
self.assertIsNotNone(repo['relay_addr'])
|
||||||
|
self.assertIsNotNone(repo['token'])
|
||||||
|
self.assertIsNotNone(repo['relay_port'])
|
||||||
|
self.assertIsNotNone(repo['random_key'])
|
||||||
|
self.assertIsNotNone(repo['email'])
|
||||||
|
self.assertIsNotNone(repo['repo_name'])
|
||||||
|
finally:
|
||||||
|
self.remove_repo(repo_id)
|
||||||
|
# Check the repo is really removed
|
||||||
|
self.get(urljoin(REPOS_URL, repo_id), expected=404)
|
||||||
|
|
||||||
# TODO: create a sub folder and use it as a sub repo
|
def test_check_or_create_sub_repo(self):
|
||||||
# def test_check_or_create_sub_repo(self):
|
# TODO: create a sub folder and use it as a sub repo
|
||||||
# sub_repo_url = urljoin(REPOS_URL, self.test_repo_id, '/dir/sub_repo/')
|
|
||||||
# params = {'p': '/', 'name': 'sub_lib'}
|
|
||||||
# info = self.get(sub_repo_url, params=params).json()
|
|
||||||
# self.assertHasLen(info['sub_repo_id'], 36)
|
|
||||||
# self.remove_repo(info['sub_repo_id'])
|
|
||||||
|
|
||||||
def test_encrpty_or_decrypy_repo(self):
|
|
||||||
# TODO: create a encrypted library
|
|
||||||
pass
|
pass
|
||||||
# repo_url = urljoin(REPOS_URL, repo_id)
|
|
||||||
# data = {'password': 'test'}
|
|
||||||
# res = self.post(repo_url, data)
|
|
||||||
# self.assertEqual(res.text, '"success"')
|
|
||||||
|
|
||||||
def test_fetch_repo_download_info(self):
|
def test_fetch_repo_download_info(self):
|
||||||
download_info_repo_url = urljoin(REPOS_URL, self.test_repo_id, '/download-info/')
|
with self.get_tmp_repo() as repo:
|
||||||
info = self.get(download_info_repo_url).json()
|
download_info_repo_url = urljoin(repo.repo_url, '/download-info/')
|
||||||
self.assertIsNotNone(info['relay_addr'])
|
info = self.get(download_info_repo_url).json()
|
||||||
self.assertIsNotNone(info['token'])
|
self.assertIsNotNone(info['relay_addr'])
|
||||||
self.assertIsNotNone(info['repo_id'])
|
self.assertIsNotNone(info['token'])
|
||||||
self.assertIsNotNone(info['relay_port'])
|
self.assertIsNotNone(info['repo_id'])
|
||||||
self.assertIsNotNone(info['encrypted'])
|
self.assertIsNotNone(info['relay_port'])
|
||||||
self.assertIsNotNone(info['repo_name'])
|
self.assertIsNotNone(info['encrypted'])
|
||||||
self.assertIsNotNone(info['relay_id'])
|
self.assertIsNotNone(info['repo_name'])
|
||||||
self.assertIsNotNone(info['email'])
|
self.assertIsNotNone(info['relay_id'])
|
||||||
|
self.assertIsNotNone(info['email'])
|
||||||
|
|
||||||
def test_list_virtual_repos(self):
|
def test_list_virtual_repos(self):
|
||||||
# TODO: we need to create at least on virtual repo first
|
# TODO: we need to create at least on virtual repo first
|
||||||
|
@ -1,232 +1,30 @@
|
|||||||
#coding: UTF-8
|
#coding: UTF-8
|
||||||
|
|
||||||
import unittest
|
from tests.common.utils import urljoin
|
||||||
from urllib import urlencode, quote
|
|
||||||
from tests.common.utils import apiurl, randstring, urljoin
|
|
||||||
from tests.api.apitestbase import ApiTestBase
|
from tests.api.apitestbase import ApiTestBase
|
||||||
from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \
|
from tests.api.urls import SHARED_LINKS_URL, SHARED_LIBRARIES_URL, \
|
||||||
BESHARED_LIBRARIES_URL, SHARED_FILES_URL, F_URL, S_F_URL
|
BESHARED_LIBRARIES_URL, SHARED_FILES_URL, F_URL, S_F_URL
|
||||||
|
|
||||||
class SharesApiTest(ApiTestBase):
|
class SharesApiTest(ApiTestBase):
|
||||||
use_test_group = True
|
|
||||||
use_test_repo = True
|
|
||||||
|
|
||||||
def create_file(self, fname=None):
|
|
||||||
data = {'operation': 'create'}
|
|
||||||
fname = fname or ('文件 %s.txt' % randstring(10))
|
|
||||||
fpath = '/' + fname
|
|
||||||
query = urlencode(dict(p=fpath))
|
|
||||||
furl = self.test_file_url + '?' + query
|
|
||||||
res = self.post(furl, data=data, expected=201)
|
|
||||||
self.assertEqual(res.text, '"success"')
|
|
||||||
return fname, furl
|
|
||||||
|
|
||||||
def test_create_file_shared_link(self):
|
def test_create_file_shared_link(self):
|
||||||
fname, _ = self.create_file()
|
with self.get_tmp_repo() as repo:
|
||||||
fsurl = urljoin(self.test_file_url, 'shared-link')
|
fname, _ = self.create_file(repo)
|
||||||
data = {
|
fsurl = urljoin(repo.file_url, 'shared-link')
|
||||||
'type': 'f',
|
data = {
|
||||||
'p': '/' + fname,
|
'type': 'f',
|
||||||
}
|
'p': '/' + fname,
|
||||||
res = self.put(fsurl, data=data, expected=201)
|
}
|
||||||
self.assertRegexpMatches(res.headers['Location'], \
|
res = self.put(fsurl, data=data, expected=201)
|
||||||
r'http(.*)/f/(\w{10,10})/')
|
self.assertRegexpMatches(res.headers['Location'], \
|
||||||
|
r'http(.*)/f/(\w{10,10})/')
|
||||||
|
|
||||||
res = self.get(SHARED_LINKS_URL).json()
|
res = self.get(SHARED_LINKS_URL).json()
|
||||||
self.assertNotEmpty(res)
|
self.assertNotEmpty(res)
|
||||||
for fileshare in res['fileshares']:
|
for fileshare in res['fileshares']:
|
||||||
self.assertIsNotNone(fileshare['username'])
|
self.assertIsNotNone(fileshare['username'])
|
||||||
self.assertIsNotNone(fileshare['repo_id'])
|
self.assertIsNotNone(fileshare['repo_id'])
|
||||||
#self.assertIsNotNone(fileshare['ctime'])
|
#self.assertIsNotNone(fileshare['ctime'])
|
||||||
self.assertIsNotNone(fileshare['s_type'])
|
self.assertIsNotNone(fileshare['s_type'])
|
||||||
self.assertIsNotNone(fileshare['token'])
|
self.assertIsNotNone(fileshare['token'])
|
||||||
self.assertIsNotNone(fileshare['view_cnt'])
|
self.assertIsNotNone(fileshare['view_cnt'])
|
||||||
self.assertIsNotNone(fileshare['path'])
|
self.assertIsNotNone(fileshare['path'])
|
||||||
|
|
||||||
|
|
||||||
# def test_create_directory_shared_link(self):
|
|
||||||
# data = { 'operation': 'mkdir' }
|
|
||||||
# durl = self.test_dir_url + u'?p=/test_create_shared_link_d'
|
|
||||||
# self.post(durl, data=data)
|
|
||||||
# self.post(durl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'd', 'p': '/test_create_shared_link_d' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# self.assertEqual(res.status_code, 201)
|
|
||||||
# self.assertRegexpMatches(res.headers['Location'], \
|
|
||||||
# r'http(.*)/d/(\w{10,10})/')
|
|
||||||
|
|
||||||
# def test_remove_shared_link(self):
|
|
||||||
# data = { 'operation': 'create' }
|
|
||||||
# furl = self.test_file_url + u'?p=/test_remove_shared_link_f'
|
|
||||||
# self.post(furl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'f', 'p': '/test_remove_shared_link_f' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# import re
|
|
||||||
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
|
||||||
# fturl = SHARED_LINKS_URL + u'?t=' + t
|
|
||||||
# res = self.delete(fturl)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertEqual(res.text, u'{}')
|
|
||||||
|
|
||||||
# def test_get_shared_file_url(self):
|
|
||||||
# data = { 'operation': 'create' }
|
|
||||||
# furl = self.test_file_url + u'?p=/test_visit_shared_link_f'
|
|
||||||
# self.post(furl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'f', 'p': '/test_visit_shared_link_f' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# import re
|
|
||||||
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
|
||||||
# fdurl = F_URL + t + u'/'
|
|
||||||
# res = self.get(fdurl)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"')
|
|
||||||
|
|
||||||
# def test_get_shared_file_detail(self):
|
|
||||||
# data = { 'operation': 'create' }
|
|
||||||
# furl = self.test_file_url + u'?p=/test_visitd_shared_link_f'
|
|
||||||
# self.post(furl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'f', 'p': '/test_visitd_shared_link_f' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# import re
|
|
||||||
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
|
||||||
# fdurl = F_URL + t + u'/detail/'
|
|
||||||
# res = self.get(fdurl)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# json = res.json()
|
|
||||||
# self.assertIsNotNone(json)
|
|
||||||
# self.assertIsNotNone(json['repo_id'])
|
|
||||||
# self.assertIsNotNone(json['name'])
|
|
||||||
# self.assertIsNotNone(json['size'])
|
|
||||||
# self.assertIsNotNone(json['path'])
|
|
||||||
# self.assertIsNotNone(json['type'])
|
|
||||||
# self.assertIsNotNone(json['mtime'])
|
|
||||||
# self.assertIsNotNone(json['id'])
|
|
||||||
|
|
||||||
# def test_get_private_shared_file_url(self):
|
|
||||||
# if True: #todo: override this
|
|
||||||
# return
|
|
||||||
# data = { 'operation': 'create' }
|
|
||||||
# furl = self.test_file_url + u'?p=/test_visit_shared_link_sf'
|
|
||||||
# self.post(furl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'f', 'p': '/test_visit_shared_link_sf' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# import re
|
|
||||||
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
|
||||||
# fdurl = S_F_URL + t + u'/'
|
|
||||||
# res = self.get(fdurl)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertRegexpMatches(res.text, r'"http(.*)/files/\w{8,8}/(.*)"')
|
|
||||||
|
|
||||||
# def test_get_private_shared_file_detail(self):
|
|
||||||
# if True: #todo: override this
|
|
||||||
# return
|
|
||||||
# data = { 'operation': 'create' }
|
|
||||||
# furl = self.test_file_url + u'?p=/test_visitd_shared_link_sf'
|
|
||||||
# self.post(furl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'f', 'p': '/test_visitd_shared_link_sf' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# import re
|
|
||||||
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
|
||||||
# fdurl = S_F_URL + t + u'/detail/'
|
|
||||||
# res = self.get(fdurl)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# json = res.json()
|
|
||||||
# self.assertIsNotNone(json)
|
|
||||||
# self.assertIsNotNone(json['repo_id'])
|
|
||||||
# self.assertIsNotNone(json['name'])
|
|
||||||
# self.assertIsNotNone(json['size'])
|
|
||||||
# self.assertIsNotNone(json['path'])
|
|
||||||
# self.assertIsNotNone(json['type'])
|
|
||||||
# self.assertIsNotNone(json['mtime'])
|
|
||||||
# self.assertIsNotNone(json['id'])
|
|
||||||
|
|
||||||
# def test_remove_shared_file(self):
|
|
||||||
# if True: #todo: override this
|
|
||||||
# return
|
|
||||||
# data = { 'operation': 'create' }
|
|
||||||
# furl = self.test_file_url + u'?p=/test_remove_shared_file'
|
|
||||||
# self.post(furl, data=data)
|
|
||||||
# fsurl = self.test_file_url + u'shared-link/'
|
|
||||||
# data = { 'type': 'f', 'p': '/test_remove_shared_file' }
|
|
||||||
# res = self.put(fsurl, data=data)
|
|
||||||
# import re
|
|
||||||
# t = re.match(r'http(.*)/f/(\w{10,10})/', res.headers['Location']).group(2)
|
|
||||||
# fturl = SHARED_FILES_URL + u'?t=' + t
|
|
||||||
# res = self.delete(fturl)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertEqual(res.text, u'{}')
|
|
||||||
|
|
||||||
# def test_list_shared_libraries(self):
|
|
||||||
# res = self.get(SHARED_LIBRARIES_URL)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# json = res.json()
|
|
||||||
# self.assertIsNotNone(json)
|
|
||||||
# for repo in json:
|
|
||||||
# self.assertIsNotNone(repo['repo_id'])
|
|
||||||
# self.assertIsNotNone(repo['share_type'])
|
|
||||||
# self.assertIsNotNone(repo['permission'])
|
|
||||||
# self.assertIsNotNone(repo['encrypted'])
|
|
||||||
# self.assertIsNotNone(repo['user'])
|
|
||||||
# self.assertIsNotNone(repo['last_modified'])
|
|
||||||
# self.assertIsNotNone(repo['repo_desc'])
|
|
||||||
# self.assertIsNotNone(repo['group_id'])
|
|
||||||
# self.assertIsNotNone(repo['repo_name'])
|
|
||||||
|
|
||||||
# def test_list_beshared_libraries(self):
|
|
||||||
# res = self.get(BESHARED_LIBRARIES_URL)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# json = res.json()
|
|
||||||
# self.assertIsNotNone(json)
|
|
||||||
# for repo in json:
|
|
||||||
# self.assertIsNotNone(repo['user'])
|
|
||||||
# self.assertIsNotNone(repo['repo_id'])
|
|
||||||
# self.assertIsNotNone(repo['share_type'])
|
|
||||||
# self.assertIsNotNone(repo['permission'])
|
|
||||||
# self.assertIsNotNone(repo['encrypted'])
|
|
||||||
# self.assertIsNotNone(repo['last_modified'])
|
|
||||||
# self.assertIsNotNone(repo['repo_desc'])
|
|
||||||
# self.assertIsNotNone(repo['group_id'])
|
|
||||||
# self.assertIsNotNone(repo['repo_name'])
|
|
||||||
# self.assertIsNotNone(repo['is_virtual'])
|
|
||||||
|
|
||||||
# def test_share_library(self):
|
|
||||||
# data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \
|
|
||||||
# 'permission': 'rw' }
|
|
||||||
# slurl = SHARED_LIBRARIES_URL + str(self.test_repo_id) + u'/'
|
|
||||||
# res = self.put(slurl, params=data)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertEqual(res.text, u'"success"')
|
|
||||||
|
|
||||||
# def test_un_share_library(self):
|
|
||||||
# data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid , \
|
|
||||||
# 'permission': 'rw' }
|
|
||||||
# slurl = SHARED_LIBRARIES_URL + str(self.test_repo_id) + u'/'
|
|
||||||
# data = { 'share_type': 'group', 'user': USERNAME, 'group_id': self.gid }
|
|
||||||
# self.put(slurl, params=data)
|
|
||||||
# res = self.delete(slurl, params=data)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# self.assertEqual(res.text, u'"success"')
|
|
||||||
|
|
||||||
# def test_list_shared_files(self):
|
|
||||||
# res = self.get(SHARED_FILES_URL)
|
|
||||||
# self.assertEqual(res.status_code, 200)
|
|
||||||
# json = res.json()
|
|
||||||
# self.assertIsNotNone(json)
|
|
||||||
# self.assertIsNotNone(json['priv_share_in'])
|
|
||||||
# self.assertIsNotNone(json['priv_share_out'])
|
|
||||||
|
|
||||||
# for sfiles in zip(json['priv_share_in'], json['priv_share_out']):
|
|
||||||
# for sfile in sfiles:
|
|
||||||
# self.assertIsNotNone(sfile['s_type'])
|
|
||||||
# self.assertIsNotNone(sfile['repo_id'])
|
|
||||||
# self.assertIsNotNone(sfile['permission'])
|
|
||||||
# self.assertIsNotNone(sfile['to_user'])
|
|
||||||
# self.assertIsNotNone(sfile['token'])
|
|
||||||
# self.assertIsNotNone(sfile['from_user'])
|
|
||||||
# self.assertIsNotNone(sfile['path'])
|
|
||||||
|
@ -1,41 +0,0 @@
|
|||||||
from apitestbase import STARREDFILES_URL, get_authed_instance
|
|
||||||
from apitestbase import DEFAULT_LIBRARY_URL, LIBRARIES_URL
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class StarredFilesApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
res = self.requests.post(DEFAULT_LIBRARY_URL)
|
|
||||||
self.rid = res.json()['repo_id']
|
|
||||||
self.rurl = LIBRARIES_URL + str(self.rid) + u'/'
|
|
||||||
self.furl = self.rurl + u'file/'
|
|
||||||
|
|
||||||
def test_list_starred_files_api(self):
|
|
||||||
res = self.requests.get(STARREDFILES_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
|
|
||||||
def test_star_file_api(self):
|
|
||||||
data = { 'operation': 'create' }
|
|
||||||
furl = self.furl + '?p=/test.c'
|
|
||||||
self.requests.post(furl, data=data)
|
|
||||||
data = { 'repo_id': self.rid, 'p': '/test.c' }
|
|
||||||
res = self.requests.post(STARREDFILES_URL, data=data)
|
|
||||||
self.assertEqual(res.status_code, 201)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
|
||||||
|
|
||||||
def test_un_star_file_api(self):
|
|
||||||
data = { 'operation': 'create' }
|
|
||||||
furl = self.furl + '?p=/test.c'
|
|
||||||
self.requests.post(furl, data=data)
|
|
||||||
data = { 'repo_id': self.rid, 'p': '/test.c' }
|
|
||||||
res = self.requests.post(STARREDFILES_URL, data=data)
|
|
||||||
res = self.requests.delete(STARREDFILES_URL, params=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
self.assertEqual(res.text, u'"success"')
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@ -1,38 +0,0 @@
|
|||||||
from apitestbase import USERNAME, get_authed_instance
|
|
||||||
from apitestbase import USERMSGS_URL, USERMSGS_COUNT_URL
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class UserMsgsApiTestCase(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.requests = get_authed_instance()
|
|
||||||
self.assertIsNotNone(self.requests)
|
|
||||||
|
|
||||||
def test_list_user_msgs_api(self):
|
|
||||||
res = self.requests.get(USERMSGS_URL)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertEqual(json['to_email'], USERNAME)
|
|
||||||
self.assertIsNotNone(json['next_page'])
|
|
||||||
self.assertIsNotNone(json['msgs'])
|
|
||||||
|
|
||||||
def test_reply_user_msg_api(self):
|
|
||||||
data = { 'id': '0', 'message': 'test' }
|
|
||||||
res = self.requests.post(USERMSGS_URL, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertIsNotNone(json['msgid'])
|
|
||||||
|
|
||||||
def test_count_unseen_msgs_api(self):
|
|
||||||
data = { 'id': '0', 'message': 'test' }
|
|
||||||
self.requests.post(USERMSGS_URL, data=data)
|
|
||||||
res = self.requests.get(USERMSGS_COUNT_URL, data=data)
|
|
||||||
self.assertEqual(res.status_code, 200)
|
|
||||||
json = res.json()
|
|
||||||
self.assertIsNotNone(json)
|
|
||||||
self.assertGreater(json['count'], 0)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main(verbosity=2)
|
|
@ -1,16 +1,12 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
BASE_URL = os.getenv('TEST_BASE_URL', u'http://127.0.0.1:8000')
|
BASE_URL = os.getenv('SEAHUB_TEST_BASE_URL', u'http://127.0.0.1:8000')
|
||||||
USERNAME = os.getenv('TEST_USERNAME', u'test@seahubtest.com')
|
USERNAME = os.getenv('SEAHUB_TEST_USERNAME', u'test@seafiletest.com')
|
||||||
PASSWORD = os.getenv('TEST_PASSWORD', u'testtest')
|
PASSWORD = os.getenv('SEAHUB_TEST_PASSWORD', u'testtest')
|
||||||
|
ADMIN_USERNAME = os.getenv('SEAHUB_TEST_ADMIN_USERNAME', u'admin@seafiletest.com')
|
||||||
|
ADMIN_PASSWORD = os.getenv('SEAHUB_TEST_ADMIN_PASSWORD', u'adminadmin')
|
||||||
|
|
||||||
ADMIN_USERNAME = os.getenv('TEST_ADMIN_USERNAME', u'admin@seahubtest.com')
|
if os.getenv('SEAHUB_TEST_IS_PRO', u'') == u'':
|
||||||
ADMIN_PASSWORD = os.getenv('TEST_ADMIN_PASSWORD', u'adminadmin')
|
|
||||||
|
|
||||||
if BASE_URL[-1] != '/':
|
|
||||||
BASE_URL += '/'
|
|
||||||
|
|
||||||
if os.getenv('TEST_IS_PRO', u'') == u'':
|
|
||||||
IS_PRO = False
|
IS_PRO = False
|
||||||
else:
|
else:
|
||||||
S_PRO = True
|
S_PRO = True
|
||||||
|
@ -18,4 +18,4 @@ def urljoin(base, *args):
|
|||||||
return url
|
return url
|
||||||
|
|
||||||
def apiurl(*parts):
|
def apiurl(*parts):
|
||||||
return urljoin(BASE_URL, *parts)
|
return urljoin(BASE_URL, *parts)
|
||||||
|
@ -1,9 +1,15 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
: ${PYTHON=python}
|
: ${PYTHON=python}
|
||||||
export TEST_USERNAME="test@seahubtest.com"
|
|
||||||
export TEST_PASSWORD="testtest"
|
: ${SEAHUB_TEST_USERNAME="test@seafiletest.com"}
|
||||||
export TEST_ADMIN_USERNAME="admin@seahubtest.com"
|
: ${SEAHUB_TEST_PASSWORD="testtest"}
|
||||||
export TEST_ADMIN_PASSWORD="adminadmin"
|
: ${SEAHUB_TEST_ADMIN_USERNAME="admin@seafiletest.com"}
|
||||||
|
: ${SEAHUB_TEST_ADMIN_PASSWORD="adminadmin"}
|
||||||
|
|
||||||
|
export SEAHUB_TEST_USERNAME
|
||||||
|
export SEAHUB_TEST_PASSWORD
|
||||||
|
export SEAHUB_TEST_ADMIN_USERNAME
|
||||||
|
export SEAHUB_TEST_ADMIN_PASSWORD
|
||||||
|
|
||||||
# If you run this script on your local machine, you must set CCNET_CONF_DIR
|
# If you run this script on your local machine, you must set CCNET_CONF_DIR
|
||||||
# and SEAFILE_CONF_DIR like this:
|
# and SEAFILE_CONF_DIR like this:
|
||||||
@ -31,19 +37,20 @@ function init() {
|
|||||||
$PYTHON ./manage.py syncdb
|
$PYTHON ./manage.py syncdb
|
||||||
|
|
||||||
# create normal user
|
# create normal user
|
||||||
$PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${TEST_USERNAME}', '${TEST_PASSWORD}', 0, 1);"
|
$PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${SEAHUB_TEST_USERNAME}', '${SEAHUB_TEST_PASSWORD}', 0, 1);"
|
||||||
# create admin
|
# create admin
|
||||||
$PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${TEST_ADMIN_USERNAME}', '${TEST_ADMIN_PASSWORD}', 1, 1);"
|
$PYTHON -c "import ccnet; pool = ccnet.ClientPool('${CCNET_CONF_DIR}'); ccnet_threaded_rpc = ccnet.CcnetThreadedRpcClient(pool, req_pool=True); ccnet_threaded_rpc.add_emailuser('${SEAHUB_TEST_ADMIN_USERNAME}', '${SEAHUB_TEST_ADMIN_PASSWORD}', 1, 1);"
|
||||||
}
|
}
|
||||||
|
|
||||||
function start_seahub() {
|
function start_seahub() {
|
||||||
$PYTHON ./manage.py runserver 1>/dev/null 2>&1 &
|
# $PYTHON ./manage.py runserver 1>/dev/null 2>&1 &
|
||||||
|
$PYTHON ./manage.py runserver 2>&1 &
|
||||||
sleep 5
|
sleep 5
|
||||||
}
|
}
|
||||||
|
|
||||||
function run_tests() {
|
function run_tests() {
|
||||||
pushd tests
|
pushd tests
|
||||||
nosetests
|
nosetests "$nose_opts"
|
||||||
popd
|
popd
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,6 +62,8 @@ case $1 in
|
|||||||
start_seahub
|
start_seahub
|
||||||
;;
|
;;
|
||||||
"test")
|
"test")
|
||||||
|
shift
|
||||||
|
nose_opts=$@
|
||||||
run_tests
|
run_tests
|
||||||
;;
|
;;
|
||||||
*)
|
*)
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from common.common import BASE_URL, USERNAME, PASSWORD
|
from tests.common.common import BASE_URL, USERNAME, PASSWORD
|
||||||
from selenium import webdriver
|
from selenium import webdriver
|
||||||
from selenium.webdriver.common.keys import Keys
|
from selenium.webdriver.common.keys import Keys
|
||||||
|
|
||||||
@ -8,31 +8,31 @@ HOME_URL = BASE_URL + u'/home/my/'
|
|||||||
LOGOUT_URL = BASE_URL + u'/accounts/logout/'
|
LOGOUT_URL = BASE_URL + u'/accounts/logout/'
|
||||||
|
|
||||||
def get_logged_instance():
|
def get_logged_instance():
|
||||||
browser = webdriver.PhantomJS()
|
browser = webdriver.PhantomJS()
|
||||||
browser.get(LOGIN_URL)
|
browser.get(LOGIN_URL)
|
||||||
username_input = browser.find_element_by_name('username')
|
username_input = browser.find_element_by_name('username')
|
||||||
password_input = browser.find_element_by_name('password')
|
password_input = browser.find_element_by_name('password')
|
||||||
username_input.send_keys(USERNAME)
|
username_input.send_keys(USERNAME)
|
||||||
password_input.send_keys(PASSWORD)
|
password_input.send_keys(PASSWORD)
|
||||||
password_input.send_keys(Keys.RETURN)
|
password_input.send_keys(Keys.RETURN)
|
||||||
if (browser.current_url != HOME_URL):
|
if browser.current_url != HOME_URL:
|
||||||
browser.quit()
|
browser.quit()
|
||||||
return None
|
return None
|
||||||
return browser
|
return browser
|
||||||
|
|
||||||
class LoginTestCase(unittest.TestCase):
|
class LoginTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.browser = get_logged_instance()
|
self.browser = get_logged_instance()
|
||||||
self.assertIsNotNone(self.browser)
|
self.assertIsNotNone(self.browser)
|
||||||
self.addCleanup(self.browser.quit)
|
self.addCleanup(self.browser.quit)
|
||||||
|
|
||||||
def test_login(self):
|
def test_login(self):
|
||||||
self.assertRegexpMatches(self.browser.current_url, HOME_URL)
|
self.assertRegexpMatches(self.browser.current_url, HOME_URL)
|
||||||
|
|
||||||
def test_logout(self):
|
def test_logout(self):
|
||||||
myinfo_bar = self.browser.find_element_by_css_selector('#my-info')
|
myinfo_bar = self.browser.find_element_by_css_selector('#my-info')
|
||||||
logout_input = self.browser.find_element_by_css_selector('a#logout')
|
logout_input = self.browser.find_element_by_css_selector('a#logout')
|
||||||
myinfo_bar.click()
|
myinfo_bar.click()
|
||||||
logout_input.click()
|
logout_input.click()
|
||||||
self.assertRegexpMatches(self.browser.current_url, LOGOUT_URL)
|
self.assertRegexpMatches(self.browser.current_url, LOGOUT_URL)
|
||||||
|
Loading…
Reference in New Issue
Block a user