From 9750e8baaf8f56a8f1434ecefab1820fc1645d24 Mon Sep 17 00:00:00 2001 From: lian Date: Wed, 21 Jun 2017 14:48:48 +0800 Subject: [PATCH] add admin share/upload links api 1. get share/upload link info by token 1. get dirents of shared dir 1. get download/upload fileserver url of shared file/dir 1. check share/upload link password --- seahub/api2/endpoints/admin/share_links.py | 356 ++++++++++++++++++ seahub/api2/endpoints/admin/upload_links.py | 166 ++++++++ seahub/urls.py | 24 ++ seahub/utils/__init__.py | 7 + tests/api/endpoints/admin/test_share_links.py | 349 +++++++++++++++++ .../api/endpoints/admin/test_upload_links.py | 188 +++++++++ 6 files changed, 1090 insertions(+) create mode 100644 seahub/api2/endpoints/admin/share_links.py create mode 100644 seahub/api2/endpoints/admin/upload_links.py create mode 100644 tests/api/endpoints/admin/test_share_links.py create mode 100644 tests/api/endpoints/admin/test_upload_links.py diff --git a/seahub/api2/endpoints/admin/share_links.py b/seahub/api2/endpoints/admin/share_links.py new file mode 100644 index 0000000000..d8cd61a837 --- /dev/null +++ b/seahub/api2/endpoints/admin/share_links.py @@ -0,0 +1,356 @@ +# Copyright (c) 2012-2016 Seafile Ltd. +import os +import json +import stat +import logging +import posixpath + +from rest_framework.authentication import SessionAuthentication +from rest_framework.permissions import IsAdminUser +from rest_framework.response import Response +from rest_framework.views import APIView +from rest_framework import status + +from django.contrib.auth.hashers import check_password + +from seaserv import seafile_api +import seaserv + +from seahub.api2.authentication import TokenAuthentication +from seahub.api2.throttling import UserRateThrottle +from seahub.api2.utils import api_error + +from seahub.base.templatetags.seahub_tags import email2nickname +from seahub.share.models import FileShare +from seahub.profile.models import Profile +from seahub.utils import gen_file_get_url, gen_dir_zip_download_url, \ + is_windows_operating_system, gen_shared_link +from seahub.utils.timeutils import timestamp_to_isoformat_timestr, \ + datetime_to_isoformat_timestr +from seahub.views.file import send_file_access_msg + +logger = logging.getLogger(__name__) + +def get_share_link_info(fileshare): + data = {} + token = fileshare.token + + repo_id = fileshare.repo_id + try: + repo = seafile_api.get_repo(repo_id) + except Exception as e: + logger.error(e) + repo = None + + path = fileshare.path + if path: + obj_name = '/' if path == '/' else os.path.basename(path.rstrip('/')) + else: + obj_name = '' + + if fileshare.expire_date: + expire_date = datetime_to_isoformat_timestr(fileshare.expire_date) + else: + expire_date = '' + + if fileshare.ctime: + ctime = datetime_to_isoformat_timestr(fileshare.ctime) + else: + ctime = '' + + ccnet_email = fileshare.username + data['creator_email'] = ccnet_email + data['creator_name'] = email2nickname(ccnet_email) + data['creator_contact_email'] = \ + Profile.objects.get_contact_email_by_user(ccnet_email) + + data['repo_id'] = repo_id + data['repo_name'] = repo.repo_name if repo else '' + + data['path'] = path + data['obj_name'] = obj_name + data['is_dir'] = True if fileshare.s_type == 'd' else False + + data['token'] = token + data['link'] = gen_shared_link(token, fileshare.s_type) + data['view_cnt'] = fileshare.view_cnt + data['ctime'] = ctime + data['expire_date'] = expire_date + data['is_expired'] = fileshare.is_expired() + data['permissions'] = fileshare.get_permissions() + return data + + +class AdminShareLink(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def get(self, request, token): + """ Get a special share link info. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + sharelink = FileShare.objects.get(token=token) + except FileShare.DoesNotExist: + error_msg = 'Share link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + link_info = get_share_link_info(sharelink) + return Response(link_info) + + +class AdminShareLinkDirents(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def get(self, request, token): + """ Get dirents of shared download dir. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + sharelink = FileShare.objects.get(token=token) + except FileShare.DoesNotExist: + error_msg = 'Share link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + repo_id = sharelink.repo_id + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + obj_path = sharelink.path + obj_id = seafile_api.get_dir_id_by_path(repo_id, obj_path) + if not obj_id: + error_msg = 'Folder not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + req_path = request.GET.get('path', '/') + + if req_path == '/': + real_path = obj_path + else: + real_path = posixpath.join(obj_path, req_path.strip('/')) + + if real_path[-1] != '/': + real_path += '/' + + real_obj_id = seafile_api.get_dir_id_by_path(repo_id, real_path) + if not real_obj_id: + error_msg = 'Folder %s not found.' % req_path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + try: + current_commit = seafile_api.get_commit_list(repo_id, 0, 1)[0] + dirent_list = seafile_api.list_dir_by_commit_and_path(repo_id, + current_commit.id, real_path, -1, -1) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + result = [] + for dirent in dirent_list: + dirent_info = {} + dirent_info['obj_name'] = dirent.obj_name + dirent_info['path'] = posixpath.join(req_path, dirent.obj_name) + dirent_info['size'] = dirent.size + dirent_info['last_modified'] = timestamp_to_isoformat_timestr(dirent.mtime) + if stat.S_ISDIR(dirent.mode): + dirent_info['is_dir'] = True + else: + dirent_info['is_dir'] = False + + result.append(dirent_info) + + return Response(result) + + +class AdminShareLinkDownload(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def get(self, request, token): + """ Get FileServer download url of the shared file/dir. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + sharelink = FileShare.objects.get(token=token) + except FileShare.DoesNotExist: + error_msg = 'Share link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + repo_id = sharelink.repo_id + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + result = {} + obj_path = sharelink.path + if sharelink.s_type == 'f': + # download shared file + obj_id = seafile_api.get_file_id_by_path(repo_id, obj_path) + if not obj_id: + error_msg = 'File not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + try: + # `username` parameter only used for encrypted repo + download_token = seafile_api.get_fileserver_access_token(repo_id, + obj_id, 'download', sharelink.username, use_onetime=False) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + if not download_token: + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + obj_name = os.path.basename(obj_path.rstrip('/')) + result['download_link'] = gen_file_get_url(download_token, obj_name) + else: + # download (sub) file/folder in shared dir + obj_id = seafile_api.get_dir_id_by_path(repo_id, obj_path) + if not obj_id: + error_msg = 'Folder not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + download_type = request.GET.get('type', None) + if not download_type or download_type not in ('file', 'folder'): + error_msg = 'type invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + req_path = request.GET.get('path', None) + if not req_path: + error_msg = 'path invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if req_path == '/': + real_path = obj_path + else: + real_path = posixpath.join(obj_path, req_path.strip('/')) + + if download_type == 'file': + # download sub file in shared dir + real_obj_id = seafile_api.get_file_id_by_path(repo_id, real_path) + if not real_obj_id: + error_msg = 'File not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + try: + download_token = seafile_api.get_fileserver_access_token(repo_id, + real_obj_id, 'download', sharelink.username, use_onetime=False) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + if not download_token: + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + file_name = os.path.basename(real_path.rstrip('/')) + result['download_link'] = gen_file_get_url(download_token, file_name) + else: + # download sub folder in shared dir + if real_path[-1] != '/': + real_path += '/' + + real_obj_id = seafile_api.get_dir_id_by_path(repo_id, real_path) + if not real_obj_id: + error_msg = 'Folder %s not found.' % req_path + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + dir_name = repo.name if real_path == '/' else \ + os.path.basename(real_path.rstrip('/')) + + dir_size = seafile_api.get_dir_size( + repo.store_id, repo.version, real_obj_id) + if dir_size > seaserv.MAX_DOWNLOAD_DIR_SIZE: + error_msg = 'Unable to download directory "%s": size is too large.' % dir_name + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + # get file server access token + is_windows = 0 + if is_windows_operating_system(request): + is_windows = 1 + + fake_obj_id = { + 'obj_id': real_obj_id, + 'dir_name': dir_name, + 'is_windows': is_windows + } + + try: + zip_token = seafile_api.get_fileserver_access_token(repo_id, + json.dumps(fake_obj_id), 'download-dir', + sharelink.username, use_onetime=False) + except Exception as e: + logger.error(e) + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + try: + # used for file audit + send_file_access_msg(request, repo, real_path, 'share-link') + # used for traffic + seaserv.send_message('seahub.stats', 'dir-download\t%s\t%s\t%s\t%s' % + (repo_id, sharelink.username, real_obj_id, dir_size)) + except Exception as e: + logger.error(e) + + result['download_link'] = gen_dir_zip_download_url(zip_token) + + return Response(result) + + +class AdminShareLinkCheckPassword(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def post(self, request, token): + """ Check if password for an encrypted share link is correct. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + sharelink = FileShare.objects.get(token=token) + except FileShare.DoesNotExist: + error_msg = 'Share link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not sharelink.is_encrypted(): + error_msg = 'Share link is not encrypted.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + password = request.POST.get('password') + if not password: + error_msg = 'password invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if check_password(password, sharelink.password): + return Response({'success': True}) + else: + error_msg = 'Password is not correct.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) diff --git a/seahub/api2/endpoints/admin/upload_links.py b/seahub/api2/endpoints/admin/upload_links.py new file mode 100644 index 0000000000..6b10188566 --- /dev/null +++ b/seahub/api2/endpoints/admin/upload_links.py @@ -0,0 +1,166 @@ +# Copyright (c) 2012-2016 Seafile Ltd. +import os +import logging + +from rest_framework.authentication import SessionAuthentication +from rest_framework.permissions import IsAdminUser +from rest_framework.response import Response +from rest_framework.views import APIView +from rest_framework import status + +from django.contrib.auth.hashers import check_password + +from seaserv import seafile_api + +from seahub.api2.utils import api_error +from seahub.api2.authentication import TokenAuthentication +from seahub.api2.throttling import UserRateThrottle + +from seahub.base.templatetags.seahub_tags import email2nickname +from seahub.utils import gen_file_upload_url, gen_shared_upload_link +from seahub.utils.timeutils import datetime_to_isoformat_timestr + +from seahub.share.models import UploadLinkShare +from seahub.profile.models import Profile + +logger = logging.getLogger(__name__) + +def get_upload_link_info(uls): + data = {} + token = uls.token + + repo_id = uls.repo_id + try: + repo = seafile_api.get_repo(repo_id) + except Exception as e: + logger.error(e) + repo = None + + path = uls.path + if path: + obj_name = '/' if path == '/' else os.path.basename(path.rstrip('/')) + else: + obj_name = '' + + if uls.ctime: + ctime = datetime_to_isoformat_timestr(uls.ctime) + else: + ctime = '' + + data['repo_id'] = repo_id + data['repo_name'] = repo.repo_name if repo else '' + data['path'] = path + data['obj_name'] = obj_name + data['view_cnt'] = uls.view_cnt + data['ctime'] = ctime + data['link'] = gen_shared_upload_link(token) + data['token'] = token + + ccnet_email = uls.username + data['creator_email'] = ccnet_email + data['creator_name'] = email2nickname(ccnet_email) + data['creator_contact_email'] = \ + Profile.objects.get_contact_email_by_user(ccnet_email) + + return data + + +class AdminUploadLink(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def get(self, request, token): + """ Get a special upload link info. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + uploadlink = UploadLinkShare.objects.get(token=token) + except UploadLinkShare.DoesNotExist: + error_msg = 'Upload link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + link_info = get_upload_link_info(uploadlink) + return Response(link_info) + + +class AdminUploadLinkUpload(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def get(self, request, token): + """ Get FileServer url of the shared file. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + uploadlink = UploadLinkShare.objects.get(token=token) + except UploadLinkShare.DoesNotExist: + error_msg = 'Upload link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + repo_id = uploadlink.repo_id + repo = seafile_api.get_repo(repo_id) + if not repo: + error_msg = 'Library not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + path = uploadlink.path + obj_id = seafile_api.get_dir_id_by_path(repo_id, path) + if not obj_id: + error_msg = 'Folder not found.' + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + upload_token = seafile_api.get_fileserver_access_token(repo_id, + obj_id, 'upload', uploadlink.username, use_onetime=False) + + if not upload_token: + error_msg = 'Internal Server Error' + return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg) + + result = {} + result['upload_link'] = gen_file_upload_url(token, 'upload-api') + + return Response(result) + +class AdminUploadLinkCheckPassword(APIView): + + authentication_classes = (TokenAuthentication, SessionAuthentication) + permission_classes = (IsAdminUser,) + throttle_classes = (UserRateThrottle,) + + def post(self, request, token): + """ Check if password for an encrypted upload link is correct. + + Permission checking: + 1. only admin can perform this action. + """ + + try: + uploadlink = UploadLinkShare.objects.get(token=token) + except UploadLinkShare.DoesNotExist: + error_msg = 'Upload link %s not found.' % token + return api_error(status.HTTP_404_NOT_FOUND, error_msg) + + if not uploadlink.is_encrypted(): + error_msg = 'Upload link is not encrypted.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + password = request.POST.get('password') + if not password: + error_msg = 'password invalid.' + return api_error(status.HTTP_400_BAD_REQUEST, error_msg) + + if check_password(password, uploadlink.password): + return Response({'success': True}) + else: + error_msg = 'Password is not correct.' + return api_error(status.HTTP_403_FORBIDDEN, error_msg) diff --git a/seahub/urls.py b/seahub/urls.py index f8b1937c97..ed39134483 100644 --- a/seahub/urls.py +++ b/seahub/urls.py @@ -42,6 +42,8 @@ from seahub.api2.endpoints.notifications import NotificationsView, NotificationV from seahub.api2.endpoints.user_enabled_modules import UserEnabledModulesView from seahub.api2.endpoints.repo_file_uploaded_bytes import RepoFileUploadedBytesView from seahub.api2.endpoints.user_avatar import UserAvatarView + +# Admin from seahub.api2.endpoints.admin.login import Login from seahub.api2.endpoints.admin.file_audit import FileAudit from seahub.api2.endpoints.admin.file_update import FileUpdate @@ -58,6 +60,11 @@ from seahub.api2.endpoints.admin.groups import AdminGroups, AdminGroup from seahub.api2.endpoints.admin.group_libraries import AdminGroupLibraries, AdminGroupLibrary from seahub.api2.endpoints.admin.group_members import AdminGroupMembers, AdminGroupMember from seahub.api2.endpoints.admin.shares import AdminShares +from seahub.api2.endpoints.admin.share_links import AdminShareLink, \ + AdminShareLinkDownload, AdminShareLinkCheckPassword, \ + AdminShareLinkDirents +from seahub.api2.endpoints.admin.upload_links import AdminUploadLink, \ + AdminUploadLinkUpload, AdminUploadLinkCheckPassword from seahub.api2.endpoints.admin.users_batch import AdminUsersBatch from seahub.api2.endpoints.admin.logs import AdminLogs from seahub.api2.endpoints.admin.org_users import AdminOrgUsers, AdminOrgUser @@ -260,6 +267,23 @@ urlpatterns = patterns( url(r'^api/v2.1/admin/shares/$', AdminShares.as_view(), name='api-v2.1-admin-shares'), url(r'^api/v2.1/admin/admin-logs/$', AdminLogs.as_view(), name='api-v2.1-admin-admin-logs'), + ## admin::share-links + url(r'^api/v2.1/admin/share-links/(?P[a-f0-9]+)/$', AdminShareLink.as_view(), name='api-v2.1-admin-share-link'), + url(r'^api/v2.1/admin/share-links/(?P[a-f0-9]+)/download/$', + AdminShareLinkDownload.as_view(), name='api-v2.1-admin-share-link-download'), + url(r'^api/v2.1/admin/share-links/(?P[a-f0-9]+)/check-password/$', + AdminShareLinkCheckPassword.as_view(), name='api-v2.1-admin-share-link-check-password'), + url(r'^api/v2.1/admin/share-links/(?P[a-f0-9]+)/dirents/$', + AdminShareLinkDirents.as_view(), name='api-v2.1-admin-share-link-dirents'), + + ## admin::upload-links + url(r'^api/v2.1/admin/upload-links/(?P[a-f0-9]+)/$', AdminUploadLink.as_view(), name='api-v2.1-admin-upload-link'), + url(r'^api/v2.1/admin/upload-links/(?P[a-f0-9]+)/upload/$', + AdminUploadLinkUpload.as_view(), name='api-v2.1-admin-upload-link-upload'), + url(r'^api/v2.1/admin/upload-links/(?P[a-f0-9]+)/check-password/$', + AdminUploadLinkCheckPassword.as_view(), name='api-v2.1-admin-upload-link-check-password'), + + ## admin::users url(r'^api/v2.1/admin/users/batch/$', AdminUsersBatch.as_view(), name='api-v2.1-admin-users-batch'), ## admin::organizations diff --git a/seahub/utils/__init__.py b/seahub/utils/__init__.py index 82aed1bf21..c2ee586cc1 100644 --- a/seahub/utils/__init__.py +++ b/seahub/utils/__init__.py @@ -451,6 +451,13 @@ def gen_file_get_url(token, filename): def gen_file_upload_url(token, op): return '%s/%s/%s' % (get_fileserver_root(), op, token) +def gen_dir_zip_download_url(token): + """ + Generate fileserver file url. + Format: http:///files// + """ + return '%s/zip/%s' % (get_fileserver_root(), token) + def get_ccnet_server_addr_port(): """get ccnet server host and port""" return seaserv.CCNET_SERVER_ADDR, seaserv.CCNET_SERVER_PORT diff --git a/tests/api/endpoints/admin/test_share_links.py b/tests/api/endpoints/admin/test_share_links.py new file mode 100644 index 0000000000..97b0ddc439 --- /dev/null +++ b/tests/api/endpoints/admin/test_share_links.py @@ -0,0 +1,349 @@ +# -*- coding: utf-8 -*- +import json + +from tests.common.utils import randstring +from django.core.urlresolvers import reverse +from seahub.test_utils import BaseTestCase +from seahub.share.models import FileShare + +from seaserv import seafile_api + +try: + from seahub.settings import LOCAL_PRO_DEV_ENV +except ImportError: + LOCAL_PRO_DEV_ENV = False + + +class AdminShareLinkTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.file_path= self.file + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_file_share_link(self, password=None): + fs = FileShare.objects.create_file_link( + self.user.username, self.repo.id, self.file, password, None) + + return fs.token + + def _add_dir_share_link(self, password=None): + fs = FileShare.objects.create_dir_link( + self.user.username, self.repo.id, self.folder, password, None) + + return fs.token + + def _remove_share_link(self, token): + link = FileShare.objects.get(token=token) + link.delete() + + def test_get_file_share_link_info_by_token(self): + self.login_as(self.admin) + token = self._add_file_share_link() + + url = reverse('api-v2.1-admin-share-link', args=[token]) + resp = self.client.get(url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert json_resp['token'] == token + assert json_resp['is_dir'] == False + + self._remove_share_link(token) + + def test_get_dir_share_link_info_by_token(self): + self.login_as(self.admin) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link', args=[token]) + resp = self.client.get(url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert json_resp['token'] == token + assert json_resp['is_dir'] == True + + self._remove_share_link(token) + + def test_get_share_link_info_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link', args=[token]) + resp = self.client.get(url) + self.assertEqual(403, resp.status_code) + + self._remove_share_link(token) + + def test_get_share_link_info_with_invalid_share_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-share-link', + args=[self.invalid_token]) + resp = self.client.get(url) + self.assertEqual(404, resp.status_code) + + +class AdminShareLinkDirentsTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_dir_share_link(self, password=None): + fs = FileShare.objects.create_dir_link( + self.user.username, self.repo.id, self.folder, password, None) + + return fs.token + + def _remove_share_link(self, token): + link = FileShare.objects.get(token=token) + link.delete() + + def test_get_dirents(self): + + username = self.user.username + dir_name = randstring(6) + file_name = randstring(6) + + seafile_api.post_dir(self.repo_id, + self.folder_path, dir_name, username) + + seafile_api.post_empty_file(self.repo_id, + self.folder_path, file_name, username) + + self.login_as(self.admin) + token = self._add_dir_share_link() + url = reverse('api-v2.1-admin-share-link-dirents', args=[token]) + resp = self.client.get(url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert json_resp[0]['is_dir'] == True + assert dir_name in json_resp[0]['obj_name'] + assert json_resp[1]['is_dir'] == False + assert file_name in json_resp[1]['obj_name'] + + self._remove_share_link(token) + + def test_get_dirents_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link-dirents', args=[token]) + resp = self.client.get(url) + self.assertEqual(403, resp.status_code) + + self._remove_share_link(token) + + def test_get_dirents_with_invalid_share_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-share-link-dirents', + args=[self.invalid_token]) + resp = self.client.get(url) + self.assertEqual(404, resp.status_code) + + +class AdminShareLinkDownloadTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.file_path= self.file + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_dir_share_link(self, password=None): + fs = FileShare.objects.create_dir_link( + self.user.username, self.repo.id, self.folder, password, None) + + return fs.token + + def _add_file_share_link(self, password=None): + fs = FileShare.objects.create_file_link( + self.user.username, self.repo.id, self.file, password, None) + + return fs.token + + def _remove_share_link(self, token): + link = FileShare.objects.get(token=token) + link.delete() + + def test_download_shared_file(self): + self.login_as(self.admin) + token = self._add_file_share_link() + + url = reverse('api-v2.1-admin-share-link-download', args=[token]) + resp = self.client.get(url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert '8082' in json_resp['download_link'] + assert 'files' in json_resp['download_link'] + + self._remove_share_link(token) + + def test_download_sub_file_in_shared_dir(self): + + username = self.user.username + file_name = randstring(6) + seafile_api.post_empty_file(self.repo_id, + self.folder_path, file_name, username) + + self.login_as(self.admin) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link-download', args=[token]) + resp = self.client.get(url + '?path=/%s&type=file' % file_name) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert '8082' in json_resp['download_link'] + assert 'files' in json_resp['download_link'] + + self._remove_share_link(token) + + def test_download_sub_dir_in_shared_dir(self): + + username = self.user.username + dir_name = randstring(6) + seafile_api.post_dir(self.repo_id, + self.folder_path, dir_name, username) + + self.login_as(self.admin) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link-download', args=[token]) + resp = self.client.get(url + '?path=/%s&type=folder' % dir_name) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert '8082' in json_resp['download_link'] + assert 'zip' in json_resp['download_link'] + + self._remove_share_link(token) + + def test_download_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link-download', args=[token]) + resp = self.client.get(url) + self.assertEqual(403, resp.status_code) + + self._remove_share_link(token) + + def test_download_with_invalid_share_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-share-link-download', + args=[self.invalid_token]) + resp = self.client.get(url) + self.assertEqual(404, resp.status_code) + + +class ShareLinkCheckPasswordTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.file_path= self.file + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_file_share_link(self, password=None): + fs = FileShare.objects.create_file_link( + self.user.username, self.repo.id, self.file, password, None) + + return fs.token + + def _add_dir_share_link(self, password=None): + fs = FileShare.objects.create_dir_link( + self.user.username, self.repo.id, self.folder, password, None) + + return fs.token + + def _remove_share_link(self, token): + link = FileShare.objects.get(token=token) + link.delete() + + def test_check_password(self): + self.login_as(self.admin) + + #### create file share link #### + password = randstring(10) + token = self._add_file_share_link(password) + url = reverse('api-v2.1-admin-share-link-check-password', args=[token]) + + # check password for file share link + resp = self.client.post(url, {'password': password}) + self.assertEqual(200, resp.status_code) + + # remove file share link + self._remove_share_link(token) + + #### create dir share link #### + password = randstring(10) + token = self._add_dir_share_link(password) + url = reverse('api-v2.1-admin-share-link-check-password', args=[token]) + + # check password for dir share link + resp = self.client.post(url, {'password': password}) + self.assertEqual(200, resp.status_code) + + # remove dir share link + self._remove_share_link(token) + + def test_invalid_password(self): + self.login_as(self.admin) + + password = randstring(10) + token = self._add_file_share_link(password) + url = reverse('api-v2.1-admin-share-link-check-password', args=[token]) + + # assert password is valid + resp = self.client.post(url, {'password': password}) + self.assertEqual(200, resp.status_code) + + # assert password is invalid + resp = self.client.post(url, {'password': 'invalid_password'}) + self.assertEqual(403, resp.status_code) + + self._remove_share_link(token) + + def test_check_password_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_dir_share_link() + + url = reverse('api-v2.1-admin-share-link-check-password', args=[token]) + resp = self.client.post(url) + self.assertEqual(403, resp.status_code) + + self._remove_share_link(token) + + def test_check_password_with_invalid_share_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-share-link-check-password', + args=[self.invalid_token]) + resp = self.client.post(url, {'password': 'invalid_password'}) + self.assertEqual(404, resp.status_code) diff --git a/tests/api/endpoints/admin/test_upload_links.py b/tests/api/endpoints/admin/test_upload_links.py new file mode 100644 index 0000000000..a5b647715e --- /dev/null +++ b/tests/api/endpoints/admin/test_upload_links.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +import json + +from tests.common.utils import randstring +from django.core.urlresolvers import reverse +from seahub.test_utils import BaseTestCase +from seahub.share.models import UploadLinkShare + +try: + from seahub.settings import LOCAL_PRO_DEV_ENV +except ImportError: + LOCAL_PRO_DEV_ENV = False + +class AdminUploadLinkTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_upload_link(self, password=None): + fs = UploadLinkShare.objects.create_upload_link_share( + self.user.username, self.repo.id, self.folder_path, password, None) + + return fs.token + + def _remove_upload_link(self, token): + link = UploadLinkShare.objects.get(token=token) + link.delete() + + def test_get_upload_link_info(self): + self.login_as(self.admin) + token = self._add_upload_link() + + url = reverse('api-v2.1-admin-upload-link', args=[token]) + resp = self.client.get(url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert json_resp['token'] == token + + self._remove_upload_link(token) + + def test_get_upload_link_info_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_upload_link() + + url = reverse('api-v2.1-admin-upload-link', args=[token]) + resp = self.client.get(url) + self.assertEqual(403, resp.status_code) + + self._remove_upload_link(token) + + def test_get_upload_link_info_with_invalid_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-upload-link', + args=[self.invalid_token]) + resp = self.client.get(url) + self.assertEqual(404, resp.status_code) + + +class AdminUploadLinkUploadTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_upload_link(self, password=None): + fs = UploadLinkShare.objects.create_upload_link_share( + self.user.username, self.repo.id, self.folder_path, password, None) + + return fs.token + + def _remove_upload_link(self, token): + link = UploadLinkShare.objects.get(token=token) + link.delete() + + def test_upload(self): + self.login_as(self.admin) + token = self._add_upload_link() + + url = reverse('api-v2.1-admin-upload-link-upload', args=[token]) + resp = self.client.get(url) + self.assertEqual(200, resp.status_code) + + json_resp = json.loads(resp.content) + + assert '8082' in json_resp['upload_link'] + assert 'upload' in json_resp['upload_link'] + + self._remove_upload_link(token) + + def test_upload_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_upload_link() + + url = reverse('api-v2.1-admin-upload-link-upload', args=[token]) + resp = self.client.get(url) + self.assertEqual(403, resp.status_code) + + self._remove_upload_link(token) + + def test_get_upload_link_info_with_invalid_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-upload-link-upload', + args=[self.invalid_token]) + resp = self.client.get(url) + self.assertEqual(404, resp.status_code) + + +class AdminUploadLinkCheckPasswordTest(BaseTestCase): + + def setUp(self): + self.repo_id = self.repo.id + self.folder_path= self.folder + self.invalid_token = '00000000000000000000' + + def tearDown(self): + self.remove_repo() + + def _add_upload_link(self, password=None): + fs = UploadLinkShare.objects.create_upload_link_share( + self.user.username, self.repo.id, self.folder_path, password, None) + + return fs.token + + def _remove_upload_link(self, token): + link = UploadLinkShare.objects.get(token=token) + link.delete() + + def test_check_password(self): + self.login_as(self.admin) + + password = randstring(10) + token = self._add_upload_link(password) + url = reverse('api-v2.1-admin-upload-link-check-password', args=[token]) + + resp = self.client.post(url, {'password': password}) + self.assertEqual(200, resp.status_code) + + self._remove_upload_link(token) + + def test_invalid_password(self): + self.login_as(self.admin) + + password = randstring(10) + token = self._add_upload_link(password) + url = reverse('api-v2.1-admin-upload-link-check-password', args=[token]) + + # assert password is valid + resp = self.client.post(url, {'password': password}) + self.assertEqual(200, resp.status_code) + + # assert password is invalid + resp = self.client.post(url, {'password': 'invalid_password'}) + self.assertEqual(403, resp.status_code) + + self._remove_upload_link(token) + + def test_check_password_with_invalid_permission(self): + self.login_as(self.user) + token = self._add_upload_link() + + url = reverse('api-v2.1-admin-upload-link-check-password', args=[token]) + resp = self.client.post(url, {'password': 'invalid_password'}) + self.assertEqual(403, resp.status_code) + + self._remove_upload_link(token) + + def test_check_password_with_invalid_token(self): + self.login_as(self.admin) + + url = reverse('api-v2.1-admin-upload-link-check-password', + args=[self.invalid_token]) + resp = self.client.post(url, {'password': 'invalid_password'}) + self.assertEqual(404, resp.status_code) + +