1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-04-28 03:10:45 +00:00

repo commit dir

This commit is contained in:
sniper-py 2019-06-03 18:25:22 +08:00
parent 4f78baf41c
commit 9860fe63d7
5 changed files with 185 additions and 131 deletions

View File

@ -0,0 +1,87 @@
# Copyright (c) 2012-2019 Seafile Ltd.
# encoding: utf-8
import stat
import logging
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework import status
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.utils import api_error
from seahub.views import check_folder_permission
from seaserv import seafile_api
logger = logging.getLogger(__name__)
class RepoCommitDirView(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated,)
throttle_classes = (UserRateThrottle,)
def _get_item_info(self, item, path):
item_info = {
'parent_dir': path,
'name': item.obj_name,
}
if stat.S_ISDIR(item.mode):
item_info['type'] = 'dir'
else:
item_info['type'] = 'file'
item_info['size'] = item.size
return item_info
def get(self, request, repo_id, commit_id, format=None):
""" List dir by commit
used when get files/dirs in a trash or history dir
Permission checking:
1. all authenticated user can perform this action.
"""
# argument check
path = request.GET.get('path', '/')
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = 'Library %s not found.' % repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
commit = seafile_api.get_commit(repo.id, repo.version, commit_id)
if not commit:
error_msg = 'Commit %s not found.' % commit
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
dir_id = seafile_api.get_dir_id_by_commit_and_path(repo_id, commit_id, path)
if not dir_id:
error_msg = 'Folder %s not found.' % path
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if not check_folder_permission(request, repo_id, '/'):
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# # seafile_api.list_dir_by_commit_and_path
# def list_dir_by_commit_and_path(self, repo_id, commit_id, path, offset=-1, limit=-1):
# dir_id = seafserv_threaded_rpc.get_dir_id_by_commit_and_path(repo_id, commit_id, path)
# if dir_id is None:
# return None
# return seafserv_threaded_rpc.list_dir(repo_id, dir_id, offset, limit)
dir_entries = seafile_api.list_dir_by_dir_id(repo_id, dir_id)
items = []
for dirent in dir_entries:
items.append(self._get_item_info(dirent, path))
return Response({'dirent_list': items})

View File

@ -164,72 +164,3 @@ class RepoTrash(APIView):
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response({'success': True})
class RepoDirTrash(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated, )
throttle_classes = (UserRateThrottle, )
def _get_item_info(self, item, path):
item_info = {
'parent_dir': path,
'obj_name': item.obj_name,
}
if stat.S_ISDIR(item.mode):
is_dir = True
else:
is_dir = False
item_info['is_dir'] = is_dir
item_info['size'] = item.size if not is_dir else ''
return item_info
def get(self, request, repo_id, format=None):
""" Return files/dirs in a deleted dir
Permission checking:
1. all authenticated user can perform this action.
"""
# argument check
path = request.GET.get('path', None)
if not path:
error_msg = 'path invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
commit_id = request.GET.get('commit_id', None)
if not commit_id:
error_msg = 'commit_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = 'Library %s not found.' % repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
commit = seafile_api.get_commit(repo.id, repo.version, commit_id)
if not commit:
error_msg = 'Commit %s not found.' % commit
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if check_folder_permission(request, repo_id, '/') is None:
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
dir_entries = seafile_api.list_dir_by_commit_and_path(repo_id, commit_id, path)
if dir_entries is None:
error_msg = 'Folder %s not found.' % path
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
items = []
for dirent in dir_entries:
items.append(self._get_item_info(dirent, path))
return Response({'data': items})

View File

@ -51,7 +51,8 @@ from seahub.api2.endpoints.file_history import FileHistoryView, NewFileHistoryVi
from seahub.api2.endpoints.dir import DirView, DirDetailView
from seahub.api2.endpoints.file_tag import FileTagView
from seahub.api2.endpoints.file_tag import FileTagsView
from seahub.api2.endpoints.repo_trash import RepoTrash,RepoDirTrash
from seahub.api2.endpoints.repo_trash import RepoTrash
from seahub.api2.endpoints.repo_commit_dir import RepoCommitDirView
from seahub.api2.endpoints.deleted_repos import DeletedRepos
from seahub.api2.endpoints.repo_history import RepoHistory
from seahub.api2.endpoints.repo_set_password import RepoSetPassword
@ -324,9 +325,9 @@ urlpatterns = [
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/file/history/$', FileHistoryView.as_view(), name='api-v2.1-file-history-view'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/file/new_history/$', NewFileHistoryView.as_view(), name='api-v2.1-new-file-history-view'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/dir/$', DirView.as_view(), name='api-v2.1-dir-view'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/commits/(?P<commit_id>[0-9a-f]{40})/dir/$', RepoCommitDirView.as_view(), name='api-v2.1-repo-commit-dir'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/dir/detail/$', DirDetailView.as_view(), name='api-v2.1-dir-detail-view'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/trash/$', RepoTrash.as_view(), name='api-v2.1-repo-trash'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/trash/dir/$', RepoDirTrash.as_view(), name='api-v2.1-repo-dir-trash'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/history/$', RepoHistory.as_view(), name='api-v2.1-repo-history'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/set-password/$', RepoSetPassword.as_view(), name="api-v2.1-repo-set-password"),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/send-new-password/$', RepoSendNewPassword.as_view(), name="api-v2.1-repo-send-new-password"),

View File

@ -0,0 +1,95 @@
import os
import json
from django.core.urlresolvers import reverse
from seaserv import seafile_api
from seahub.test_utils import BaseTestCase
from tests.common.utils import randstring
class RepoCommitDirTest(BaseTestCase):
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
self.repo_id = self.repo.id
self.repo_name = self.repo.repo_name
self.file_path = self.file
self.file_name = os.path.basename(self.file_path)
self.folder_path = self.folder
self.folder_name = os.path.basename(self.folder.rstrip('/'))
self.inner_file_name = 'inner.txt'
self.inner_file = self.create_file(
repo_id=self.repo.id, parent_dir=self.folder_path,
filename=self.inner_file_name, username=self.user_name)
self.trash_url = reverse('api-v2.1-repo-trash', args=[self.repo_id])
def tearDown(self):
self.remove_repo()
self.remove_group()
def test_get(self):
# delete a folder first
seafile_api.del_file(self.repo_id, '/', self.folder_name, self.user_name)
self.login_as(self.user)
# get commit id
trash_resp = self.client.get(self.trash_url)
self.assertEqual(200, trash_resp.status_code)
trash_json_resp = json.loads(trash_resp.content)
assert trash_json_resp['data'][0]['obj_name'] == self.folder_name
assert trash_json_resp['data'][0]['is_dir']
assert trash_json_resp['data'][0]['commit_id']
commit_id = trash_json_resp['data'][0]['commit_id']
url = reverse('api-v2.1-repo-commit-dir', args=[self.repo_id, commit_id])
# test can get
resp = self.client.get(url + '?path=' + self.folder_path)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['dirent_list']
assert json_resp['dirent_list'][0]
assert json_resp['dirent_list'][0]['type'] == 'file'
assert json_resp['dirent_list'][0]['name'] == self.inner_file_name
assert json_resp['dirent_list'][0]['parent_dir'] == self.folder_path
assert json_resp['dirent_list'][0]['size'] == 0
# test can get without path
resp = self.client.get(url)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['dirent_list']
assert json_resp['dirent_list'][0]
assert json_resp['dirent_list'][0]['type'] == 'dir'
assert json_resp['dirent_list'][0]['name'] == self.folder_name
assert json_resp['dirent_list'][0]['parent_dir'] == '/'
assert json_resp['dirent_list'][0].get('size') is None
assert json_resp['dirent_list'][1]
assert json_resp['dirent_list'][1]['type'] == 'file'
assert json_resp['dirent_list'][1]['name'] == self.file_name
assert json_resp['dirent_list'][1]['parent_dir'] == '/'
assert json_resp['dirent_list'][1]['size'] == 0
# test_can_not_get_with_invalid_path_parameter
invalid_path = randstring(6)
resp = self.client.get(url + '?path=' + invalid_path)
self.assertEqual(404, resp.status_code)
# test_can_not_get_with_invalid_repo_permission
self.logout()
self.login_as(self.admin)
resp = self.client.get(url + '?path=' + self.folder_path)
self.assertEqual(403, resp.status_code)

View File

@ -88,63 +88,3 @@ class RepoTrashTest(BaseTestCase):
self.login_as(self.admin)
resp = self.client.delete(self.url)
self.assertEqual(403, resp.status_code)
class RepoDirTrashTest(BaseTestCase):
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
self.repo_id = self.repo.id
self.repo_name = self.repo.repo_name
self.file_path = self.file
self.file_name = os.path.basename(self.file_path)
self.folder_path = self.folder
self.folder_name = os.path.basename(self.folder.rstrip('/'))
self.url = reverse('api-v2.1-repo-dir-trash', args=[self.repo_id])
self.commit_id_url = reverse('api-v2.1-repo-trash', args=[self.repo_id])
def tearDown(self):
self.remove_repo()
self.remove_group()
def test_get(self):
# delete a folder first
seafile_api.del_file(self.repo_id, '/', self.folder_name, self.user_name)
self.login_as(self.user)
# get commit id
commit_id_resp = self.client.get(self.commit_id_url)
self.assertEqual(200, commit_id_resp.status_code)
commit_id_json_resp = json.loads(commit_id_resp.content)
assert commit_id_json_resp['data'][0]['obj_name'] == self.folder_name
assert commit_id_json_resp['data'][0]['is_dir']
assert commit_id_json_resp['data'][0]['commit_id']
commit_id = commit_id_json_resp['data'][0]['commit_id']
# test can get
resp = self.client.get(self.url + '?path=' + self.folder_path + '&commit_id=' + commit_id)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp.get('data') is not None
# test_can_not_get_with_invalid_path_parameter
invalid_path = randstring(6)
resp = self.client.get(self.url + '?path=' + invalid_path + '&commit_id=' + commit_id)
self.assertEqual(404, resp.status_code)
# test_can_not_get_with_invalid_repo_permission
self.logout()
self.login_as(self.admin)
resp = self.client.get(self.url + '?path=' + self.folder_path + '&commit_id=' + commit_id)
self.assertEqual(403, resp.status_code)