2019-04-28 06:16:36 +00:00
|
|
|
import pytest
|
|
|
|
import os
|
|
|
|
import time
|
|
|
|
from tests.config import USER
|
|
|
|
from seaserv import seafile_api as api
|
|
|
|
|
|
|
|
# Names of the files the test uploads, copies, moves and renames.
file_name = 'test.txt'
new_file_name = 'new_test.txt'
empty_file_name = 'empty_test.txt'
new_empty_file_name = 'new_empty_test.txt'

# Payload written to the local source file; its length is the size the
# test expects the server to report for the uploaded file.
file_content = 'test file content'

# The local source file lives in the current working directory.
file_path = '/'.join((os.getcwd(), file_name))

# Directory created inside the test repo.
dir_name = 'test_dir'
|
|
|
|
|
|
|
|
def create_the_file():
    """Write ``file_content`` to ``file_path``, truncating any existing file."""
    with open(file_path, 'w') as out:
        out.write(file_content)
|
|
|
|
|
|
|
|
def _wait_for_copy_task(task_id, interval=0.1, max_polls=300):
    """Poll a background copy/move task until it reports success.

    Sleeps ``interval`` seconds before each poll, as the original inline
    loops did. Instead of spinning forever when the task never succeeds,
    raises ``AssertionError`` once ``max_polls`` polls have been made
    (about 30 seconds with the defaults) so the test fails visibly.
    """
    for _ in range(max_polls):
        time.sleep(interval)
        if api.get_copy_task(task_id).successful:
            return
    raise AssertionError(
        'copy task %s did not succeed after %d polls' % (task_id, max_polls)
    )


def test_file_operation():
    """Exercise the seafile_api file operations against two fresh repos.

    Covers, in order: post_file, post_dir, copy_file (synchronous and
    background), move_file (synchronous and background), post_empty_file,
    rename_file, put_file, get_file_revisions, del_file and get_deleted.
    Both repos are removed at the end of a successful run.
    """
    t_repo_version = 1
    t_repo_id1 = api.create_repo('test_file_operation1', '', USER, passwd=None)

    create_the_file()

    # test post_file
    assert api.post_file(t_repo_id1, file_path, '/', file_name, USER) == 0
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + file_name)
    t_file_size = len(file_content)
    assert t_file_size == api.get_file_size(t_repo_id1, t_repo_version, t_file_id)

    # test post_dir
    assert api.post_dir(t_repo_id1, '/', dir_name, USER) == 0

    # test copy_file (synchronize): the result must carry no task id and
    # must not be flagged as a background operation.
    t_copy_file_result1 = api.copy_file(t_repo_id1, '/', file_name, t_repo_id1, '/', new_file_name, USER, 0, 1)
    assert t_copy_file_result1
    assert t_copy_file_result1.task_id is None
    assert not t_copy_file_result1.background
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + new_file_name)
    assert t_file_size == api.get_file_size(t_repo_id1, t_repo_version, t_file_id)

    # test copy_file (asynchronous): the copy into a second repo runs as a
    # background task that is polled until it succeeds.
    t_repo_id2 = api.create_repo('test_file_operation2', '', USER, passwd=None)
    t_copy_file_result2 = api.copy_file(t_repo_id1, '/', file_name, t_repo_id2, '/', file_name, USER, 1, 0)
    assert t_copy_file_result2
    assert t_copy_file_result2.task_id
    assert t_copy_file_result2.background
    _wait_for_copy_task(t_copy_file_result2.task_id)
    t_file_id = api.get_file_id_by_path(t_repo_id2, '/' + file_name)
    assert t_file_size == api.get_file_size(t_repo_id2, t_repo_version, t_file_id)

    # test move_file (synchronize): the mtime must survive the move and the
    # source path must no longer resolve to a file id.
    t_move_file_info1 = api.get_dirent_by_path(t_repo_id1, '/' + new_file_name)
    t_move_file_result1 = api.move_file(t_repo_id1, '/', new_file_name, t_repo_id1, '/' + dir_name, new_file_name, 1, USER, 0, 1)
    assert t_move_file_result1
    t_move_file_info2 = api.get_dirent_by_path(t_repo_id1, '/' + dir_name + '/' + new_file_name)
    assert t_move_file_info1.mtime == t_move_file_info2.mtime
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + new_file_name)
    assert t_file_id is None

    # test move_file (asynchronous): move tasks are polled through the same
    # get_copy_task call as copy tasks.
    t_move_file_result2 = api.move_file(t_repo_id1, '/', file_name, t_repo_id2, '/', new_file_name, 1, USER, 1, 0)
    assert t_move_file_result2
    assert t_move_file_result2.task_id
    assert t_move_file_result2.background
    _wait_for_copy_task(t_move_file_result2.task_id)
    t_file_id = api.get_file_id_by_path(t_repo_id2, '/' + new_file_name)
    assert t_file_size == api.get_file_size(t_repo_id2, t_repo_version, t_file_id)

    # test post_empty_file
    assert api.post_empty_file(t_repo_id1, '/' + dir_name, empty_file_name, USER) == 0
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + dir_name + '/' + empty_file_name)
    assert api.get_file_size(t_repo_id1, t_repo_version, t_file_id) == 0

    # test rename_file
    assert api.rename_file(t_repo_id1, '/' + dir_name, empty_file_name, new_empty_file_name, USER) == 0

    # test put_file
    t_new_file_id = api.put_file(t_repo_id1, file_path, '/' + dir_name, new_empty_file_name, USER, None)
    assert t_new_file_id

    # test get_file_revisions
    t_commit_list = api.get_file_revisions(t_repo_id2, None, '/' + file_name, 2)
    assert t_commit_list
    assert len(t_commit_list) == 2
    assert t_commit_list[0].creator_name == USER

    # test del_file
    assert api.del_file(t_repo_id2, '/', file_name, USER) == 0

    # test get_deleted
    t_deleted_file_list = api.get_deleted(t_repo_id2, 1)
    assert t_deleted_file_list
    assert len(t_deleted_file_list) == 2
    assert t_deleted_file_list[0].obj_name == file_name
    assert t_deleted_file_list[0].basedir == '/'

    # test del a non-exist file. should return 0.
    assert api.del_file(t_repo_id2, '/', file_name, USER) == 0

    assert api.del_file(t_repo_id1, '/' + dir_name, new_empty_file_name, USER) == 0
    assert api.del_file(t_repo_id1, '/' + dir_name, new_file_name, USER) == 0
    assert api.del_file(t_repo_id2, '/', new_file_name, USER) == 0

    # The pause before removal is kept from the original test.
    time.sleep(1)
    # Remove both repos; the original left the second one behind.
    api.remove_repo(t_repo_id1)
    api.remove_repo(t_repo_id2)
|