1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-08-26 18:51:03 +00:00
seahub/tests/api/test_files.py
2025-08-15 15:26:32 +08:00

426 lines
18 KiB
Python

#coding: UTF-8
"""
Test file/dir operations.
"""
import posixpath
import pytest
import urllib.request, urllib.parse, urllib.error
from urllib.parse import urlencode, quote
import urllib.parse
from tests.common.utils import randstring, urljoin
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import REPOS_URL
class FilesApiTest(ApiTestBase):
    """Exercise the file and directory endpoints of a repo over the HTTP API.

    Every test that needs a repo works inside a temporary one obtained from
    ``self.get_tmp_repo()``.  The request helpers (``get``/``post``/``delete``)
    and the fixtures (``create_file``/``create_dir``) are expected to be
    provided by ``ApiTestBase``.
    """

    def _post_file_operation(self, repo, furl, file_path, operation):
        """POST ``operation`` for ``file_path`` with '/' of ``repo`` as target.

        Any params/query/fragment on ``furl`` are dropped and replaced by a
        freshly quoted ``?p=<file_path>``.

        Args:
            repo: repo object; its ``repo_id`` is sent as ``dst_repo``.
            furl: file URL as returned by ``repo.get_filepath_url``.
            file_path: path of the source file inside the repo.
            operation: value of the ``operation`` form field
                (the tests use 'copy' and 'move').

        Returns:
            The response returned by ``self.post``.
        """
        data = {
            'dst_repo': repo.repo_id,
            'dst_dir': '/',
            'operation': operation,
        }
        u = urllib.parse.urlparse(furl)
        base_url = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
        return self.post(base_url + '?p=' + quote(file_path), data=data)

    def _create_tmp_file_in_subdir(self, repo, fname):
        """Create a sub folder in ``repo`` and a file named ``fname`` in it.

        Returns:
            ``(file_path, furl)``: the path of the new file and the URL
            ``repo.get_filepath_url`` yields for that path.
        """
        # create sub folder(dpath)
        dpath, _ = self.create_dir(repo)
        # create tmp file in sub folder(dpath)
        file_path = dpath + '/' + fname
        furl = repo.get_filepath_url(file_path)
        self.post(furl, data={'operation': 'create'}, expected=201)
        return file_path, furl

    def _check_copy_to_root(self, repo):
        """Copy a file from a sub folder to '/' three times and check names.

        The first copy must report the original name; the second and third
        must report the ' (1)' / ' (2)' suffixed names, i.e. a copy is renamed
        when a file with the same name already is in the destination dir.

        Returns:
            ``(file_path, furl)`` of the source file, so that callers can run
            further operations on it.
        """
        tmp_file = 'tmp_file.txt'
        file_path, furl = self._create_tmp_file_in_subdir(repo, tmp_file)

        # copy tmp file from sub folder(dpath) to dst dir('/')
        res = self._post_file_operation(repo, furl, file_path, 'copy')
        self.assertIn(tmp_file, res.text)

        # get info of copied file in dst dir('/')
        fdurl = repo.file_url + 'detail/?p=/%s' % quote(tmp_file)
        detail = self.get(fdurl).json()
        self.assertIsNotNone(detail)
        self.assertIsNotNone(detail['id'])

        # copy the same file to dst dir('/') again (twice): the copy must be
        # renamed because a file with the same name is already there
        for renamed in ('tmp_file (1).txt', 'tmp_file (2).txt'):
            res = self._post_file_operation(repo, furl, file_path, 'copy')
            self.assertIn(renamed, res.text)

        return file_path, furl

    def test_rename_file(self):
        """Renaming a file answers with a quoted http(s) URL."""
        with self.get_tmp_repo() as repo:
            name, furl = self.create_file(repo)
            data = {
                'operation': 'rename',
                'newname': name + randstring(),
            }
            res = self.post(furl, data=data)
            self.assertRegex(res.text, r'"http(.*)"')

    def test_remove_file(self):
        """Deleting a file answers with the JSON string "success"."""
        with self.get_tmp_repo() as repo:
            _, furl = self.create_file(repo)
            res = self.delete(furl)
            self.assertEqual(res.text, '"success"')

    def test_move_file(self):
        """Moving a file into a dir with name clashes yields a renamed file."""
        with self.get_tmp_repo() as repo:
            # TODO: create another repo here, and use it as dst_repo
            file_path, furl = self._check_copy_to_root(repo)

            # then move file to dst dir; three files with that base name are
            # already there, and the response carries the URL-quoted new name
            res = self._post_file_operation(repo, furl, file_path, 'move')
            self.assertIn('tmp_file%20%283%29.txt', res.text)

    def test_copy_file(self):
        """Copying a file repeatedly into the same dir renames the copies."""
        with self.get_tmp_repo() as repo:
            # TODO: create another repo here, and use it as dst_repo
            self._check_copy_to_root(repo)

    def test_download_file(self):
        """GET on a file URL answers with a quoted download link."""
        with self.get_tmp_repo() as repo:
            fname, furl = self.create_file(repo)
            res = self.get(furl)
            self.assertRegex(res.text, r'"http(.*)/%s"' % quote(fname))

    def test_download_file_without_reuse_token(self):
        """A download link requested without ``reuse`` works only once."""
        with self.get_tmp_repo() as repo:
            fname, furl = self.create_file(repo)
            res = self.get(furl)
            self.assertRegex(res.text, r'"http(.*)/%s"' % quote(fname))
            download_url = res.text.strip('"')

            # download for the first time
            with urllib.request.urlopen(download_url) as resp:
                self.assertEqual(resp.getcode(), 200)

            # download for the second time: must be refused.  assertRaises
            # makes the test fail when the second download unexpectedly
            # succeeds instead of silently passing.
            with self.assertRaises(urllib.error.HTTPError) as ctx:
                urllib.request.urlopen(download_url).close()
            self.assertEqual(ctx.exception.code, 403)

    def test_download_file_with_reuse_token(self):
        """A download link requested with ``reuse=1`` can be used twice."""
        with self.get_tmp_repo() as repo:
            fname, furl = self.create_file(repo)
            res = self.get(furl + '&reuse=1')
            self.assertRegex(res.text, r'"http(.*)/%s"' % quote(fname))
            download_url = res.text.strip('"')

            # download for the first time
            with urllib.request.urlopen(download_url) as resp:
                self.assertEqual(resp.getcode(), 200)

            # download for the second time
            with urllib.request.urlopen(download_url) as resp:
                self.assertEqual(resp.getcode(), 200)

    def test_download_file_from_history(self):
        """A file can be requested at a commit taken from its history."""
        with self.get_tmp_repo() as repo:
            fname, _ = self.create_file(repo)
            file_history_url = urljoin(repo.repo_url, 'history/') + \
                '?p=/%s' % quote(fname)
            res = self.get(file_history_url).json()
            commit_id = res['commits'][0]['id']
            self.assertEqual(len(commit_id), 40)
            data = {
                'p': fname,
                'commit_id': commit_id,
            }
            query = '?' + urlencode(data)
            res = self.get(repo.file_url + query)
            self.assertRegex(res.text, r'"http(.*)/%s"' % quote(fname))

    def test_get_file_detail(self):
        """The file detail response has a non-null value for each field."""
        with self.get_tmp_repo() as repo:
            fname, _ = self.create_file(repo)
            fdurl = repo.file_url + 'detail/?p=/%s' % quote(fname)
            detail = self.get(fdurl).json()
            self.assertIsNotNone(detail)
            expected_fields = (
                'id',
                'mtime',
                'type',
                'name',
                'size',
                'starred',
                'last_modifier_email',
                'last_modifier_name',
                'last_modifier_contact_email',
            )
            for field in expected_fields:
                self.assertIsNotNone(detail[field])

    def test_get_file_history(self):
        """Every commit in a file's history carries the expected fields."""
        # Deliberately not checked because they are allowed to be null:
        # rev_file_id, rev_renamed_old_path, parent_id, second_parent_id.
        required_fields = (
            'rev_file_size',
            'ctime',
            'creator_name',
            'creator',
            'root_id',
            'new_merge',
            'repo_id',
            'desc',
            'id',
            'conflict',
        )
        with self.get_tmp_repo() as repo:
            fname, _ = self.create_file(repo)
            fhurl = repo.file_url + 'history/?p=%s' % quote(fname)
            history = self.get(fhurl).json()
            for commit in history['commits']:
                for field in required_fields:
                    self.assertIsNotNone(commit[field])
                self.assertEqual(commit['user_info']['email'],
                                 commit['creator_name'])
                self.assertIsNotNone(commit['user_info']['name'])
                self.assertIsNotNone(commit['user_info']['contact_email'])

    def test_get_upload_link(self):
        """The upload-link endpoint answers with an upload-api URL."""
        with self.get_tmp_repo() as repo:
            upload_url = urljoin(repo.repo_url, 'upload-link')
            res = self.get(upload_url)
            self.assertRegex(res.text, r'"http(.*)/upload-api/[^/]+"')

    def test_get_upload_link_with_invalid_repo_id(self):
        """The upload-link endpoint answers 404 for the hard-coded repo id."""
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        upload_url = urljoin(repo_url, 'upload-link')
        self.get(upload_url, expected=404)

    def test_get_update_link(self):
        """The update-link endpoint answers with an update-api URL."""
        with self.get_tmp_repo() as repo:
            update_url = urljoin(repo.repo_url, 'update-link')
            res = self.get(update_url)
            self.assertRegex(res.text, r'"http(.*)/update-api/[^/]+"')

    def test_get_update_link_with_invalid_repo_id(self):
        """The update-link endpoint answers 404 for the hard-coded repo id."""
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        update_url = urljoin(repo_url, 'update-link')
        self.get(update_url, expected=404)

    # NOTE: the two tests below are disabled; they are kept for reference.
    # def test_upload_file(self):
    #     # XXX: requests has problems when post a file whose name contains
    #     # non-ascii data
    #     fname = 'file-upload-test %s.txt' % randstring()
    #     furl = self.test_file_url + '?p=/%s' % quote(fname)
    #     self.delete(furl)
    #     upload_url = self.test_repo_url + u'upload-link/'
    #     res = self.get(upload_url)
    #     upload_api_url = re.match(r'"(.*)"', res.text).group(1)
    #     files = {
    #         'file': (fname, 'Some lines in this file'),
    #         'parent_dir': '/',
    #     }
    #     res = self.post(upload_api_url, files=files)
    #     self.assertRegexpMatches(res.text, r'\w{40,40}')

    # def test_update_file(self):
    #     fname = 'file-update-test %s.txt' % randstring()
    #     _, furl = self.create_file(fname=fname)
    #     update_url = self.test_repo_url + u'update-link/'
    #     res = self.get(update_url)
    #     update_api_url = re.match(r'"(.*)"', res.text).group(1)
    #     files = {
    #         'file': ('filename', 'Updated content of this file'),
    #         'target_file': '/test_update.c'
    #     }
    #     res = self.post(update_api_url, files=files)
    #     self.assertRegexpMatches(res.text, r'\w{40,40}')

    def test_get_upload_blocks_link(self):
        """The upload-blks-link endpoint answers with an upload-blks-api URL."""
        with self.get_tmp_repo() as repo:
            upload_blks_url = urljoin(repo.repo_url, 'upload-blks-link')
            res = self.get(upload_blks_url)
            self.assertRegex(res.text, r'"http(.*)/upload-blks-api/[^/]+"')

    def test_get_upload_blocks_link_with_invalid_repo_id(self):
        """The upload-blks-link endpoint answers 404 for the hard-coded id."""
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        upload_blks_url = urljoin(repo_url, 'upload-blks-link')
        self.get(upload_blks_url, expected=404)

    def test_get_update_blocks_link(self):
        """The update-blks-link endpoint answers with an update-blks-api URL."""
        with self.get_tmp_repo() as repo:
            update_blks_url = urljoin(repo.repo_url, 'update-blks-link')
            res = self.get(update_blks_url)
            self.assertRegex(res.text, r'"http(.*)/update-blks-api/[^/]+"')

    def test_get_update_blocks_link_with_invalid_repo_id(self):
        """The update-blks-link endpoint answers 404 for the hard-coded id."""
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        update_blks_url = urljoin(repo_url, 'update-blks-link')
        self.get(update_blks_url, expected=404)

    def test_only_list_dir(self):
        """Listing with ``t=d`` returns only the directory entry."""
        with self.get_tmp_repo() as repo:
            self.create_file(repo)
            self.create_dir(repo)
            dirents = self.get(repo.dir_url + '?t=d').json()
            self.assertHasLen(dirents, 1)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertIsNotNone(dirent['name'])
                self.assertEqual(dirent['type'], 'dir')

    def test_only_list_file(self):
        """Listing with ``t=f`` returns only the file entry."""
        with self.get_tmp_repo() as repo:
            self.create_file(repo)
            self.create_dir(repo)
            dirents = self.get(repo.dir_url + '?t=f').json()
            self.assertHasLen(dirents, 1)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertIsNotNone(dirent['name'])
                self.assertIsNotNone(dirent['size'])
                self.assertEqual(dirent['type'], 'file')

    def test_list_dir_and_file(self):
        """Listing without a type filter returns both the file and the dir."""
        with self.get_tmp_repo() as repo:
            self.create_file(repo)
            self.create_dir(repo)
            dirents = self.get(repo.dir_url).json()
            self.assertHasLen(dirents, 2)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertIsNotNone(dirent['name'])
                self.assertIn(dirent['type'], ('file', 'dir'))
                if dirent['type'] == 'file':
                    self.assertIsNotNone(dirent['size'])

    def test_list_recursive_dir(self):
        """Listing with ``recursive=1`` returns nested dirs (and files)."""
        with self.get_tmp_repo() as repo:
            # create test dir
            data = {'operation': 'mkdir'}
            dir_list = ['/1/', '/1/2/', '/1/2/3/', '/4/', '/4/5/', '/6/']
            for dpath in dir_list:
                durl = repo.get_dirpath_url(dpath)
                self.post(durl, data=data, expected=201)

            # get recursive dir
            dirents = self.get(repo.dir_url + '?t=d&recursive=1').json()
            self.assertHasLen(dirents, len(dir_list))
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertEqual(dirent['type'], 'dir')
                full_path = posixpath.join(dirent['parent_dir'], dirent['name']) + '/'
                self.assertIn(full_path, dir_list)

            # get recursive dir with files info
            # create test file
            tmp_file_name = '%s.txt' % randstring()
            self.create_file(repo, fname=tmp_file_name)
            dirents = self.get(repo.dir_url + '?recursive=1').json()
            self.assertHasLen(dirents, len(dir_list) + 1)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                if dirent['type'] == 'dir':
                    full_dir_path = posixpath.join(dirent['parent_dir'], dirent['name']) + '/'
                    self.assertIn(full_dir_path, dir_list)
                else:
                    full_file_path = posixpath.join(dirent['parent_dir'], dirent['name'])
                    self.assertEqual('/' + tmp_file_name, full_file_path)

    def test_remove_dir(self):
        """Deleting a dir answers "success"; a later GET on it gives 404."""
        with self.get_tmp_repo() as repo:
            _, durl = self.create_dir(repo)
            res = self.delete(durl)
            self.assertEqual(res.text, '"success"')
            self.get(durl, expected=404)

    @pytest.mark.xfail
    def test_create_dir_with_parents(self):
        """Create a nested dir with ``create_parents`` (marked xfail)."""
        with self.get_tmp_repo() as repo:
            path = '/level1/level 2/level_3/目录4'
            self.create_dir_with_parents(repo, path)

    def create_dir_with_parents(self, repo, path):
        """Create ``path`` with ``create_parents`` and verify each level.

        After the mkdir request, every prefix of ``path`` is listed: each
        intermediate dir must have the next path component as its first
        entry, and the last dir must be empty.

        Args:
            repo: repo object providing ``get_dirpath_url``.
            path: '/'-separated directory path to create.
        """
        data = {'operation': 'mkdir', 'create_parents': 'true'}
        # NOTE(review): the path is handed over as UTF-8 bytes; confirm that
        # ``repo.get_dirpath_url`` accepts bytes (the calling test is xfail).
        durl = repo.get_dirpath_url(path.encode('utf-8'))
        self.post(durl, data=data, expected=201)

        curpath = ''
        # check the parents are created along the way
        # NOTE(review): for a path starting with '/', the first component is
        # '' so curpath becomes '/' and later prefixes start with '//' --
        # confirm this is intended.
        parts = path.split('/')
        for i, name in enumerate(parts):
            curpath += '/' + name
            url = repo.get_dirpath_url(curpath.encode('utf-8'))
            if i < len(parts) - 1:
                self.assertEqual(self.get(url).json()[0]['name'], parts[i + 1])
            else:
                self.assertEqual(self.get(url).json(), [])