mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-08 18:30:53 +00:00
update v2.1 dir api (#2798)
* update api v2.1 dir api refactor this api add with_thumbnail parameter and some test * remove is_img, is_video and is_xmind field * remove ENABLE_THUMBNAIL check
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
import os
|
||||
import stat
|
||||
import logging
|
||||
import posixpath
|
||||
|
||||
@@ -8,17 +9,27 @@ from rest_framework.permissions import IsAuthenticated
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
from rest_framework import status
|
||||
from django.utils.http import urlquote
|
||||
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.utils import api_error
|
||||
from seahub.api2.views import get_dir_file_recursively, \
|
||||
get_dir_entrys_by_id
|
||||
from seahub.api2.views import get_dir_file_recursively
|
||||
|
||||
from seahub.thumbnail.utils import get_thumbnail_src
|
||||
from seahub.views import check_folder_permission
|
||||
from seahub.utils import check_filename_with_rename, is_valid_dirent_name, \
|
||||
normalize_dir_path
|
||||
normalize_dir_path, normalize_file_path, is_pro_version, \
|
||||
FILEEXT_TYPE_MAP
|
||||
from seahub.utils.timeutils import timestamp_to_isoformat_timestr
|
||||
from seahub.utils.star import get_dir_starred_files
|
||||
from seahub.utils.file_tags import get_files_tags_in_dir
|
||||
from seahub.utils.file_types import IMAGE, VIDEO, XMIND
|
||||
from seahub.base.templatetags.seahub_tags import email2nickname, \
|
||||
email2contact_email
|
||||
|
||||
from seahub.settings import ENABLE_VIDEO_THUMBNAIL, \
|
||||
THUMBNAIL_ROOT
|
||||
|
||||
from seaserv import seafile_api
|
||||
from pysearpc import SearpcError
|
||||
@@ -66,53 +77,188 @@ class DirView(APIView):
|
||||
error_msg = "'t'(type) should be 'f' or 'd'."
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
with_thumbnail = request.GET.get('with_thumbnail', 'false')
|
||||
if with_thumbnail not in ('true', 'false'):
|
||||
error_msg = 'with_thumbnail invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
if with_thumbnail == 'true':
|
||||
thumbnail_size = request.GET.get('thumbnail_size', 48)
|
||||
try:
|
||||
thumbnail_size = int(thumbnail_size)
|
||||
except ValueError:
|
||||
error_msg = 'thumbnail_size invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
# recource check
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
if not repo:
|
||||
error_msg = 'Library %s not found.' % repo_id
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
path = request.GET.get('p', '/')
|
||||
path = normalize_dir_path(path)
|
||||
parent_dir = request.GET.get('p', '/')
|
||||
parent_dir = normalize_dir_path(parent_dir)
|
||||
|
||||
dir_id = seafile_api.get_dir_id_by_path(repo_id, path)
|
||||
dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir)
|
||||
if not dir_id:
|
||||
error_msg = 'Folder %s not found.' % path
|
||||
error_msg = 'Folder %s not found.' % parent_dir
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
# permission check
|
||||
permission = check_folder_permission(request, repo_id, path)
|
||||
permission = check_folder_permission(request, repo_id, parent_dir)
|
||||
if not permission:
|
||||
error_msg = 'Permission denied.'
|
||||
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
|
||||
|
||||
old_oid = request.GET.get('oid', None)
|
||||
if old_oid and old_oid == dir_id:
|
||||
resp = Response({'success': True})
|
||||
resp["oid"] = dir_id
|
||||
return resp
|
||||
|
||||
# get dir/file list recursively
|
||||
username = request.user.username
|
||||
if recursive == '1':
|
||||
result = []
|
||||
username = request.user.username
|
||||
dir_file_list = get_dir_file_recursively(username, repo_id, path, [])
|
||||
dir_file_info_list = get_dir_file_recursively(username, repo_id,
|
||||
parent_dir, [])
|
||||
if request_type == 'f':
|
||||
for item in dir_file_list:
|
||||
for item in dir_file_info_list:
|
||||
if item['type'] == 'file':
|
||||
result.append(item)
|
||||
elif request_type == 'd':
|
||||
for item in dir_file_list:
|
||||
for item in dir_file_info_list:
|
||||
if item['type'] == 'dir':
|
||||
result.append(item)
|
||||
else:
|
||||
result = dir_file_list
|
||||
result = dir_file_info_list
|
||||
|
||||
resp = Response(result)
|
||||
resp["oid"] = dir_id
|
||||
resp["dir_perm"] = permission
|
||||
return resp
|
||||
return Response(result)
|
||||
|
||||
return get_dir_entrys_by_id(request, repo, path, dir_id, request_type)
|
||||
# get dirent(folder and file) list
|
||||
try:
|
||||
dir_file_list = seafile_api.list_dir_with_perm(repo_id,
|
||||
parent_dir, dir_id, username, -1, -1)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
dir_info_list = []
|
||||
file_info_list = []
|
||||
|
||||
# only get dir info list
|
||||
if not request_type or request_type == 'd':
|
||||
dir_list = [dirent for dirent in dir_file_list if stat.S_ISDIR(dirent.mode)]
|
||||
for dirent in dir_list:
|
||||
dir_info = {}
|
||||
dir_info["type"] = "dir"
|
||||
dir_info["id"] = dirent.obj_id
|
||||
dir_info["name"] = dirent.obj_name
|
||||
dir_info["mtime"] = dirent.mtime
|
||||
dir_info["permission"] = dirent.permission
|
||||
dir_info_list.append(dir_info)
|
||||
|
||||
# only get file info list
|
||||
if not request_type or request_type == 'f':
|
||||
|
||||
file_list = [dirent for dirent in dir_file_list if not stat.S_ISDIR(dirent.mode)]
|
||||
|
||||
# Use dict to reduce memcache fetch cost in large for-loop.
|
||||
nickname_dict = {}
|
||||
contact_email_dict = {}
|
||||
modifier_set = set([x.modifier for x in file_list])
|
||||
lock_owner_set = set([x.lock_owner for x in file_list])
|
||||
for e in modifier_set | lock_owner_set:
|
||||
if e not in nickname_dict:
|
||||
nickname_dict[e] = email2nickname(e)
|
||||
if e not in contact_email_dict:
|
||||
contact_email_dict[e] = email2contact_email(e)
|
||||
|
||||
try:
|
||||
starred_files = get_dir_starred_files(username, repo_id, parent_dir)
|
||||
files_tags_in_dir = get_files_tags_in_dir(repo_id, parent_dir)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
starred_files = []
|
||||
files_tags_in_dir = {}
|
||||
|
||||
for dirent in file_list:
|
||||
|
||||
file_name = dirent.obj_name
|
||||
file_path = posixpath.join(parent_dir, file_name)
|
||||
file_obj_id = dirent.obj_id
|
||||
|
||||
file_info = {}
|
||||
file_info["type"] = "file"
|
||||
file_info["id"] = file_obj_id
|
||||
file_info["name"] = file_name
|
||||
file_info["mtime"] = dirent.mtime
|
||||
file_info["permission"] = dirent.permission
|
||||
file_info["size"] = dirent.size
|
||||
|
||||
modifier_email = dirent.modifier
|
||||
file_info['modifier_email'] = modifier_email
|
||||
file_info['modifier_name'] = nickname_dict.get(modifier_email, '')
|
||||
file_info['modifier_contact_email'] = contact_email_dict.get(modifier_email, '')
|
||||
|
||||
# get lock info
|
||||
if is_pro_version():
|
||||
file_info["is_locked"] = dirent.is_locked
|
||||
file_info["lock_time"] = dirent.lock_time
|
||||
|
||||
lock_owner_email = dirent.lock_owner or ''
|
||||
file_info["lock_owner"] = lock_owner_email
|
||||
file_info['lock_owner_name'] = nickname_dict.get(lock_owner_email, '')
|
||||
file_info['lock_owner_contact_email'] = contact_email_dict.get(lock_owner_email, '')
|
||||
|
||||
if username == lock_owner_email:
|
||||
file_info["locked_by_me"] = True
|
||||
else:
|
||||
file_info["locked_by_me"] = False
|
||||
|
||||
# get star info
|
||||
file_info['starred'] = False
|
||||
if normalize_file_path(file_path) in starred_files:
|
||||
file_info['starred'] = True
|
||||
|
||||
# get tag info
|
||||
file_tags = files_tags_in_dir.get(file_name, [])
|
||||
if file_tags:
|
||||
file_info['file_tags'] = []
|
||||
for file_tag in file_tags:
|
||||
file_info['file_tags'].append(file_tag)
|
||||
|
||||
# get thumbnail info
|
||||
if with_thumbnail == 'true' and not repo.encrypted:
|
||||
|
||||
# used for providing a way to determine
|
||||
# if send a request to create thumbnail.
|
||||
|
||||
fileExt = os.path.splitext(file_name)[1][1:].lower()
|
||||
file_type = FILEEXT_TYPE_MAP.get(fileExt)
|
||||
|
||||
if file_type in (IMAGE, XMIND) or \
|
||||
file_type == VIDEO and ENABLE_VIDEO_THUMBNAIL:
|
||||
|
||||
# if thumbnail has already been created, return its src.
|
||||
# Then web browser will use this src to get thumbnail instead of
|
||||
# recreating it.
|
||||
thumbnail_file_path = os.path.join(THUMBNAIL_ROOT,
|
||||
str(thumbnail_size), file_obj_id)
|
||||
if os.path.exists(thumbnail_file_path):
|
||||
src = get_thumbnail_src(repo_id, thumbnail_size, file_path)
|
||||
file_info['encoded_thumbnail_src'] = urlquote(src)
|
||||
|
||||
file_info_list.append(file_info)
|
||||
|
||||
dir_info_list.sort(lambda x, y: cmp(x['name'].lower(), y['name'].lower()))
|
||||
file_info_list.sort(lambda x, y: cmp(x['name'].lower(), y['name'].lower()))
|
||||
|
||||
if request_type == 'f':
|
||||
result = file_info_list
|
||||
elif request_type == 'd':
|
||||
result = dir_info_list
|
||||
else:
|
||||
result = dir_info_list + file_info_list
|
||||
|
||||
response = Response(result)
|
||||
response["dir_perm"] = permission
|
||||
return response
|
||||
|
||||
def post(self, request, repo_id, format=None):
|
||||
""" Create, rename, revert dir.
|
||||
|
@@ -11,6 +11,7 @@ from seahub.test_utils import BaseTestCase
|
||||
from seahub.utils import check_filename_with_rename
|
||||
|
||||
from tests.common.utils import randstring
|
||||
from seahub.settings import THUMBNAIL_ROOT
|
||||
|
||||
try:
|
||||
from seahub.settings import LOCAL_PRO_DEV_ENV
|
||||
@@ -42,6 +43,10 @@ class DirViewTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
|
||||
self.file_path = self.file
|
||||
self.file_name = os.path.basename(self.file_path.rstrip('/'))
|
||||
|
||||
self.folder_path = self.folder
|
||||
self.folder_name = os.path.basename(self.folder_path)
|
||||
|
||||
@@ -54,15 +59,233 @@ class DirViewTest(BaseTestCase):
|
||||
self.remove_repo()
|
||||
|
||||
# for test http GET request
|
||||
def test_can_get_dir(self):
|
||||
def test_can_get(self):
|
||||
|
||||
self.login_as(self.user)
|
||||
resp = self.client.get(self.url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert len(json_resp) == 2
|
||||
|
||||
assert json_resp[0]['type'] == 'dir'
|
||||
assert json_resp[0]['name'] == self.folder_name
|
||||
|
||||
assert json_resp[1]['type'] == 'file'
|
||||
assert json_resp[1]['name'] == self.file_name
|
||||
|
||||
def test_can_get_with_dir_type_parameter(self):
|
||||
|
||||
self.login_as(self.user)
|
||||
resp = self.client.get(self.url + '?t=d')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'dir'
|
||||
assert json_resp[0]['name'] == self.folder_name
|
||||
|
||||
def test_can_get_with_file_type_parameter(self):
|
||||
|
||||
self.login_as(self.user)
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
|
||||
def test_can_get_with_recursive_parameter(self):
|
||||
|
||||
# create a sub folder
|
||||
new_dir_name = randstring(6)
|
||||
seafile_api.post_dir(self.repo_id, self.folder_path,
|
||||
new_dir_name, self.user_name)
|
||||
|
||||
self.login_as(self.user)
|
||||
resp = self.client.get(self.url + '?recursive=1')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert len(json_resp) == 3
|
||||
assert json_resp[0]['type'] == 'dir'
|
||||
assert json_resp[0]['name'] == self.folder_name
|
||||
assert json_resp[0]['parent_dir'] == '/'
|
||||
|
||||
assert json_resp[1]['type'] == 'dir'
|
||||
assert json_resp[1]['name'] == new_dir_name
|
||||
assert json_resp[1]['parent_dir'] == self.folder_path
|
||||
|
||||
assert json_resp[2]['type'] == 'file'
|
||||
assert json_resp[2]['name'] == self.file_name
|
||||
|
||||
def test_can_get_with_recursive_and_dir_type_parameter(self):
|
||||
|
||||
# create a sub folder
|
||||
new_dir_name = randstring(6)
|
||||
seafile_api.post_dir(self.repo_id, self.folder_path,
|
||||
new_dir_name, self.user_name)
|
||||
|
||||
self.login_as(self.user)
|
||||
resp = self.client.get(self.url + '?recursive=1&t=d')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert len(json_resp) == 2
|
||||
assert json_resp[0]['type'] == 'dir'
|
||||
assert json_resp[0]['name'] == self.folder_name
|
||||
assert json_resp[0]['parent_dir'] == '/'
|
||||
|
||||
assert json_resp[1]['type'] == 'dir'
|
||||
assert json_resp[1]['name'] == new_dir_name
|
||||
assert json_resp[1]['parent_dir'] == self.folder_path
|
||||
|
||||
def test_can_get_with_recursive_and_file_type_parameter(self):
|
||||
|
||||
# create a sub folder
|
||||
new_dir_name = randstring(6)
|
||||
seafile_api.post_dir(self.repo_id, self.folder_path,
|
||||
new_dir_name, self.user_name)
|
||||
|
||||
self.login_as(self.user)
|
||||
resp = self.client.get(self.url + '?recursive=1&t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
|
||||
def test_can_get_file_with_lock_info(self):
|
||||
|
||||
if not LOCAL_PRO_DEV_ENV:
|
||||
return
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
# no lock owner info returned
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
assert json_resp[0]['lock_owner'] == ''
|
||||
|
||||
# lock file
|
||||
seafile_api.lock_file(self.repo_id, self.file_path, self.admin_name, 1)
|
||||
|
||||
# return lock owner info
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
assert json_resp[0]['lock_owner'] == self.admin_name
|
||||
|
||||
def test_can_get_file_with_star_info(self):
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
# file is not starred
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
assert json_resp[0]['starred'] == False
|
||||
|
||||
# star file
|
||||
resp = self.client.post(reverse('starredfiles'), {'repo_id': self.repo.id, 'p': self.file_path})
|
||||
self.assertEqual(201, resp.status_code)
|
||||
|
||||
# file is starred
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
assert json_resp[0]['starred'] == True
|
||||
|
||||
def test_can_get_file_with_tag_info(self):
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
# file has no tags
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
assert not json_resp[0].has_key('file_tags')
|
||||
|
||||
# add file tag
|
||||
tag_name = randstring(6)
|
||||
tag_color = randstring(6)
|
||||
repo_tag_data = {'name': tag_name, 'color': tag_color}
|
||||
resp = self.client.post(reverse('api-v2.1-repo-tags', args=[self.repo_id]), repo_tag_data)
|
||||
json_resp = json.loads(resp.content)
|
||||
repo_tag_id = json_resp['repo_tag']['repo_tag_id']
|
||||
file_tag_data = {'file_path': self.file_path, 'repo_tag_id': repo_tag_id}
|
||||
resp = self.client.post(reverse('api-v2.1-file-tags', args=[self.repo_id]), file_tag_data)
|
||||
|
||||
# file has tag
|
||||
resp = self.client.get(self.url + '?t=f')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == self.file_name
|
||||
assert json_resp[0]['file_tags'][0]['repo_tag_id'] == repo_tag_id
|
||||
assert json_resp[0]['file_tags'][0]['tag_name'] == tag_name
|
||||
assert json_resp[0]['file_tags'][0]['tag_color'] == tag_color
|
||||
|
||||
def test_can_get_file_with_thumbnail_info(self):
|
||||
|
||||
self.login_as(self.user)
|
||||
|
||||
# create a image file
|
||||
image_file_name = randstring(6) + '.jpg'
|
||||
seafile_api.post_empty_file(self.repo_id, self.folder_path,
|
||||
image_file_name, self.user_name)
|
||||
|
||||
# file has no thumbnail
|
||||
resp = self.client.get(self.url + '?t=f&with_thumbnail=true&p=%s' % self.folder_path)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == image_file_name
|
||||
assert not json_resp[0].has_key('encoded_thumbnail_src')
|
||||
|
||||
file_id = json_resp[0]['id']
|
||||
|
||||
# prepare thumbnail
|
||||
size = 48
|
||||
thumbnail_dir = os.path.join(THUMBNAIL_ROOT, str(size))
|
||||
if not os.path.exists(thumbnail_dir):
|
||||
os.makedirs(thumbnail_dir)
|
||||
thumbnail_file = os.path.join(thumbnail_dir, file_id)
|
||||
|
||||
with open(thumbnail_file, 'w'):
|
||||
pass
|
||||
assert os.path.exists(thumbnail_file)
|
||||
|
||||
# file has thumbnail
|
||||
resp = self.client.get(self.url + '?t=f&with_thumbnail=true&p=%s' % self.folder_path)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
json_resp = json.loads(resp.content)
|
||||
assert len(json_resp) == 1
|
||||
assert json_resp[0]['type'] == 'file'
|
||||
assert json_resp[0]['name'] == image_file_name
|
||||
assert image_file_name in json_resp[0]['encoded_thumbnail_src']
|
||||
|
||||
def test_get_dir_with_invalid_perm(self):
|
||||
# login as admin, then get dir info in user's repo
|
||||
self.login_as(self.admin)
|
||||
@@ -206,7 +429,6 @@ class DirViewTest(BaseTestCase):
|
||||
|
||||
# check old file has been renamed to new_name
|
||||
json_resp = json.loads(resp.content)
|
||||
print old_folder_name, new_name, checked_name
|
||||
assert checked_name == json_resp['obj_name']
|
||||
|
||||
def test_rename_folder_with_invalid_repo_perm(self):
|
||||
|
Reference in New Issue
Block a user