mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-02 23:48:47 +00:00
add admin share/upload links api
1. get share/upload link info by token 1. get dirents of shared dir 1. get download/upload fileserver url of shared file/dir 1. check share/upload link password
This commit is contained in:
356
seahub/api2/endpoints/admin/share_links.py
Normal file
356
seahub/api2/endpoints/admin/share_links.py
Normal file
@@ -0,0 +1,356 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
import os
|
||||
import json
|
||||
import stat
|
||||
import logging
|
||||
import posixpath
|
||||
|
||||
from rest_framework.authentication import SessionAuthentication
|
||||
from rest_framework.permissions import IsAdminUser
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
from rest_framework import status
|
||||
|
||||
from django.contrib.auth.hashers import check_password
|
||||
|
||||
from seaserv import seafile_api
|
||||
import seaserv
|
||||
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
from seahub.api2.utils import api_error
|
||||
|
||||
from seahub.base.templatetags.seahub_tags import email2nickname
|
||||
from seahub.share.models import FileShare
|
||||
from seahub.profile.models import Profile
|
||||
from seahub.utils import gen_file_get_url, gen_dir_zip_download_url, \
|
||||
is_windows_operating_system, gen_shared_link
|
||||
from seahub.utils.timeutils import timestamp_to_isoformat_timestr, \
|
||||
datetime_to_isoformat_timestr
|
||||
from seahub.views.file import send_file_access_msg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_share_link_info(fileshare):
|
||||
data = {}
|
||||
token = fileshare.token
|
||||
|
||||
repo_id = fileshare.repo_id
|
||||
try:
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
repo = None
|
||||
|
||||
path = fileshare.path
|
||||
if path:
|
||||
obj_name = '/' if path == '/' else os.path.basename(path.rstrip('/'))
|
||||
else:
|
||||
obj_name = ''
|
||||
|
||||
if fileshare.expire_date:
|
||||
expire_date = datetime_to_isoformat_timestr(fileshare.expire_date)
|
||||
else:
|
||||
expire_date = ''
|
||||
|
||||
if fileshare.ctime:
|
||||
ctime = datetime_to_isoformat_timestr(fileshare.ctime)
|
||||
else:
|
||||
ctime = ''
|
||||
|
||||
ccnet_email = fileshare.username
|
||||
data['creator_email'] = ccnet_email
|
||||
data['creator_name'] = email2nickname(ccnet_email)
|
||||
data['creator_contact_email'] = \
|
||||
Profile.objects.get_contact_email_by_user(ccnet_email)
|
||||
|
||||
data['repo_id'] = repo_id
|
||||
data['repo_name'] = repo.repo_name if repo else ''
|
||||
|
||||
data['path'] = path
|
||||
data['obj_name'] = obj_name
|
||||
data['is_dir'] = True if fileshare.s_type == 'd' else False
|
||||
|
||||
data['token'] = token
|
||||
data['link'] = gen_shared_link(token, fileshare.s_type)
|
||||
data['view_cnt'] = fileshare.view_cnt
|
||||
data['ctime'] = ctime
|
||||
data['expire_date'] = expire_date
|
||||
data['is_expired'] = fileshare.is_expired()
|
||||
data['permissions'] = fileshare.get_permissions()
|
||||
return data
|
||||
|
||||
|
||||
class AdminShareLink(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def get(self, request, token):
|
||||
""" Get a special share link info.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
sharelink = FileShare.objects.get(token=token)
|
||||
except FileShare.DoesNotExist:
|
||||
error_msg = 'Share link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
link_info = get_share_link_info(sharelink)
|
||||
return Response(link_info)
|
||||
|
||||
|
||||
class AdminShareLinkDirents(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def get(self, request, token):
|
||||
""" Get dirents of shared download dir.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
sharelink = FileShare.objects.get(token=token)
|
||||
except FileShare.DoesNotExist:
|
||||
error_msg = 'Share link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
repo_id = sharelink.repo_id
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
if not repo:
|
||||
error_msg = 'Library not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
obj_path = sharelink.path
|
||||
obj_id = seafile_api.get_dir_id_by_path(repo_id, obj_path)
|
||||
if not obj_id:
|
||||
error_msg = 'Folder not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
req_path = request.GET.get('path', '/')
|
||||
|
||||
if req_path == '/':
|
||||
real_path = obj_path
|
||||
else:
|
||||
real_path = posixpath.join(obj_path, req_path.strip('/'))
|
||||
|
||||
if real_path[-1] != '/':
|
||||
real_path += '/'
|
||||
|
||||
real_obj_id = seafile_api.get_dir_id_by_path(repo_id, real_path)
|
||||
if not real_obj_id:
|
||||
error_msg = 'Folder %s not found.' % req_path
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
try:
|
||||
current_commit = seafile_api.get_commit_list(repo_id, 0, 1)[0]
|
||||
dirent_list = seafile_api.list_dir_by_commit_and_path(repo_id,
|
||||
current_commit.id, real_path, -1, -1)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
result = []
|
||||
for dirent in dirent_list:
|
||||
dirent_info = {}
|
||||
dirent_info['obj_name'] = dirent.obj_name
|
||||
dirent_info['path'] = posixpath.join(req_path, dirent.obj_name)
|
||||
dirent_info['size'] = dirent.size
|
||||
dirent_info['last_modified'] = timestamp_to_isoformat_timestr(dirent.mtime)
|
||||
if stat.S_ISDIR(dirent.mode):
|
||||
dirent_info['is_dir'] = True
|
||||
else:
|
||||
dirent_info['is_dir'] = False
|
||||
|
||||
result.append(dirent_info)
|
||||
|
||||
return Response(result)
|
||||
|
||||
|
||||
class AdminShareLinkDownload(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def get(self, request, token):
|
||||
""" Get FileServer download url of the shared file/dir.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
sharelink = FileShare.objects.get(token=token)
|
||||
except FileShare.DoesNotExist:
|
||||
error_msg = 'Share link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
repo_id = sharelink.repo_id
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
if not repo:
|
||||
error_msg = 'Library not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
result = {}
|
||||
obj_path = sharelink.path
|
||||
if sharelink.s_type == 'f':
|
||||
# download shared file
|
||||
obj_id = seafile_api.get_file_id_by_path(repo_id, obj_path)
|
||||
if not obj_id:
|
||||
error_msg = 'File not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
try:
|
||||
# `username` parameter only used for encrypted repo
|
||||
download_token = seafile_api.get_fileserver_access_token(repo_id,
|
||||
obj_id, 'download', sharelink.username, use_onetime=False)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
if not download_token:
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
obj_name = os.path.basename(obj_path.rstrip('/'))
|
||||
result['download_link'] = gen_file_get_url(download_token, obj_name)
|
||||
else:
|
||||
# download (sub) file/folder in shared dir
|
||||
obj_id = seafile_api.get_dir_id_by_path(repo_id, obj_path)
|
||||
if not obj_id:
|
||||
error_msg = 'Folder not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
download_type = request.GET.get('type', None)
|
||||
if not download_type or download_type not in ('file', 'folder'):
|
||||
error_msg = 'type invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
req_path = request.GET.get('path', None)
|
||||
if not req_path:
|
||||
error_msg = 'path invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
if req_path == '/':
|
||||
real_path = obj_path
|
||||
else:
|
||||
real_path = posixpath.join(obj_path, req_path.strip('/'))
|
||||
|
||||
if download_type == 'file':
|
||||
# download sub file in shared dir
|
||||
real_obj_id = seafile_api.get_file_id_by_path(repo_id, real_path)
|
||||
if not real_obj_id:
|
||||
error_msg = 'File not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
try:
|
||||
download_token = seafile_api.get_fileserver_access_token(repo_id,
|
||||
real_obj_id, 'download', sharelink.username, use_onetime=False)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
if not download_token:
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
file_name = os.path.basename(real_path.rstrip('/'))
|
||||
result['download_link'] = gen_file_get_url(download_token, file_name)
|
||||
else:
|
||||
# download sub folder in shared dir
|
||||
if real_path[-1] != '/':
|
||||
real_path += '/'
|
||||
|
||||
real_obj_id = seafile_api.get_dir_id_by_path(repo_id, real_path)
|
||||
if not real_obj_id:
|
||||
error_msg = 'Folder %s not found.' % req_path
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
dir_name = repo.name if real_path == '/' else \
|
||||
os.path.basename(real_path.rstrip('/'))
|
||||
|
||||
dir_size = seafile_api.get_dir_size(
|
||||
repo.store_id, repo.version, real_obj_id)
|
||||
if dir_size > seaserv.MAX_DOWNLOAD_DIR_SIZE:
|
||||
error_msg = 'Unable to download directory "%s": size is too large.' % dir_name
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
# get file server access token
|
||||
is_windows = 0
|
||||
if is_windows_operating_system(request):
|
||||
is_windows = 1
|
||||
|
||||
fake_obj_id = {
|
||||
'obj_id': real_obj_id,
|
||||
'dir_name': dir_name,
|
||||
'is_windows': is_windows
|
||||
}
|
||||
|
||||
try:
|
||||
zip_token = seafile_api.get_fileserver_access_token(repo_id,
|
||||
json.dumps(fake_obj_id), 'download-dir',
|
||||
sharelink.username, use_onetime=False)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
try:
|
||||
# used for file audit
|
||||
send_file_access_msg(request, repo, real_path, 'share-link')
|
||||
# used for traffic
|
||||
seaserv.send_message('seahub.stats', 'dir-download\t%s\t%s\t%s\t%s' %
|
||||
(repo_id, sharelink.username, real_obj_id, dir_size))
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
result['download_link'] = gen_dir_zip_download_url(zip_token)
|
||||
|
||||
return Response(result)
|
||||
|
||||
|
||||
class AdminShareLinkCheckPassword(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def post(self, request, token):
|
||||
""" Check if password for an encrypted share link is correct.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
sharelink = FileShare.objects.get(token=token)
|
||||
except FileShare.DoesNotExist:
|
||||
error_msg = 'Share link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
if not sharelink.is_encrypted():
|
||||
error_msg = 'Share link is not encrypted.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
password = request.POST.get('password')
|
||||
if not password:
|
||||
error_msg = 'password invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
if check_password(password, sharelink.password):
|
||||
return Response({'success': True})
|
||||
else:
|
||||
error_msg = 'Password is not correct.'
|
||||
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
|
166
seahub/api2/endpoints/admin/upload_links.py
Normal file
166
seahub/api2/endpoints/admin/upload_links.py
Normal file
@@ -0,0 +1,166 @@
|
||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||
import os
|
||||
import logging
|
||||
|
||||
from rest_framework.authentication import SessionAuthentication
|
||||
from rest_framework.permissions import IsAdminUser
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
from rest_framework import status
|
||||
|
||||
from django.contrib.auth.hashers import check_password
|
||||
|
||||
from seaserv import seafile_api
|
||||
|
||||
from seahub.api2.utils import api_error
|
||||
from seahub.api2.authentication import TokenAuthentication
|
||||
from seahub.api2.throttling import UserRateThrottle
|
||||
|
||||
from seahub.base.templatetags.seahub_tags import email2nickname
|
||||
from seahub.utils import gen_file_upload_url, gen_shared_upload_link
|
||||
from seahub.utils.timeutils import datetime_to_isoformat_timestr
|
||||
|
||||
from seahub.share.models import UploadLinkShare
|
||||
from seahub.profile.models import Profile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_upload_link_info(uls):
|
||||
data = {}
|
||||
token = uls.token
|
||||
|
||||
repo_id = uls.repo_id
|
||||
try:
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
repo = None
|
||||
|
||||
path = uls.path
|
||||
if path:
|
||||
obj_name = '/' if path == '/' else os.path.basename(path.rstrip('/'))
|
||||
else:
|
||||
obj_name = ''
|
||||
|
||||
if uls.ctime:
|
||||
ctime = datetime_to_isoformat_timestr(uls.ctime)
|
||||
else:
|
||||
ctime = ''
|
||||
|
||||
data['repo_id'] = repo_id
|
||||
data['repo_name'] = repo.repo_name if repo else ''
|
||||
data['path'] = path
|
||||
data['obj_name'] = obj_name
|
||||
data['view_cnt'] = uls.view_cnt
|
||||
data['ctime'] = ctime
|
||||
data['link'] = gen_shared_upload_link(token)
|
||||
data['token'] = token
|
||||
|
||||
ccnet_email = uls.username
|
||||
data['creator_email'] = ccnet_email
|
||||
data['creator_name'] = email2nickname(ccnet_email)
|
||||
data['creator_contact_email'] = \
|
||||
Profile.objects.get_contact_email_by_user(ccnet_email)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class AdminUploadLink(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def get(self, request, token):
|
||||
""" Get a special upload link info.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
uploadlink = UploadLinkShare.objects.get(token=token)
|
||||
except UploadLinkShare.DoesNotExist:
|
||||
error_msg = 'Upload link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
link_info = get_upload_link_info(uploadlink)
|
||||
return Response(link_info)
|
||||
|
||||
|
||||
class AdminUploadLinkUpload(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def get(self, request, token):
|
||||
""" Get FileServer url of the shared file.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
uploadlink = UploadLinkShare.objects.get(token=token)
|
||||
except UploadLinkShare.DoesNotExist:
|
||||
error_msg = 'Upload link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
repo_id = uploadlink.repo_id
|
||||
repo = seafile_api.get_repo(repo_id)
|
||||
if not repo:
|
||||
error_msg = 'Library not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
path = uploadlink.path
|
||||
obj_id = seafile_api.get_dir_id_by_path(repo_id, path)
|
||||
if not obj_id:
|
||||
error_msg = 'Folder not found.'
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
upload_token = seafile_api.get_fileserver_access_token(repo_id,
|
||||
obj_id, 'upload', uploadlink.username, use_onetime=False)
|
||||
|
||||
if not upload_token:
|
||||
error_msg = 'Internal Server Error'
|
||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||
|
||||
result = {}
|
||||
result['upload_link'] = gen_file_upload_url(token, 'upload-api')
|
||||
|
||||
return Response(result)
|
||||
|
||||
class AdminUploadLinkCheckPassword(APIView):
|
||||
|
||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||
permission_classes = (IsAdminUser,)
|
||||
throttle_classes = (UserRateThrottle,)
|
||||
|
||||
def post(self, request, token):
|
||||
""" Check if password for an encrypted upload link is correct.
|
||||
|
||||
Permission checking:
|
||||
1. only admin can perform this action.
|
||||
"""
|
||||
|
||||
try:
|
||||
uploadlink = UploadLinkShare.objects.get(token=token)
|
||||
except UploadLinkShare.DoesNotExist:
|
||||
error_msg = 'Upload link %s not found.' % token
|
||||
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||
|
||||
if not uploadlink.is_encrypted():
|
||||
error_msg = 'Upload link is not encrypted.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
password = request.POST.get('password')
|
||||
if not password:
|
||||
error_msg = 'password invalid.'
|
||||
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||
|
||||
if check_password(password, uploadlink.password):
|
||||
return Response({'success': True})
|
||||
else:
|
||||
error_msg = 'Password is not correct.'
|
||||
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
|
@@ -42,6 +42,8 @@ from seahub.api2.endpoints.notifications import NotificationsView, NotificationV
|
||||
from seahub.api2.endpoints.user_enabled_modules import UserEnabledModulesView
|
||||
from seahub.api2.endpoints.repo_file_uploaded_bytes import RepoFileUploadedBytesView
|
||||
from seahub.api2.endpoints.user_avatar import UserAvatarView
|
||||
|
||||
# Admin
|
||||
from seahub.api2.endpoints.admin.login import Login
|
||||
from seahub.api2.endpoints.admin.file_audit import FileAudit
|
||||
from seahub.api2.endpoints.admin.file_update import FileUpdate
|
||||
@@ -58,6 +60,11 @@ from seahub.api2.endpoints.admin.groups import AdminGroups, AdminGroup
|
||||
from seahub.api2.endpoints.admin.group_libraries import AdminGroupLibraries, AdminGroupLibrary
|
||||
from seahub.api2.endpoints.admin.group_members import AdminGroupMembers, AdminGroupMember
|
||||
from seahub.api2.endpoints.admin.shares import AdminShares
|
||||
from seahub.api2.endpoints.admin.share_links import AdminShareLink, \
|
||||
AdminShareLinkDownload, AdminShareLinkCheckPassword, \
|
||||
AdminShareLinkDirents
|
||||
from seahub.api2.endpoints.admin.upload_links import AdminUploadLink, \
|
||||
AdminUploadLinkUpload, AdminUploadLinkCheckPassword
|
||||
from seahub.api2.endpoints.admin.users_batch import AdminUsersBatch
|
||||
from seahub.api2.endpoints.admin.logs import AdminLogs
|
||||
from seahub.api2.endpoints.admin.org_users import AdminOrgUsers, AdminOrgUser
|
||||
@@ -260,6 +267,23 @@ urlpatterns = patterns(
|
||||
url(r'^api/v2.1/admin/shares/$', AdminShares.as_view(), name='api-v2.1-admin-shares'),
|
||||
url(r'^api/v2.1/admin/admin-logs/$', AdminLogs.as_view(), name='api-v2.1-admin-admin-logs'),
|
||||
|
||||
## admin::share-links
|
||||
url(r'^api/v2.1/admin/share-links/(?P<token>[a-f0-9]+)/$', AdminShareLink.as_view(), name='api-v2.1-admin-share-link'),
|
||||
url(r'^api/v2.1/admin/share-links/(?P<token>[a-f0-9]+)/download/$',
|
||||
AdminShareLinkDownload.as_view(), name='api-v2.1-admin-share-link-download'),
|
||||
url(r'^api/v2.1/admin/share-links/(?P<token>[a-f0-9]+)/check-password/$',
|
||||
AdminShareLinkCheckPassword.as_view(), name='api-v2.1-admin-share-link-check-password'),
|
||||
url(r'^api/v2.1/admin/share-links/(?P<token>[a-f0-9]+)/dirents/$',
|
||||
AdminShareLinkDirents.as_view(), name='api-v2.1-admin-share-link-dirents'),
|
||||
|
||||
## admin::upload-links
|
||||
url(r'^api/v2.1/admin/upload-links/(?P<token>[a-f0-9]+)/$', AdminUploadLink.as_view(), name='api-v2.1-admin-upload-link'),
|
||||
url(r'^api/v2.1/admin/upload-links/(?P<token>[a-f0-9]+)/upload/$',
|
||||
AdminUploadLinkUpload.as_view(), name='api-v2.1-admin-upload-link-upload'),
|
||||
url(r'^api/v2.1/admin/upload-links/(?P<token>[a-f0-9]+)/check-password/$',
|
||||
AdminUploadLinkCheckPassword.as_view(), name='api-v2.1-admin-upload-link-check-password'),
|
||||
|
||||
## admin::users
|
||||
url(r'^api/v2.1/admin/users/batch/$', AdminUsersBatch.as_view(), name='api-v2.1-admin-users-batch'),
|
||||
|
||||
## admin::organizations
|
||||
|
@@ -451,6 +451,13 @@ def gen_file_get_url(token, filename):
|
||||
def gen_file_upload_url(token, op):
|
||||
return '%s/%s/%s' % (get_fileserver_root(), op, token)
|
||||
|
||||
def gen_dir_zip_download_url(token):
|
||||
"""
|
||||
Generate fileserver file url.
|
||||
Format: http://<domain:port>/files/<token>/<filename>
|
||||
"""
|
||||
return '%s/zip/%s' % (get_fileserver_root(), token)
|
||||
|
||||
def get_ccnet_server_addr_port():
|
||||
"""get ccnet server host and port"""
|
||||
return seaserv.CCNET_SERVER_ADDR, seaserv.CCNET_SERVER_PORT
|
||||
|
349
tests/api/endpoints/admin/test_share_links.py
Normal file
349
tests/api/endpoints/admin/test_share_links.py
Normal file
@@ -0,0 +1,349 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import json
|
||||
|
||||
from tests.common.utils import randstring
|
||||
from django.core.urlresolvers import reverse
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.share.models import FileShare
|
||||
|
||||
from seaserv import seafile_api
|
||||
|
||||
try:
|
||||
from seahub.settings import LOCAL_PRO_DEV_ENV
|
||||
except ImportError:
|
||||
LOCAL_PRO_DEV_ENV = False
|
||||
|
||||
|
||||
class AdminShareLinkTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.file_path= self.file
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_file_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_file_link(
|
||||
self.user.username, self.repo.id, self.file, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _add_dir_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_dir_link(
|
||||
self.user.username, self.repo.id, self.folder, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_share_link(self, token):
|
||||
link = FileShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_get_file_share_link_info_by_token(self):
|
||||
self.login_as(self.admin)
|
||||
token = self._add_file_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert json_resp['token'] == token
|
||||
assert json_resp['is_dir'] == False
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_get_dir_share_link_info_by_token(self):
|
||||
self.login_as(self.admin)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert json_resp['token'] == token
|
||||
assert json_resp['is_dir'] == True
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_get_share_link_info_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_get_share_link_info_with_invalid_share_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
|
||||
class AdminShareLinkDirentsTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_dir_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_dir_link(
|
||||
self.user.username, self.repo.id, self.folder, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_share_link(self, token):
|
||||
link = FileShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_get_dirents(self):
|
||||
|
||||
username = self.user.username
|
||||
dir_name = randstring(6)
|
||||
file_name = randstring(6)
|
||||
|
||||
seafile_api.post_dir(self.repo_id,
|
||||
self.folder_path, dir_name, username)
|
||||
|
||||
seafile_api.post_empty_file(self.repo_id,
|
||||
self.folder_path, file_name, username)
|
||||
|
||||
self.login_as(self.admin)
|
||||
token = self._add_dir_share_link()
|
||||
url = reverse('api-v2.1-admin-share-link-dirents', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert json_resp[0]['is_dir'] == True
|
||||
assert dir_name in json_resp[0]['obj_name']
|
||||
assert json_resp[1]['is_dir'] == False
|
||||
assert file_name in json_resp[1]['obj_name']
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_get_dirents_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-dirents', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_get_dirents_with_invalid_share_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-dirents',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
|
||||
class AdminShareLinkDownloadTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.file_path= self.file
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_dir_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_dir_link(
|
||||
self.user.username, self.repo.id, self.folder, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _add_file_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_file_link(
|
||||
self.user.username, self.repo.id, self.file, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_share_link(self, token):
|
||||
link = FileShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_download_shared_file(self):
|
||||
self.login_as(self.admin)
|
||||
token = self._add_file_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-download', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert '8082' in json_resp['download_link']
|
||||
assert 'files' in json_resp['download_link']
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_download_sub_file_in_shared_dir(self):
|
||||
|
||||
username = self.user.username
|
||||
file_name = randstring(6)
|
||||
seafile_api.post_empty_file(self.repo_id,
|
||||
self.folder_path, file_name, username)
|
||||
|
||||
self.login_as(self.admin)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-download', args=[token])
|
||||
resp = self.client.get(url + '?path=/%s&type=file' % file_name)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert '8082' in json_resp['download_link']
|
||||
assert 'files' in json_resp['download_link']
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_download_sub_dir_in_shared_dir(self):
|
||||
|
||||
username = self.user.username
|
||||
dir_name = randstring(6)
|
||||
seafile_api.post_dir(self.repo_id,
|
||||
self.folder_path, dir_name, username)
|
||||
|
||||
self.login_as(self.admin)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-download', args=[token])
|
||||
resp = self.client.get(url + '?path=/%s&type=folder' % dir_name)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert '8082' in json_resp['download_link']
|
||||
assert 'zip' in json_resp['download_link']
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_download_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-download', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_download_with_invalid_share_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-download',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
|
||||
class ShareLinkCheckPasswordTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.file_path= self.file
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_file_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_file_link(
|
||||
self.user.username, self.repo.id, self.file, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _add_dir_share_link(self, password=None):
|
||||
fs = FileShare.objects.create_dir_link(
|
||||
self.user.username, self.repo.id, self.folder, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_share_link(self, token):
|
||||
link = FileShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_check_password(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
#### create file share link ####
|
||||
password = randstring(10)
|
||||
token = self._add_file_share_link(password)
|
||||
url = reverse('api-v2.1-admin-share-link-check-password', args=[token])
|
||||
|
||||
# check password for file share link
|
||||
resp = self.client.post(url, {'password': password})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
# remove file share link
|
||||
self._remove_share_link(token)
|
||||
|
||||
#### create dir share link ####
|
||||
password = randstring(10)
|
||||
token = self._add_dir_share_link(password)
|
||||
url = reverse('api-v2.1-admin-share-link-check-password', args=[token])
|
||||
|
||||
# check password for dir share link
|
||||
resp = self.client.post(url, {'password': password})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
# remove dir share link
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_invalid_password(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
password = randstring(10)
|
||||
token = self._add_file_share_link(password)
|
||||
url = reverse('api-v2.1-admin-share-link-check-password', args=[token])
|
||||
|
||||
# assert password is valid
|
||||
resp = self.client.post(url, {'password': password})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
# assert password is invalid
|
||||
resp = self.client.post(url, {'password': 'invalid_password'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_check_password_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_dir_share_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-check-password', args=[token])
|
||||
resp = self.client.post(url)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_share_link(token)
|
||||
|
||||
def test_check_password_with_invalid_share_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-share-link-check-password',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.post(url, {'password': 'invalid_password'})
|
||||
self.assertEqual(404, resp.status_code)
|
188
tests/api/endpoints/admin/test_upload_links.py
Normal file
188
tests/api/endpoints/admin/test_upload_links.py
Normal file
@@ -0,0 +1,188 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import json
|
||||
|
||||
from tests.common.utils import randstring
|
||||
from django.core.urlresolvers import reverse
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.share.models import UploadLinkShare
|
||||
|
||||
try:
|
||||
from seahub.settings import LOCAL_PRO_DEV_ENV
|
||||
except ImportError:
|
||||
LOCAL_PRO_DEV_ENV = False
|
||||
|
||||
class AdminUploadLinkTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_upload_link(self, password=None):
|
||||
fs = UploadLinkShare.objects.create_upload_link_share(
|
||||
self.user.username, self.repo.id, self.folder_path, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_upload_link(self, token):
|
||||
link = UploadLinkShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_get_upload_link_info(self):
|
||||
self.login_as(self.admin)
|
||||
token = self._add_upload_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert json_resp['token'] == token
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_get_upload_link_info_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_upload_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_get_upload_link_info_with_invalid_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
|
||||
class AdminUploadLinkUploadTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_upload_link(self, password=None):
|
||||
fs = UploadLinkShare.objects.create_upload_link_share(
|
||||
self.user.username, self.repo.id, self.folder_path, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_upload_link(self, token):
|
||||
link = UploadLinkShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_upload(self):
|
||||
self.login_as(self.admin)
|
||||
token = self._add_upload_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link-upload', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
json_resp = json.loads(resp.content)
|
||||
|
||||
assert '8082' in json_resp['upload_link']
|
||||
assert 'upload' in json_resp['upload_link']
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_upload_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_upload_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link-upload', args=[token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_get_upload_link_info_with_invalid_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link-upload',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.get(url)
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
|
||||
class AdminUploadLinkCheckPasswordTest(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.repo_id = self.repo.id
|
||||
self.folder_path= self.folder
|
||||
self.invalid_token = '00000000000000000000'
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_repo()
|
||||
|
||||
def _add_upload_link(self, password=None):
|
||||
fs = UploadLinkShare.objects.create_upload_link_share(
|
||||
self.user.username, self.repo.id, self.folder_path, password, None)
|
||||
|
||||
return fs.token
|
||||
|
||||
def _remove_upload_link(self, token):
|
||||
link = UploadLinkShare.objects.get(token=token)
|
||||
link.delete()
|
||||
|
||||
def test_check_password(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
password = randstring(10)
|
||||
token = self._add_upload_link(password)
|
||||
url = reverse('api-v2.1-admin-upload-link-check-password', args=[token])
|
||||
|
||||
resp = self.client.post(url, {'password': password})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_invalid_password(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
password = randstring(10)
|
||||
token = self._add_upload_link(password)
|
||||
url = reverse('api-v2.1-admin-upload-link-check-password', args=[token])
|
||||
|
||||
# assert password is valid
|
||||
resp = self.client.post(url, {'password': password})
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
# assert password is invalid
|
||||
resp = self.client.post(url, {'password': 'invalid_password'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_check_password_with_invalid_permission(self):
|
||||
self.login_as(self.user)
|
||||
token = self._add_upload_link()
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link-check-password', args=[token])
|
||||
resp = self.client.post(url, {'password': 'invalid_password'})
|
||||
self.assertEqual(403, resp.status_code)
|
||||
|
||||
self._remove_upload_link(token)
|
||||
|
||||
def test_check_password_with_invalid_token(self):
|
||||
self.login_as(self.admin)
|
||||
|
||||
url = reverse('api-v2.1-admin-upload-link-check-password',
|
||||
args=[self.invalid_token])
|
||||
resp = self.client.post(url, {'password': 'invalid_password'})
|
||||
self.assertEqual(404, resp.status_code)
|
||||
|
||||
|
Reference in New Issue
Block a user