diff --git a/seahub/api2/endpoints/dir.py b/seahub/api2/endpoints/dir.py index 153d18f867..d5c8112ac2 100644 --- a/seahub/api2/endpoints/dir.py +++ b/seahub/api2/endpoints/dir.py @@ -1,5 +1,6 @@ import os import logging +import posixpath from rest_framework.authentication import SessionAuthentication from rest_framework.permissions import IsAuthenticated @@ -10,11 +11,12 @@ from rest_framework import status from seahub.api2.throttling import UserRateThrottle from seahub.api2.authentication import TokenAuthentication from seahub.api2.utils import api_error -from seahub.api2.views import reloaddir, get_dir_recursively, \ +from seahub.api2.views import get_dir_recursively, \ get_dir_entrys_by_id from seahub.views import check_folder_permission -from seahub.utils import check_filename_with_rename, is_pro_version +from seahub.utils import check_filename_with_rename +from seahub.utils.timeutils import timestamp_to_isoformat_timestr from seaserv import seafile_api from pysearpc import SearpcError @@ -30,17 +32,37 @@ class DirView(APIView): permission_classes = (IsAuthenticated, ) throttle_classes = (UserRateThrottle, ) + def get_dir_info(self, repo_id, dir_path): + + dir_obj = seafile_api.get_dirent_by_path(repo_id, dir_path) + dir_info = { + 'type': 'dir', + 'repo_id': repo_id, + 'parent_dir': os.path.dirname(dir_path.rstrip('/')), + 'obj_name': dir_obj.obj_name, + 'obj_id': dir_obj.obj_id, + 'mtime': timestamp_to_isoformat_timestr(dir_obj.mtime), + } + + return dir_info + def get(self, request, repo_id, format=None): - # list dir - repo = seafile_api.get_repo(repo_id) - if not repo: - error_msg = 'Library %s not found.' % repo_id - return api_error(status.HTTP_404_NOT_FOUND, error_msg) + """ Get dir info. + + Permission checking: + 1. user with either 'r' or 'rw' permission. + """ path = request.GET.get('p', '/') if path[-1] != '/': path = path + '/' + # recource check + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + try: dir_id = seafile_api.get_dir_id_by_path(repo_id, path) except SearpcError as e: @@ -52,7 +74,8 @@ class DirView(APIView): error_msg = 'Folder %s not found.' % path return api_error(status.HTTP_404_NOT_FOUND, error_msg) - if check_folder_permission(request, repo_id, path) is None: + # permission check + if not check_folder_permission(request, repo_id, path): error_msg = 'Permission denied.' return api_error(status.HTTP_403_FORBIDDEN, error_msg) @@ -86,11 +109,14 @@ class DirView(APIView): return get_dir_entrys_by_id(request, repo, path, dir_id, request_type) def post(self, request, repo_id, format=None): - repo = seafile_api.get_repo(repo_id) - if not repo: - return api_error(status.HTTP_404_NOT_FOUND, 'Library not found.') + """ Create, rename dir. - path = request.GET.get('p', '') + Permission checking: + 1. user with 'rw' permission. + """ + + # argument check + path = request.GET.get('p', None) if not path or path[0] != '/': error_msg = 'p invalid.' return api_error(status.HTTP_400_BAD_REQUEST, error_msg) @@ -99,126 +125,135 @@ class DirView(APIView): error_msg = 'Can not make or rename root dir.' return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - if path[-1] == '/': - path = path[:-1] - - username = request.user.username - parent_dir = os.path.dirname(path) - operation = request.POST.get('operation', '') - if operation.lower() == 'mkdir': - parent_dir = os.path.dirname(path) - if check_folder_permission(request, repo_id, parent_dir) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - create_parents = request.POST.get('create_parents', '').lower() in ('true', '1') - if not create_parents: - parent_dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir) - if not parent_dir_id: - error_msg = 'Folder %s not found.' % parent_dir - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - new_dir_name = os.path.basename(path) - new_dir_name = check_filename_with_rename(repo_id, parent_dir, new_dir_name) - try: - seafile_api.post_dir(repo_id, parent_dir, new_dir_name, username) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - else: - if not is_pro_version(): - error_msg = 'Feature not supported.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - try: - seafile_api.mkdir_with_parents(repo_id, '/', path[1:], username) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if request.GET.get('reloaddir', '').lower() == 'true': - resp = reloaddir(request, repo, parent_dir) - else: - resp = Response({'success': True}) - - return resp - - elif operation.lower() == 'rename': - dir_id = seafile_api.get_dir_id_by_path(repo_id, path) - if not dir_id: - error_msg = 'Folder %s not found.' % path - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if check_folder_permission(request, repo.id, path) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - parent_dir = os.path.dirname(path) - old_dir_name = os.path.basename(path) - newname = request.POST.get('newname', '') - if not newname: - error_msg = 'newname invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - if newname == old_dir_name: - return Response({'success': True}) - - try: - # rename duplicate name - checked_newname = check_filename_with_rename(repo_id, parent_dir, newname) - # rename dir - seafile_api.rename_file(repo_id, parent_dir, old_dir_name, - checked_newname, username) - return Response({'success': True}) - except SearpcError, e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - else: - error_msg = "Operation not supported." + operation = request.data.get('operation', None) + if not operation: + error_msg = 'operation invalid.' return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - def delete(self, request, repo_id, format=None): - # delete dir or file + operation = operation.lower() + if operation not in ('mkdir', 'rename'): + error_msg = "operation can only be 'mkdir', 'rename'." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check repo = seafile_api.get_repo(repo_id) if not repo: error_msg = 'Library %s not found.' % repo_id return api_error(status.HTTP_404_NOT_FOUND, error_msg) + path = path.rstrip('/') + username = request.user.username + parent_dir = os.path.dirname(path) + if operation == 'mkdir': + # permission check + if check_folder_permission(request, repo_id, parent_dir) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # resource check + parent_dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir) + if not parent_dir_id: + error_msg = 'Folder %s not found.' % parent_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + new_dir_name = os.path.basename(path) + new_dir_name = check_filename_with_rename(repo_id, parent_dir, new_dir_name) + try: + seafile_api.post_dir(repo_id, parent_dir, new_dir_name, username) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + new_dir_path = posixpath.join(parent_dir, new_dir_name) + dir_info = self.get_dir_info(repo_id, new_dir_path) + resp = Response(dir_info) + + return resp + + if operation == 'rename': + # resource check + dir_id = seafile_api.get_dir_id_by_path(repo_id, path) + if not dir_id: + error_msg = 'Folder %s not found.' % path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check + if check_folder_permission(request, repo_id, path) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + old_dir_name = os.path.basename(path) + new_dir_name = request.data.get('newname', None) + if not new_dir_name: + error_msg = 'newname invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if new_dir_name == old_dir_name: + dir_info = self.get_dir_info(repo_id, path) + resp = Response(dir_info) + return resp + + try: + # rename duplicate name + new_dir_name = check_filename_with_rename(repo_id, parent_dir, new_dir_name) + # rename dir + seafile_api.rename_file(repo_id, parent_dir, old_dir_name, + new_dir_name, username) + + new_dir_path = posixpath.join(parent_dir, new_dir_name) + dir_info = self.get_dir_info(repo_id, new_dir_path) + resp = Response(dir_info) + return resp + except SearpcError, e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + def delete(self, request, repo_id, format=None): + """ Delete dir. + + Permission checking: + 1. user with 'rw' permission. + """ + + # argument check path = request.GET.get('p', None) if not path: error_msg = 'p invalid.' return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - dir_id = seafile_api.get_dir_id_by_path(repo_id, path) - if not dir_id: - error_msg = 'Folder %s not found.' % path - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if check_folder_permission(request, repo_id, path) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - if path == '/': error_msg = 'Can not delete root path.' return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + # resource check + dir_id = seafile_api.get_dir_id_by_path(repo_id, path) + if not dir_id: + return Response({'success': True}) + + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check + if check_folder_permission(request, repo_id, path) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + if path[-1] == '/': path = path[:-1] + path = path.rstrip('/') username = request.user.username parent_dir = os.path.dirname(path) - file_name = os.path.basename(path) + dir_name = os.path.basename(path) try: - seafile_api.del_file(repo_id, parent_dir, file_name, username) + seafile_api.del_file(repo_id, parent_dir, dir_name, username) except SearpcError as e: logger.error(e) error_msg = 'Internal Server Error' return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - if request.GET.get('reloaddir', '').lower() == 'true': - return reloaddir(request, repo, parent_dir) - else: - return Response({'success': True}) + return Response({'success': True}) diff --git a/seahub/api2/endpoints/file.py b/seahub/api2/endpoints/file.py index c514ee8c60..74d0a1b8d8 100644 --- a/seahub/api2/endpoints/file.py +++ b/seahub/api2/endpoints/file.py @@ -1,5 +1,6 @@ import os import logging +import posixpath from rest_framework.authentication import SessionAuthentication from rest_framework.permissions import IsAuthenticated @@ -10,12 +11,10 @@ from rest_framework import status from seahub.api2.throttling import UserRateThrottle from seahub.api2.authentication import TokenAuthentication from seahub.api2.utils import api_error -from seahub.api2.views import reloaddir, get_shared_link -from seahub.utils import gen_file_get_url, EMPTY_SHA1, \ - check_filename_with_rename +from seahub.utils import check_filename_with_rename, is_pro_version +from seahub.utils.timeutils import timestamp_to_isoformat_timestr from seahub.views import check_folder_permission, check_file_lock -from seahub.views.file import send_file_access_msg from seahub.settings import MAX_UPLOAD_FILE_NAME_LEN, FILE_LOCK_EXPIRATION_DAYS @@ -24,6 +23,7 @@ from pysearpc import SearpcError logger = logging.getLogger(__name__) + class FileView(APIView): """ Support uniform interface for file related operations, @@ -34,18 +34,42 @@ class FileView(APIView): permission_classes = (IsAuthenticated, ) throttle_classes = (UserRateThrottle, ) + def get_file_info(self, username, repo_id, file_path): + + file_obj = seafile_api.get_dirent_by_path(repo_id, file_path) + is_locked, locked_by_me = check_file_lock(repo_id, file_path, username) + file_info = { + 'type': 'file', + 'repo_id': repo_id, + 'parent_dir': os.path.dirname(file_path), + 'obj_name': file_obj.obj_name, + 'obj_id': file_obj.obj_id, + 'size': file_obj.size, + 'mtime': timestamp_to_isoformat_timestr(file_obj.mtime), + 'is_locked': is_locked, + } + + return file_info + def get(self, request, repo_id, format=None): + """ Get file info. - repo = seafile_api.get_repo(repo_id) - if not repo: - error_msg = 'Library %s not found.' % repo_id - return api_error(status.HTTP_404_NOT_FOUND, error_msg) + Permission checking: + 1. user with either 'r' or 'rw' permission. + """ + # argument check path = request.GET.get('p', None) if not path: error_msg = 'p invalid.' return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + # resource check + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + try: file_id = seafile_api.get_file_id_by_path(repo_id, path) except SearpcError as e: @@ -57,254 +81,52 @@ class FileView(APIView): error_msg = 'File %s not found.' % path return api_error(status.HTTP_404_NOT_FOUND, error_msg) - if check_folder_permission(request, repo_id, path) is None: + # permission check + parent_dir = os.path.dirname(path) + if check_folder_permission(request, repo_id, parent_dir) is None: error_msg = 'Permission denied.' return api_error(status.HTTP_403_FORBIDDEN, error_msg) - # send stats message - send_file_access_msg(request, repo, path, 'api') - op = request.GET.get('op', 'download') - if op == 'download': - reuse = request.GET.get('reuse', '0') - if reuse not in ('1', '0'): - error_msg = "If you want to reuse file server access token for download file, you should set 'reuse' argument as '1'." - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - use_onetime = False if reuse == '1' else True - - file_name = os.path.basename(path) - token = seafile_api.get_fileserver_access_token(repo_id, - file_id, op, request.user.username, use_onetime) - redirect_url = gen_file_get_url(token, file_name) - - return Response({"url": redirect_url}) - - elif op == 'downloadblks': - blklist = [] - encrypted = False - enc_version = 0 - if file_id != EMPTY_SHA1: - try: - blks = seafile_api.list_blocks_by_file_id(repo_id, file_id) - blklist = blks.split('\n') - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - blklist = [i for i in blklist if len(i) == 40] - if len(blklist) > 0: - repo = seafile_api.get_repo(repo_id) - encrypted = repo.encrypted - enc_version = repo.enc_version - - res = { - 'file_id': file_id, - 'blklist': blklist, - 'encrypted': encrypted, - 'enc_version': enc_version, - } - - return Response(res) - - elif op == 'sharelink': - link = get_shared_link(request, repo_id, path) - return Response({"link": link}) - - else: - error_msg = 'op invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + file_info = self.get_file_info(request.user.username, repo_id, path) + return Response(file_info) def post(self, request, repo_id, format=None): - # rename, move, copy or create file + """ Create, rename, move, copy file + + Permission checking: + 1. create: user with 'rw' permission for current parent dir; + 2. rename: user with 'rw' permission for current file; + 3. move : user with 'rw' permission for current file, 'rw' permission for dst parent dir; + 4. copy : user with 'r' permission for current file, 'rw' permission for dst parent dir; + """ + + # argument check + path = request.GET.get('p', None) + if not path or path[0] != '/': + error_msg = 'p invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + operation = request.data.get('operation', None) + if not operation: + error_msg = 'operation invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + operation = operation.lower() + if operation not in ('create', 'rename', 'move', 'copy'): + error_msg = "operation can only be 'create', 'rename', 'move' or 'copy'." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check repo = seafile_api.get_repo(repo_id) if not repo: error_msg = 'Library %s not found.' % repo_id return api_error(status.HTTP_404_NOT_FOUND, error_msg) - path = request.GET.get('p', '') - if not path or path[0] != '/': - error_msg = 'p invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - username = request.user.username parent_dir = os.path.dirname(path) - operation = request.POST.get('operation', '') - if operation.lower() == 'rename': - try: - file_id = seafile_api.get_file_id_by_path(repo_id, path) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - if not file_id: - error_msg = 'File %s not found.' % path - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if check_folder_permission(request, repo_id, path) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - newname = request.POST.get('newname', '') - if not newname: - error_msg = 'newname invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - if len(newname) > MAX_UPLOAD_FILE_NAME_LEN: - error_msg = 'newname is too long.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - oldname = os.path.basename(path) - if oldname == newname: - error_msg = 'The new name is the same to the old' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - newname = check_filename_with_rename(repo_id, parent_dir, newname) - try: - seafile_api.rename_file(repo_id, parent_dir, oldname, newname, username) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if request.GET.get('reloaddir', '').lower() == 'true': - return reloaddir(request, repo, parent_dir) - else: - return Response({'success': True}) - - elif operation.lower() == 'move': - try: - file_id = seafile_api.get_file_id_by_path(repo_id, path) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if not file_id: - error_msg = 'File %s not found.' % path - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - # check src validation - src_repo_id = repo_id - src_dir = os.path.dirname(path) - if check_folder_permission(request, src_repo_id, src_dir) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - # check dst validation - dst_repo_id = request.POST.get('dst_repo', '') - dst_dir = request.POST.get('dst_dir', '') - if not dst_repo_id: - error_msg = 'dst_repo_id invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - dst_repo = seafile_api.get_repo(dst_repo_id) - if not dst_repo: - error_msg = 'Library %s not found.' % dst_repo_id - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if not dst_dir: - error_msg = 'dst_dir invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - dst_dir_id = seafile_api.get_dir_id_by_path(dst_repo_id, dst_dir) - if not dst_dir_id: - error_msg = 'Folder %s not found.' % dst_dir - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if check_folder_permission(request, dst_repo_id, dst_dir) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - # move file - if dst_dir[-1] != '/': # Append '/' to the end of directory if necessary - dst_dir += '/' - - if src_repo_id == dst_repo_id and src_dir == dst_dir: - return Response({'success': True}) - - filename = os.path.basename(path) - new_filename = check_filename_with_rename(dst_repo_id, dst_dir, filename) - try: - seafile_api.move_file(src_repo_id, src_dir, filename, dst_repo_id, - dst_dir, new_filename, username, 0, synchronous=1) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if request.GET.get('reloaddir', '').lower() == 'true': - return reloaddir(request, dst_repo, dst_dir) - else: - return Response({'success': True}) - - elif operation.lower() == 'copy': - try: - file_id = seafile_api.get_file_id_by_path(repo_id, path) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if not file_id: - error_msg = 'File %s not found.' % path - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - # check src validation - src_repo_id = repo_id - src_dir = os.path.dirname(path) - if check_folder_permission(request, src_repo_id, src_dir) is None: - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - # check dst validation - dst_repo_id = request.POST.get('dst_repo', '') - dst_dir = request.POST.get('dst_dir', '') - if not dst_repo_id: - error_msg = 'dst_repo_id invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - dst_repo = seafile_api.get_repo(dst_repo_id) - if not dst_repo: - error_msg = 'Library %s not found.' % dst_repo_id - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if not dst_dir: - error_msg = 'dst_dir invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - dst_dir_id = seafile_api.get_dir_id_by_path(dst_repo_id, dst_dir) - if not dst_dir_id: - error_msg = 'Folder %s not found.' % dst_dir - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - if check_folder_permission(request, dst_repo_id, dst_dir) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - # copy file - if dst_dir[-1] != '/': # Append '/' to the end of directory if necessary - dst_dir += '/' - - if src_repo_id == dst_repo_id and src_dir == dst_dir: - return Response({'success': True}) - - filename = os.path.basename(path) - new_filename = check_filename_with_rename(dst_repo_id, dst_dir, filename) - try: - seafile_api.copy_file(src_repo_id, src_dir, filename, dst_repo_id, - dst_dir, new_filename, username, 0, synchronous=1) - except SearpcError as e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if request.GET.get('reloaddir', '').lower() == 'true': - return reloaddir(request, dst_repo, dst_dir) - else: - return Response({'success': True}) - - elif operation.lower() == 'create': + if operation == 'create': + # resource check try: parent_dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir) except SearpcError as e: @@ -316,10 +138,12 @@ class FileView(APIView): error_msg = 'Folder %s not found.' % parent_dir return api_error(status.HTTP_404_NOT_FOUND, error_msg) + # permission check if check_folder_permission(request, repo_id, parent_dir) != 'rw': error_msg = 'Permission denied.' return api_error(status.HTTP_403_FORBIDDEN, error_msg) + # create file new_file_name = os.path.basename(path) new_file_name = check_filename_with_rename(repo_id, parent_dir, new_file_name) @@ -330,95 +154,299 @@ class FileView(APIView): error_msg = 'Internal Server Error' return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - if request.GET.get('reloaddir', '').lower() == 'true': - return reloaddir(request, repo, parent_dir) - else: - return Response({'success': True}) - else: - error_msg = "Operation can only be rename, create or move." - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + file_info = self.get_file_info(username, repo_id, path) + return Response(file_info) + + if operation == 'rename': + # argument check + new_file_name = request.data.get('newname', None) + if not new_file_name: + error_msg = 'newname invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if len(new_file_name) > MAX_UPLOAD_FILE_NAME_LEN: + error_msg = 'newname is too long.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + oldname = os.path.basename(path) + if oldname == new_file_name: + error_msg = 'The new name is the same to the old' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check + try: + file_id = seafile_api.get_file_id_by_path(repo_id, path) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + if not file_id: + error_msg = 'File %s not found.' % path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check + if check_folder_permission(request, repo_id, parent_dir) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # rename file + new_file_name = check_filename_with_rename(repo_id, parent_dir, + new_file_name) + try: + seafile_api.rename_file(repo_id, parent_dir, oldname, + new_file_name, username) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + new_file_path = posixpath.join(parent_dir, new_file_name) + file_info = self.get_file_info(username, repo_id, new_file_path) + return Response(file_info) + + if operation == 'move': + # argument check + dst_repo_id = request.data.get('dst_repo', None) + dst_dir = request.data.get('dst_dir', None) + if not dst_repo_id: + error_msg = 'dst_repo invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if not dst_dir: + error_msg = 'dst_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check for source file + try: + file_id = seafile_api.get_file_id_by_path(repo_id, path) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + if not file_id: + error_msg = 'File %s not found.' % path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # resource check for dst repo and dir + dst_repo = seafile_api.get_repo(dst_repo_id) + if not dst_repo: + error_msg = 'Library %s not found.' % dst_repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + dst_dir_id = seafile_api.get_dir_id_by_path(dst_repo_id, dst_dir) + if not dst_dir_id: + error_msg = 'Folder %s not found.' % dst_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check for source file + src_repo_id = repo_id + src_dir = os.path.dirname(path) + if check_folder_permission(request, src_repo_id, src_dir) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # permission check for dst dir + if check_folder_permission(request, dst_repo_id, dst_dir) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # move file + if dst_dir[-1] != '/': # Append '/' to the end of directory if necessary + dst_dir += '/' + + if src_repo_id == dst_repo_id and src_dir == dst_dir: + file_info = self.get_file_info(username, repo_id, path) + return Response(file_info) + + filename = os.path.basename(path) + new_file_name = check_filename_with_rename(dst_repo_id, dst_dir, filename) + try: + seafile_api.move_file(src_repo_id, src_dir, filename, dst_repo_id, + dst_dir, new_file_name, username, 0, synchronous=1) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + dst_file_path = posixpath.join(dst_dir, new_file_name) + dst_file_info = self.get_file_info(username, dst_repo_id, dst_file_path) + return Response(dst_file_info) + + if operation == 'copy': + # argument check + dst_repo_id = request.data.get('dst_repo', None) + dst_dir = request.data.get('dst_dir', None) + if not dst_repo_id: + error_msg = 'dst_repo_id invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if not dst_dir: + error_msg = 'dst_dir invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check for source file + try: + file_id = seafile_api.get_file_id_by_path(repo_id, path) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + if not file_id: + error_msg = 'File %s not found.' % path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # resource check for dst repo and dir + dst_repo = seafile_api.get_repo(dst_repo_id) + if not dst_repo: + error_msg = 'Library %s not found.' % dst_repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + dst_dir_id = seafile_api.get_dir_id_by_path(dst_repo_id, dst_dir) + if not dst_dir_id: + error_msg = 'Folder %s not found.' % dst_dir + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + # permission check for source file + src_repo_id = repo_id + src_dir = os.path.dirname(path) + if not check_folder_permission(request, src_repo_id, src_dir): + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # permission check for dst dir + if check_folder_permission(request, dst_repo_id, dst_dir) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # copy file + if dst_dir[-1] != '/': # Append '/' to the end of directory if necessary + dst_dir += '/' + + if src_repo_id == dst_repo_id and src_dir == dst_dir: + file_info = self.get_file_info(username, repo_id, path) + return Response(file_info) + + filename = os.path.basename(path) + new_file_name = check_filename_with_rename(dst_repo_id, dst_dir, filename) + try: + seafile_api.copy_file(src_repo_id, src_dir, filename, dst_repo_id, + dst_dir, new_file_name, username, 0, synchronous=1) + except SearpcError as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + dst_file_path = posixpath.join(dst_dir, new_file_name) + dst_file_info = self.get_file_info(username, dst_repo_id, dst_file_path) + return Response(dst_file_info) def put(self, request, repo_id, format=None): + """ Currently only for lock and unlock file operation. + + Permission checking: + 1. user with 'rw' permission for current file; + """ + + if not is_pro_version(): + error_msg = 'file lock feature only supported in professional edition.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # argument check + path = request.GET.get('p', None) + if not path: + error_msg = 'p invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + operation = request.data.get('operation', None) + if not operation: + error_msg = 'operation invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + operation = operation.lower() + if operation not in ('lock', 'unlock'): + error_msg = "operation can only be 'lock', or 'unlock'." + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check repo = seafile_api.get_repo(repo_id) if not repo: error_msg = 'Library %s not found.' % repo_id return api_error(status.HTTP_404_NOT_FOUND, error_msg) - path = request.GET.get('p', '') - if not path: - error_msg = 'p invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - file_id = seafile_api.get_file_id_by_path(repo_id, path) if not file_id: error_msg = 'File %s not found.' % path return api_error(status.HTTP_404_NOT_FOUND, error_msg) - if check_folder_permission(request, repo_id, path) != 'rw': - error_msg = 'Permission denied.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - username = request.user.username - operation = request.data.get('operation', '') - if operation.lower() == 'lock': - is_locked, locked_by_me = check_file_lock(repo_id, path, username) - if is_locked: - error_msg = 'File is already locked.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - # lock file - expire = request.data.get('expire', FILE_LOCK_EXPIRATION_DAYS) - try: - seafile_api.lock_file(repo_id, path.lstrip('/'), username, expire) - return Response({'success': True}) - except SearpcError, e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - - if operation.lower() == 'unlock': - is_locked, locked_by_me = check_file_lock(repo_id, path, username) - if not is_locked: - error_msg = 'File is not locked.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - if not locked_by_me: - error_msg = 'You can not unlock this file.' - return api_error(status.HTTP_403_FORBIDDEN, error_msg) - - # unlock file - try: - seafile_api.unlock_file(repo_id, path.lstrip('/')) - return Response({'success': True}) - except SearpcError, e: - logger.error(e) - error_msg = 'Internal Server Error' - return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - else: - error_msg = "Operation can only be lock or unlock" - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - def delete(self, request, repo_id, format=None): - # delete file - repo = seafile_api.get_repo(repo_id) - if not repo: - error_msg = 'Library %s not found.' % repo_id - return api_error(status.HTTP_404_NOT_FOUND, error_msg) - - path = request.GET.get('p', None) - if not path: - error_msg = 'p invalid.' - return api_error(status.HTTP_400_BAD_REQUEST, error_msg) - - file_id = seafile_api.get_file_id_by_path(repo_id, path) - if not file_id: - return Response({'success': True}) - + # permission check parent_dir = os.path.dirname(path) if check_folder_permission(request, repo_id, parent_dir) != 'rw': error_msg = 'Permission denied.' return api_error(status.HTTP_403_FORBIDDEN, error_msg) + username = request.user.username + is_locked, locked_by_me = check_file_lock(repo_id, path, username) + if operation == 'lock': + if not is_locked: + # lock file + expire = request.data.get('expire', FILE_LOCK_EXPIRATION_DAYS) + try: + seafile_api.lock_file(repo_id, path.lstrip('/'), username, expire) + except SearpcError, e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + if operation == 'unlock': + if is_locked: + if not locked_by_me: + error_msg = 'You can not unlock this file.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # unlock file + try: + seafile_api.unlock_file(repo_id, path.lstrip('/')) + except SearpcError, e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + file_info = self.get_file_info(username, repo_id, path) + return Response(file_info) + + def delete(self, request, repo_id, format=None): + """ Delete file. + + Permission checking: + 1. user with 'rw' permission. + """ + + # argument check + path = request.GET.get('p', None) + if not path: + error_msg = 'p invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # resource check + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library %s not found.' % repo_id + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + file_id = seafile_api.get_file_id_by_path(repo_id, path) + if not file_id: + return Response({'success': True}) + + # permission check parent_dir = os.path.dirname(path) + if check_folder_permission(request, repo_id, parent_dir) != 'rw': + error_msg = 'Permission denied.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) + + # delete file file_name = os.path.basename(path) try: seafile_api.del_file(repo_id, parent_dir, @@ -428,7 +456,4 @@ class FileView(APIView): error_msg = 'Internal Server Error' return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) - if request.GET.get('reloaddir', '').lower() == 'true': - return reloaddir(request, repo, parent_dir) - else: - return Response({'success': True}) + return Response({'success': True}) diff --git a/seahub/api2/views.py b/seahub/api2/views.py index e5c0bf90eb..d8f1c4c04b 100644 --- a/seahub/api2/views.py +++ b/seahub/api2/views.py @@ -1833,7 +1833,8 @@ class FileView(APIView): if not path: return api_error(status.HTTP_400_BAD_REQUEST, 'Path is missing.') - if check_folder_permission(request, repo_id, path) is None: + parent_dir = os.path.dirname(path) + if check_folder_permission(request, repo_id, parent_dir) is None: return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to access this file.') @@ -1880,7 +1881,7 @@ class FileView(APIView): operation = request.POST.get('operation', '') if operation.lower() == 'rename': - if check_folder_permission(request, repo_id, path) != 'rw': + if check_folder_permission(request, repo_id, parent_dir) != 'rw': return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to rename file.') @@ -1916,7 +1917,7 @@ class FileView(APIView): return resp elif operation.lower() == 'move': - if check_folder_permission(request, repo_id, path) != 'rw': + if check_folder_permission(request, repo_id, parent_dir) != 'rw': return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to move file.') @@ -1984,7 +1985,7 @@ class FileView(APIView): return Response('success', status=status.HTTP_200_OK) # check src folder permission - if check_folder_permission(request, repo_id, path) is None: + if check_folder_permission(request, repo_id, src_dir) is None: return api_error(status.HTTP_403_FORBIDDEN, 'You do not have permission to copy file.') @@ -2059,7 +2060,8 @@ class FileView(APIView): username = request.user.username # check file access permission - if check_folder_permission(request, repo_id, path) != 'rw': + parent_dir = os.path.dirname(path) + if check_folder_permission(request, repo_id, parent_dir) != 'rw': return api_error(status.HTTP_403_FORBIDDEN, 'Permission denied.') operation = request.data.get('operation', '') diff --git a/tests/api/endpoints/test_dir_view.py b/tests/api/endpoints/test_dir_view.py index fe238c0fb6..c334294bbd 100644 --- a/tests/api/endpoints/test_dir_view.py +++ b/tests/api/endpoints/test_dir_view.py @@ -1,17 +1,22 @@ # -*- coding: utf-8 -*- import os import json +import posixpath from seaserv import seafile_api from django.core.urlresolvers import reverse from seahub.test_utils import BaseTestCase -from seahub.share.models import FileShare from tests.common.utils import randstring -class FileViewTest(BaseTestCase): +try: + from seahub.settings import LOCAL_PRO_DEV_ENV +except ImportError: + LOCAL_PRO_DEV_ENV = False + +class DirViewTest(BaseTestCase): def create_new_repo(self): new_repo_id = seafile_api.create_repo(name='test-repo-2', desc='', @@ -25,21 +30,29 @@ class FileViewTest(BaseTestCase): resp = self.client.get(url, HTTP_X_REQUESTED_WITH='XMLHttpRequest') json_resp = json.loads(resp.content) - if len(json_resp['dirent_list']) == 0: - return None + if len(json_resp['dirent_list']) > 0: + for dirent in json_resp['dirent_list']: + if dirent.has_key('is_dir') and dirent['is_dir']: + return dirent['obj_name'] + else: + continue - return json_resp['dirent_list'][0]['obj_name'] + return None def setUp(self): self.repo_id = self.repo.id self.folder_path = self.folder self.folder_name = os.path.basename(self.folder_path) + self.user_name = self.user.username + self.admin_name = self.admin.username + self.url = reverse('api-v2.1-dir-view', args=[self.repo_id]) def tearDown(self): self.remove_repo() + # for test http GET request def test_can_get_dir(self): self.login_as(self.user) resp = self.client.get(self.url) @@ -49,11 +62,27 @@ class FileViewTest(BaseTestCase): assert json_resp[0]['type'] == 'dir' assert json_resp[0]['name'] == self.folder_name + def test_get_dir_with_invalid_perm(self): + # login as admin, then get dir info in user's repo + self.login_as(self.admin) + resp = self.client.get(self.url) + self.assertEqual(403, resp.status_code) + + # for test http POST request + def test_post_operation_invalid(self): + self.login_as(self.user) + + data = {'operation': 'invalid',} + resp = self.client.post(self.url + '?p=' + self.folder_path, data) + + self.assertEqual(400, resp.status_code) + def test_can_create_folder(self): self.login_as(self.user) # delete old folder - resp = self.client.delete(self.url + '?p=' + self.folder_path, {}, 'application/x-www-form-urlencoded') + resp = self.client.delete(self.url + '?p=' + self.folder_path, + {}, 'application/x-www-form-urlencoded') assert None == self.get_lib_folder_name(self.repo_id) # check folder has been deleted @@ -63,9 +92,7 @@ class FileViewTest(BaseTestCase): new_folder_path = '/' + new_name # create file - data = { - 'operation': 'mkdir', - } + data = {'operation': 'mkdir',} resp = self.client.post(self.url + '?p=' + new_folder_path, data) self.assertEqual(200, resp.status_code) @@ -73,20 +100,46 @@ class FileViewTest(BaseTestCase): # check new folder has been created assert new_name == self.get_lib_folder_name(self.repo_id) - def test_can_delete_folder(self): - self.login_as(self.user) + def test_create_folder_with_invalid_repo_perm(self): - # check folder exist when init - assert self.folder_name == self.get_lib_folder_name(self.repo_id) + # login as admin, then create dir in user's repo + self.login_as(self.admin) - # delete folder - resp = self.client.delete(self.url + '?p=' + self.folder_path, {}, 'application/x-www-form-urlencoded') - assert None == self.get_lib_folder_name(self.repo_id) + new_name = randstring(6) + new_folder_path = '/' + new_name - self.assertEqual(200, resp.status_code) + # create file + data = {'operation': 'mkdir',} + resp = self.client.post(self.url + '?p=' + new_folder_path, data) + self.assertEqual(403, resp.status_code) - # check folder has been deleted - assert None == self.get_lib_folder_name(self.repo_id) + def test_create_folder_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit sub-folder with 'r' permission + assert seafile_api.check_permission_by_path(self.repo_id, + self.folder_path, self.admin_name) == 'r' + + # login as admin, then create dir in a 'r' permission folder + self.login_as(self.admin) + + new_name = randstring(6) + new_folder_path = posixpath.join(self.folder_path, new_name) + data = {'operation': 'mkdir',} + + resp = self.client.post(self.url + '?p=' + new_folder_path, data) + + self.assertEqual(403, resp.status_code) def test_can_rename_folder(self): self.login_as(self.user) @@ -99,16 +152,92 @@ class FileViewTest(BaseTestCase): resp = self.client.post(self.url + '?p=' + self.folder_path, data) self.assertEqual(200, resp.status_code) + # check old file has been renamed to new_name assert new_name == self.get_lib_folder_name(self.repo_id) - def test_can_post_operation_invalid(self): + def test_rename_folder_with_invalid_repo_perm(self): + + # login as admin, then rename dir in user's repo + self.login_as(self.admin) + + new_name = randstring(6) + data = {'operation': 'rename', 'newname': new_name} + + resp = self.client.post(self.url + '?p=' + self.folder_path, data) + self.assertEqual(403, resp.status_code) + + def test_rename_folder_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit sub-folder with 'r' permission + assert seafile_api.check_permission_by_path(self.repo_id, + self.folder_path, self.admin_name) == 'r' + + # login as admin, then rename a 'r' permission folder + self.login_as(self.admin) + + new_name = randstring(6) + data = {'operation': 'rename', 'newname': new_name} + + resp = self.client.post(self.url + '?p=' + self.folder_path, data) + self.assertEqual(403, resp.status_code) + + # for test http DELETE request + def test_can_delete_folder(self): self.login_as(self.user) - data = { - 'operation': 'invalid', - } - resp = self.client.post(self.url + '?p=' + self.folder_path, data) + # check folder exist when init + assert self.folder_name == self.get_lib_folder_name(self.repo_id) - self.assertEqual(400, resp.status_code) + # delete folder + resp = self.client.delete(self.url + '?p=' + self.folder_path, + {}, 'application/x-www-form-urlencoded') + assert None == self.get_lib_folder_name(self.repo_id) + self.assertEqual(200, resp.status_code) + + # check folder has been deleted + assert None == self.get_lib_folder_name(self.repo_id) + + def test_delete_folder_with_invalid_repo_perm(self): + + # login as admin, then delete dir in user's repo + self.login_as(self.admin) + resp = self.client.delete(self.url + '?p=' + self.folder_path, + {}, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_delete_folder_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit sub-folder with 'r' permission + assert seafile_api.check_permission_by_path(self.repo_id, + self.folder_path, self.admin_name) == 'r' + + # login as admin, then delete a 'r' permission folder + self.login_as(self.admin) + + resp = self.client.delete(self.url + '?p=' + self.folder_path, + {}, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) diff --git a/tests/api/endpoints/test_file_view.py b/tests/api/endpoints/test_file_view.py index f1232af7c1..7669994d52 100644 --- a/tests/api/endpoints/test_file_view.py +++ b/tests/api/endpoints/test_file_view.py @@ -1,13 +1,13 @@ # -*- coding: utf-8 -*- import os import json +import posixpath from seaserv import seafile_api from django.core.urlresolvers import reverse from seahub.test_utils import BaseTestCase -from seahub.share.models import FileShare from tests.common.utils import randstring @@ -25,73 +25,179 @@ class FileViewTest(BaseTestCase): return new_repo_id + def admin_create_new_repo(self): + new_repo_id = seafile_api.create_repo(name='test-repo-2', desc='', + username=self.admin.username, passwd=None) + + return new_repo_id + def get_lib_file_name(self, repo_id): url = reverse('list_lib_dir', args=[repo_id]) resp = self.client.get(url, HTTP_X_REQUESTED_WITH='XMLHttpRequest') json_resp = json.loads(resp.content) - if len(json_resp['dirent_list']) == 0: - return None + if len(json_resp['dirent_list']) > 0: + for dirent in json_resp['dirent_list']: + if dirent.has_key('is_file') and dirent['is_file']: + return dirent['obj_name'] + else: + continue - return json_resp['dirent_list'][0]['obj_name'] + return None def setUp(self): + self.user_name = self.user.username + self.admin_name = self.admin.username + self.repo_id = self.repo.id self.file_path = self.file self.file_name = os.path.basename(self.file_path) + self.folder_path = self.folder + self.url = reverse('api-v2.1-file-view', args=[self.repo_id]) def tearDown(self): self.remove_repo() - def test_can_get_file_download_url(self): + # for test http GET request + def test_can_get_file_info(self): self.login_as(self.user) resp = self.client.get(self.url + '?p=' + self.file_path) self.assertEqual(200, resp.status_code) json_resp = json.loads(resp.content) - assert '8082' in json_resp['url'] + assert self.file_name == json_resp['obj_name'] - def test_can_get_file_downloadblks_info(self): - file_id = seafile_api.get_file_id_by_path(self.repo_id, self.file_path) + def test_get_file_info_with_invalid_perm(self): + # login as admin, then visit user's file + self.login_as(self.admin) + resp = self.client.get(self.url + '?p=' + self.file_path) + self.assertEqual(403, resp.status_code) + # for test http POST request + def test_post_operation_invalid(self): self.login_as(self.user) - resp = self.client.get(self.url + '?p=' + self.file_path + '&op=downloadblks') - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert json_resp['file_id'] == file_id - - def test_can_get_file_share_link_info(self): - fs = FileShare.objects.create_file_link(self.user.username, - self.repo_id, self.file_path, None, None) - - self.login_as(self.user) - - resp = self.client.get(self.url + '?p=' + self.file_path + '&op=sharelink') - self.assertEqual(200, resp.status_code) - json_resp = json.loads(resp.content) - assert fs.token in json_resp['link'] - - def test_get_operation_invalid(self): - self.login_as(self.user) - resp = self.client.get(self.url + '?p=' + self.file_path + '&op=invalid') + data = {'operation': 'invalid',} + resp = self.client.post(self.url + '?p=' + self.file_path, data) self.assertEqual(400, resp.status_code) + def test_can_create_file(self): + self.login_as(self.user) + + # delete old file + resp = self.client.delete(self.url + '?p=' + self.file_path, + {}, 'application/x-www-form-urlencoded') + assert None == self.get_lib_file_name(self.repo_id) + + new_name = randstring(6) + new_file_path = '/' + new_name + data = {'operation': 'create',} + + # create file + resp = self.client.post(self.url + '?p=' + new_file_path, data) + self.assertEqual(200, resp.status_code) + + # check new file in repo + assert new_name == self.get_lib_file_name(self.repo_id) + + def test_create_file_with_invalid_repo_perm(self): + + # login as admin, then create file in user's repo + self.login_as(self.admin) + + new_name = randstring(6) + new_file_path = '/' + new_name + data = {'operation': 'create',} + + resp = self.client.post(self.url + '?p=' + new_file_path, data) + self.assertEqual(403, resp.status_code) + + def test_create_file_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit sub-folder with 'r' permission + assert seafile_api.check_permission_by_path(self.repo_id, + self.folder_path, self.admin_name) == 'r' + + # login as admin, then create file in a 'r' permission folder + self.login_as(self.admin) + + new_name = randstring(6) + new_file_path = posixpath.join(self.folder_path, new_name) + data = {'operation': 'create',} + + resp = self.client.post(self.url + '?p=' + new_file_path, data) + self.assertEqual(403, resp.status_code) + def test_can_rename_file(self): self.login_as(self.user) new_name = randstring(6) - # check old file name exist + # check old file exist assert self.file_name == self.get_lib_file_name(self.repo_id) data = {'operation': 'rename', 'newname': new_name} resp = self.client.post(self.url + '?p=' + self.file_path, data) - self.assertEqual(200, resp.status_code) + # check old file has been renamed to new_name assert new_name == self.get_lib_file_name(self.repo_id) + def test_rename_file_with_invalid_repo_perm(self): + + # login as admin, then rename file in user's repo + self.login_as(self.admin) + + new_name = randstring(6) + data = {'operation': 'rename', 'newname': new_name} + + resp = self.client.post(self.url + '?p=' + self.file_path, data) + self.assertEqual(403, resp.status_code) + + def test_rename_file_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # create a file as old file in user repo sub-folder + old_file_name = randstring(6) + seafile_api.post_empty_file(repo_id=self.repo_id, + parent_dir=self.folder_path, filename=old_file_name, + username=self.user_name) + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit old file with 'r' permission + old_file_path = posixpath.join(self.folder_path, old_file_name) + assert seafile_api.check_permission_by_path(self.repo_id, + old_file_path, self.admin_name) == 'r' + + # login as admin, then rename a 'r' permission old file + self.login_as(self.admin) + + new_name = randstring(6) + data = {'operation': 'rename', 'newname': new_name} + + resp = self.client.post(self.url + '?p=' + old_file_path, data) + self.assertEqual(403, resp.status_code) + def test_can_move_file(self): self.login_as(self.user) @@ -105,8 +211,8 @@ class FileViewTest(BaseTestCase): 'dst_repo': dst_repo_id, 'dst_dir': '/', } - resp = self.client.post(self.url + '?p=' + self.file_path, data) + resp = self.client.post(self.url + '?p=' + self.file_path, data) self.assertEqual(200, resp.status_code) # check old file has been delete @@ -117,6 +223,113 @@ class FileViewTest(BaseTestCase): self.remove_repo(dst_repo_id) + def test_move_file_with_invalid_src_repo_perm(self): + + # login as admin, then move file in user's repo + self.login_as(self.admin) + + dst_repo_id = self.admin_create_new_repo() + data = { + 'operation': 'move', + 'dst_repo': dst_repo_id, + 'dst_dir': '/', + } + + resp = self.client.post(self.url + '?p=' + self.file_path, data) + self.assertEqual(403, resp.status_code) + + def test_move_file_with_invalid_src_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # create a file as old file in user repo sub-folder + old_file_name = randstring(6) + seafile_api.post_empty_file(repo_id=self.repo_id, + parent_dir=self.folder_path, filename=old_file_name, + username=self.user_name) + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit old file with 'r' permission + old_file_path = posixpath.join(self.folder_path, old_file_name) + assert seafile_api.check_permission_by_path(self.repo_id, + old_file_path, self.admin_name) == 'r' + + # login as admin, then move a 'r' permission file + self.login_as(self.admin) + + dst_repo_id = self.admin_create_new_repo() + data = { + 'operation': 'move', + 'dst_repo': dst_repo_id, + 'dst_dir': '/', + } + + resp = self.client.post(self.url + '?p=' + old_file_path, data) + + self.assertEqual(403, resp.status_code) + + def test_move_file_with_invalid_dst_repo_perm(self): + + # login as user, then move file to admin's repo + self.login_as(self.user) + + # create new repo for admin + dst_repo_id = self.admin_create_new_repo() + data = { + 'operation': 'move', + 'dst_repo': dst_repo_id, + 'dst_dir': '/', + } + resp = self.client.post(self.url + '?p=' + self.file_path, data) + + self.assertEqual(403, resp.status_code) + + def test_move_file_with_invalid_dst_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit sub-folder with 'r' permission + assert seafile_api.check_permission_by_path(self.repo_id, + self.folder_path, self.admin_name) == 'r' + + # create a file for admin repo + admin_repo_id = self.admin_create_new_repo() + admin_file_name = randstring(6) + seafile_api.post_empty_file(repo_id=admin_repo_id, + parent_dir='/', filename=admin_file_name, + username=self.admin_name) + + # login as admin, then move file to a 'r' permission folder + self.login_as(self.admin) + + # create new repo for admin + data = { + 'operation': 'move', + 'dst_repo': self.repo_id, + 'dst_dir': self.folder_path, + } + + url = reverse('api-v2.1-file-view', args=[admin_repo_id]) + resp = self.client.post(url + '?p=/' + admin_file_name, data) + self.assertEqual(403, resp.status_code) + def test_can_copy_file(self): self.login_as(self.user) @@ -142,50 +355,79 @@ class FileViewTest(BaseTestCase): self.remove_repo(dst_repo_id) - def test_can_delete_file(self): - self.login_as(self.user) + def test_copy_file_with_invalid_src_repo_perm(self): - # check old file name exist - assert self.file_name == self.get_lib_file_name(self.repo_id) + # login as admin, then copy file in user's repo + self.login_as(self.admin) - # delete file - resp = self.client.delete(self.url + '?p=' + self.file_path, {}, 'application/x-www-form-urlencoded') - - self.assertEqual(200, resp.status_code) - - # check old file has been deleted - assert None == self.get_lib_file_name(self.repo_id) - - def test_can_create_file(self): - self.login_as(self.user) - # delete old file - resp = self.client.delete(self.url + '?p=' + self.file_path, {}, 'application/x-www-form-urlencoded') - assert None == self.get_lib_file_name(self.repo_id) - - new_name = randstring(6) - new_file_path = '/' + new_name - - # create file + # copy file + dst_repo_id = self.admin_create_new_repo() data = { - 'operation': 'create', - } - resp = self.client.post(self.url + '?p=' + new_file_path, data) - - self.assertEqual(200, resp.status_code) - - # check old file still in old repo - assert new_name == self.get_lib_file_name(self.repo_id) - - def test_post_operation_invalid(self): - self.login_as(self.user) - # create file - data = { - 'operation': 'invalid', + 'operation': 'copy', + 'dst_repo': dst_repo_id, + 'dst_dir': '/', } resp = self.client.post(self.url + '?p=' + self.file_path, data) - self.assertEqual(400, resp.status_code) + self.assertEqual(403, resp.status_code) + + def test_copy_file_with_invalid_dst_repo_perm(self): + + # login as user, then copy file to admin's repo + self.login_as(self.user) + + # create new repo for admin + dst_repo_id = self.admin_create_new_repo() + data = { + 'operation': 'copy', + 'dst_repo': dst_repo_id, + 'dst_dir': '/', + } + resp = self.client.post(self.url + '?p=' + self.file_path, data) + + self.assertEqual(403, resp.status_code) + + def test_copy_file_with_invalid_dst_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit sub-folder with 'r' permission + assert seafile_api.check_permission_by_path(self.repo_id, + self.folder_path, self.admin_name) == 'r' + + # create a file for admin repo + admin_repo_id = self.admin_create_new_repo() + admin_file_name = randstring(6) + seafile_api.post_empty_file(repo_id=admin_repo_id, + parent_dir='/', filename=admin_file_name, + username=self.admin_name) + + # login as admin, then move file to a 'r' permission folder + self.login_as(self.admin) + + # create new repo for admin + data = { + 'operation': 'copy', + 'dst_repo': self.repo_id, + 'dst_dir': self.folder_path, + } + + url = reverse('api-v2.1-file-view', args=[admin_repo_id]) + resp = self.client.post(url + '?p=/' + admin_file_name, data) + self.assertEqual(403, resp.status_code) + + # for test http PUT request def test_can_lock_file(self): + if not LOCAL_PRO_DEV_ENV: return @@ -208,7 +450,54 @@ class FileViewTest(BaseTestCase): self.file_path.lstrip('/'), self.user.username) assert return_value == 2 + def test_lock_file_with_invalid_repo_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # login as admin, then lock file in user's repo + self.login_as(self.admin) + + # lock file + data = 'operation=lock' + resp = self.client.put(self.url + '?p=' + self.file_path, data, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_lock_file_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # create a file in user repo sub-folder + file_name = randstring(6) + seafile_api.post_empty_file(repo_id=self.repo_id, + parent_dir=self.folder_path, filename=file_name, + username=self.user_name) + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit file with 'r' permission + file_path = posixpath.join(self.folder_path, file_name) + assert seafile_api.check_permission_by_path(self.repo_id, + file_path, self.admin_name) == 'r' + + # login as admin, then lock a 'r' permission file + self.login_as(self.admin) + + data = 'operation=lock' + resp = self.client.put(self.url + '?p=' + file_path, + data, 'application/x-www-form-urlencoded') + + self.assertEqual(403, resp.status_code) + def test_can_unlock_file(self): + if not LOCAL_PRO_DEV_ENV: return @@ -233,3 +522,105 @@ class FileViewTest(BaseTestCase): return_value = seafile_api.check_file_lock(self.repo_id, self.file_path.lstrip('/'), self.user.username) assert return_value == 0 + + def test_unlock_file_with_invalid_repo_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # login as admin, then unlock file in user's repo + self.login_as(self.admin) + + # unlock file + data = 'operation=unlock' + resp = self.client.put(self.url + '?p=' + self.file_path, data, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_unlock_file_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # create a file in user repo sub-folder + file_name = randstring(6) + seafile_api.post_empty_file(repo_id=self.repo_id, + parent_dir=self.folder_path, filename=file_name, + username=self.user_name) + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit file with 'r' permission + file_path = posixpath.join(self.folder_path, file_name) + assert seafile_api.check_permission_by_path(self.repo_id, + file_path, self.admin_name) == 'r' + + # login as admin, then lock a 'r' permission file + self.login_as(self.admin) + + data = 'operation=unlock' + resp = self.client.put(self.url + '?p=' + file_path, + data, 'application/x-www-form-urlencoded') + + self.assertEqual(403, resp.status_code) + + # for test http DELETE request + def test_can_delete_file(self): + self.login_as(self.user) + + # check old file name exist + assert self.file_name == self.get_lib_file_name(self.repo_id) + + # delete file + resp = self.client.delete(self.url + '?p=' + self.file_path, + {}, 'application/x-www-form-urlencoded') + self.assertEqual(200, resp.status_code) + + # check old file has been deleted + assert None == self.get_lib_file_name(self.repo_id) + + def test_delete_file_with_invalid_repo_perm(self): + + # login as admin, then delete file in user's repo + self.login_as(self.admin) + + # delete file + resp = self.client.delete(self.url + '?p=' + self.file_path, + {}, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code) + + def test_delete_file_with_invalid_folder_perm(self): + + if not LOCAL_PRO_DEV_ENV: + return + + # create a file in user repo sub-folder + file_name = randstring(6) + seafile_api.post_empty_file(repo_id=self.repo_id, + parent_dir=self.folder_path, filename=file_name, + username=self.user_name) + + # share user's repo to admin with 'rw' permission + seafile_api.share_repo(self.repo_id, self.user_name, + self.admin_name, 'rw') + + # set sub-folder permisson as 'r' for admin + seafile_api.add_folder_user_perm(self.repo_id, + self.folder_path, 'r', self.admin_name) + + # admin can visit file with 'r' permission + file_path = posixpath.join(self.folder_path, file_name) + assert seafile_api.check_permission_by_path(self.repo_id, + file_path, self.admin_name) == 'r' + + # login as admin, then delete a 'r' permission file + self.login_as(self.admin) + + resp = self.client.delete(self.url + '?p=' + file_path, + {}, 'application/x-www-form-urlencoded') + self.assertEqual(403, resp.status_code)