import pytest
import os
import time
import json
from tests.config import USER
from seaserv import seafile_api as api
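
# Exercises the core file-manipulation RPCs end to end: post_file, post_dir,
# copy_file, move_file, post_empty_file, rename_file, put_file,
# get_file_revisions, del_file / batch_del_files and get_deleted.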
file_name = 'test.txt'
new_file_name = 'new_test.txt'
new_file_name_2 = 'new_test_2.txt'
empty_file_name = 'empty_test.txt'
new_empty_file_name = 'new_empty_test.txt'
file_content = 'test file content'
file_path = os.getcwd() + '/' + file_name
dir_name = "test_dir"
def create_the_file():
    with open(file_path, 'w') as fp:
        fp.write(file_content)

@pytest.mark.parametrize('in_batch', [True, False])
def test_file_operation(in_batch):
    t_repo_version = 1
    t_repo_id1 = api.create_repo('test_file_operation1', '', USER, passwd=None)
    create_the_file()

    # test post_file
    assert api.post_file(t_repo_id1, file_path, '/', file_name, USER) == 0
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + file_name)
    t_file_size = len(file_content)
    assert t_file_size == api.get_file_size(t_repo_id1, t_repo_version, t_file_id)

    # test post_dir
    assert api.post_dir(t_repo_id1, '/', dir_name, USER) == 0
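
    # copy_file/move_file take the source and destination file names as
    # JSON-encoded lists; the trailing two arguments appear to select
    # progress tracking and synchronous execution (0, 1 = run inline,
    # 1, 0 = run as a background task with a task_id).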
    # test copy_file (synchronous)
    t_copy_file_result1 = api.copy_file(t_repo_id1, '/', '[\"'+file_name+'\"]', t_repo_id1, '/', '[\"'+new_file_name+'\"]', USER, 0, 1)
    assert t_copy_file_result1
    assert t_copy_file_result1.task_id is None
    assert not t_copy_file_result1.background
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + new_file_name)
    assert t_file_size == api.get_file_size(t_repo_id1, t_repo_version, t_file_id)

    # test copy_file (asynchronous)
    t_repo_id2 = api.create_repo('test_file_operation2', '', USER, passwd=None)
    usage = api.get_user_self_usage(USER)
    api.set_user_quota(USER, usage + 1)
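    # With the quota capped just above current usage, the cross-repo copy
    # below is expected to fail and report 'Quota is full'.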
    t_copy_file_result2 = api.copy_file(t_repo_id1, '/', '[\"'+file_name+'\"]', t_repo_id2, '/', '[\"'+file_name+'\"]', USER, 1, 0)
    assert t_copy_file_result2
    assert t_copy_file_result2.background
    while True:
        time.sleep(0.1)
        t_copy_task = api.get_copy_task(t_copy_file_result2.task_id)
        assert t_copy_task.failed
        assert t_copy_task.failed_reason == 'Quota is full'
        if t_copy_task.failed:
            break
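
    # Lift the quota again (-1 appears to mean unlimited) and retry;
    # this time the background copy should finish successfully.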
    api.set_user_quota(USER, -1)
    t_copy_file_result2 = api.copy_file(t_repo_id1, '/', '[\"'+file_name+'\"]', t_repo_id2, '/', '[\"'+file_name+'\"]', USER, 1, 0)
    assert t_copy_file_result2
    assert t_copy_file_result2.task_id
    assert t_copy_file_result2.background
    while True:
        time.sleep(0.1)
        t_copy_task = api.get_copy_task(t_copy_file_result2.task_id)
        if t_copy_task.successful:
            break
    t_file_id = api.get_file_id_by_path(t_repo_id2, '/' + file_name)
    assert t_file_size == api.get_file_size(t_repo_id2, t_repo_version, t_file_id)
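
    # The synchronous moves below also check that moving keeps the entry's
    # original mtime and removes it from the source directory.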
    # test move_file (synchronous)
    t_move_file_info1 = api.get_dirent_by_path(t_repo_id1, '/' + new_file_name)
    t_move_file_result1 = api.move_file(t_repo_id1, '/', '[\"'+new_file_name+'\"]', t_repo_id1, '/' + dir_name, '[\"'+new_file_name+'\"]', 1, USER, 0, 1)
    assert t_move_file_result1
    t_move_file_info2 = api.get_dirent_by_path(t_repo_id1, '/' + dir_name + '/' + new_file_name)
    assert t_move_file_info1.mtime == t_move_file_info2.mtime
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + new_file_name)
    assert t_file_id is None

    # test move_file (synchronous), moving back to the root under a new name
    t_move_file_result1 = api.move_file(t_repo_id1, '/' + dir_name, '[\"'+new_file_name+'\"]', t_repo_id1, '/', '[\"'+new_file_name_2+'\"]', 1, USER, 0, 1)
    assert t_move_file_result1
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + dir_name + '/' + new_file_name)
    assert t_file_id is None

    # test move_file (asynchronous)
    usage = api.get_user_self_usage(USER)
    api.set_user_quota(USER, usage + 1)
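    # Same quota-exhaustion scenario as the copy above, now for move_file.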
    t_move_file_result2 = api.move_file(t_repo_id1, '/', '[\"'+file_name+'\"]', t_repo_id2, '/', '[\"'+new_file_name+'\"]', 1, USER, 1, 0)
    assert t_move_file_result2
    assert t_move_file_result2.task_id
    assert t_move_file_result2.background
    while True:
        time.sleep(0.1)
        t_move_task = api.get_copy_task(t_move_file_result2.task_id)
        assert t_move_task.failed
        assert t_move_task.failed_reason == 'Quota is full'
        if t_move_task.failed:
            break
    api.set_user_quota(USER, -1)
    t_move_file_result2 = api.move_file(t_repo_id1, '/', '[\"'+file_name+'\"]', t_repo_id2, '/', '[\"'+new_file_name+'\"]', 1, USER, 1, 0)
    assert t_move_file_result2
    assert t_move_file_result2.task_id
    assert t_move_file_result2.background
    while True:
        time.sleep(0.1)
        t_move_task = api.get_copy_task(t_move_file_result2.task_id)
        if t_move_task.successful:
            break
    t_file_id = api.get_file_id_by_path(t_repo_id2, '/' + new_file_name)
    assert t_file_size == api.get_file_size(t_repo_id2, t_repo_version, t_file_id)

    # test post_empty_file
    assert api.post_empty_file(t_repo_id1, '/' + dir_name, empty_file_name, USER) == 0
    t_file_id = api.get_file_id_by_path(t_repo_id1, '/' + dir_name + '/' + empty_file_name)
    assert api.get_file_size(t_repo_id1, t_repo_version, t_file_id) == 0

    # test rename_file
    assert api.rename_file(t_repo_id1, '/' + dir_name, empty_file_name, new_empty_file_name, USER) == 0

    # test put_file
    t_new_file_id = api.put_file(t_repo_id1, file_path, '/' + dir_name, new_empty_file_name, USER, None)
    assert t_new_file_id
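
    # put_file overwrites the renamed empty file with the contents of
    # file_path and returns the id of the new file version.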
    # test get_file_revisions
    t_commit_list = api.get_file_revisions(t_repo_id2, None, '/' + file_name, 2)
    assert t_commit_list
    assert len(t_commit_list) == 2
    assert t_commit_list[0].creator_name == USER
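    # get_file_revisions returns the commit history for the given path; the
    # last argument appears to cap how many revisions are returned.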
    # test del_file
    if in_batch:
        assert api.batch_del_files(t_repo_id2, '[\"'+'/'+file_name+'\"]', USER) == 0
    else:
        assert api.del_file(t_repo_id2, '/', '[\"'+file_name+'\"]', USER) == 0
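    # Both branches above delete the same file: batch_del_files takes a JSON
    # list of absolute paths, del_file a parent directory plus a JSON list of
    # names within it.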
    # test get_deleted
    t_deleted_file_list = api.get_deleted(t_repo_id2, 1)
    assert t_deleted_file_list
    assert len(t_deleted_file_list) == 2
    assert t_deleted_file_list[0].obj_name == file_name
    assert t_deleted_file_list[0].basedir == '/'
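    # get_deleted's second argument is presumably the look-back window in
    # days; the just-deleted test.txt shows up first in the listing.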
    # test deleting a non-existent file; it should still return 0.
    if in_batch:
        file_list = ["/"+file_name, "/"+new_file_name]
        assert api.batch_del_files(t_repo_id2, json.dumps(file_list), USER) == 0
        t_deleted_file_list = api.get_deleted(t_repo_id2, 1)
        assert t_deleted_file_list
        assert len(t_deleted_file_list) == 3
        file_list = ["/"+dir_name+"/"+new_empty_file_name, "/"+dir_name+"/"+new_file_name, "/"+new_file_name_2]
        assert api.batch_del_files(t_repo_id1, json.dumps(file_list), USER) == 0
        t_deleted_file_list = api.get_deleted(t_repo_id1, 1)
        assert t_deleted_file_list
        assert len(t_deleted_file_list) == 4
    else:
        assert api.del_file(t_repo_id2, '/', '[\"'+file_name+'\"]', USER) == 0
        assert api.del_file(t_repo_id1, '/' + dir_name, '[\"'+new_empty_file_name+'\"]', USER) == 0
        assert api.del_file(t_repo_id1, '/' + dir_name, '[\"'+new_file_name+'\"]', USER) == 0
        assert api.del_file(t_repo_id2, '/', '[\"'+new_file_name+'\"]', USER) == 0
        assert api.del_file(t_repo_id1, '/', '[\"'+new_file_name_2+'\"]', USER) == 0
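
    # Clean up: the short sleep presumably gives the server time to finish any
    # pending background processing before the test repos are removed.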
    time.sleep(1)
    api.remove_repo(t_repo_id1)
    api.remove_repo(t_repo_id2)