1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-07-31 06:40:39 +00:00

ADD: repo-api-tokens GET POST PUT and access via-repo-token/dir and v… (#4136)

* ADD: repo-api-tokens GET POST PUT and access via-repo-token/dir and via-repo-token/upload-link by repo-api-token

* MOD: POST /via-api-token/dir logic

* MOD: check app and token when POST /repo-api-tokens

* MOD: use seafile_api.get_dir_id_by_path in via_repo_token for list dirs
This commit is contained in:
Alex Happy 2019-11-04 15:47:20 +08:00 committed by Daniel Pan
parent 9801ddc51d
commit b15b1bdffc
16 changed files with 1563 additions and 1 deletions

View File

@ -0,0 +1,312 @@
import React, {Fragment} from 'react';
import PropTypes from 'prop-types';
import {gettext} from '../../utils/constants';
import {Modal, ModalHeader, ModalBody, Button, Input} from 'reactstrap';
import RepoAPITokenPermissionEditor from '../select-editor/repo-api-token-permission-editor';
import {seafileAPI} from '../../utils/seafile-api';
import toaster from '../toast';
import copy from 'copy-to-clipboard';
import Loading from '../loading';
import '../../css/share-link-dialog.css';
const apiTokenItemPropTypes = {
item: PropTypes.object.isRequired,
deleteAPIToken: PropTypes.func.isRequired,
updateAPIToken: PropTypes.func.isRequired,
};
class APITokenItem extends React.Component {
constructor(props) {
super(props);
this.state = {
isOperationShow: false,
};
}
onMouseEnter = () => {
this.setState({isOperationShow: true});
};
onMouseLeave = () => {
this.setState({isOperationShow: false});
};
onDeleteAPIToken = () => {
this.props.deleteAPIToken(this.props.item.app_name);
};
onUpdateAPIToken = (permission) => {
this.props.updateAPIToken(this.props.item.app_name, permission);
};
onCopyAPIToken = () => {
let api_token = this.props.item.api_token;
copy(api_token);
toaster.success(gettext('API Token is copied to the clipboard.'));
};
render() {
let item = this.props.item;
return (
<tr onMouseEnter={this.onMouseEnter} onMouseLeave={this.onMouseLeave}>
<td className="name">{item.app_name}</td>
<td>
<RepoAPITokenPermissionEditor
isTextMode={true}
isEditIconShow={this.state.isOperationShow}
currentPermission={item.permission}
onPermissionChanged={this.onUpdateAPIToken}
/>
</td>
<td>{item.api_token}</td>
<td>
<span
className="far fa-copy action-icon"
onClick={this.onCopyAPIToken}
/>
</td>
<td>
<span
className={`sf2-icon-x3 action-icon ${this.state.isOperationShow ? '' : 'hide'}`}
onClick={this.onDeleteAPIToken}
title={gettext('Delete')}
/>
</td>
</tr>
);
}
}
APITokenItem.propTypes = apiTokenItemPropTypes;
const propTypes = {
// currentTable: PropTypes.object.isRequired,
// onTableAPITokenToggle: PropTypes.func.isRequired,
repo: PropTypes.object.isRequired,
onRepoAPITokenToggle: PropTypes.func.isRequired,
};
class RepoAPITokenDialog extends React.Component {
constructor(props) {
super(props);
this.state = {
apiTokenList: [],
permission: '',
appName: '',
errorMsg: '',
loading: true,
isSubmitBtnActive: true,
};
this.repo = this.props.repo;
}
listAPITokens = () => {
seafileAPI.listRepoAPITokens(this.repo.repo_id).then((res) => {
this.setState({
apiTokenList: res.data.repo_api_tokens,
loading: false,
});
}).catch(error => {
if (error.response.status === 403) {
this.setState({
errorMsg: gettext('Permission denied'),
});
} else {
this.handleError(error);
}
});
};
onInputChange = (e) => {
let appName = e.target.value;
this.setState({
appName: appName,
});
};
onKeyDown = (e) => {
if (e.keyCode === 13) {
e.preventDefault();
this.addAPIToken();
}
};
setPermission = (permission) => {
this.setState({permission: permission});
};
addAPIToken = () => {
if (!this.state.appName) {
return;
}
this.setState({
isSubmitBtnActive: false,
});
const {appName, permission, apiTokenList} = this.state;
seafileAPI.addRepoAPIToken(this.repo.repo_id, appName, permission).then((res) => {
apiTokenList.push(res.data);
this.setState({
apiTokenList: apiTokenList,
isSubmitBtnActive: true,
});
}).catch(error => {
this.handleError(error);
this.setState({
isSubmitBtnActive: true,
});
});
};
deleteAPIToken = (appName) => {
seafileAPI.deleteRepoAPIToken(this.repo.repo_id, appName).then((res) => {
const apiTokenList = this.state.apiTokenList.filter(item => {
return item.app_name !== appName;
});
this.setState({
apiTokenList: apiTokenList,
});
}).catch(error => {
this.handleError(error);
});
};
updateAPIToken = (appName, permission) => {
seafileAPI.updateRepoAPIToken(this.repo.repo_id, appName, permission).then((res) => {
let apiTokenList = this.state.apiTokenList.filter(item => {
if (item.app_name === appName) {
item.permission = permission;
}
return item;
});
this.setState({
apiTokenList: apiTokenList,
});
}).catch(error => {
this.handleError(error);
});
};
handleError = (e) => {
if (e.response) {
toaster.danger(e.response.data.error_msg || e.response.data.detail || gettext('Error'), {duration: 3});
} else {
toaster.danger(gettext('Please check the network.'), {duration: 3});
}
};
componentDidMount() {
this.listAPITokens();
}
renderContent = () => {
const renderAPITokenList = this.state.apiTokenList.map((item, index) => {
return (
<APITokenItem
key={index}
item={item}
deleteAPIToken={this.deleteAPIToken}
updateAPIToken={this.updateAPIToken}
/>
);
});
return (
<Fragment>
{this.state.errorMsg &&
<div className='w-100'>
<p className="error text-center">{this.state.errorMsg}</p>
</div>
}
{!this.state.errorMsg &&
<div className='mx-5 mb-5' style={{height: 'auto'}}>
<table>
<thead>
<tr>
<th width="45%">{gettext('App Name')}</th>
<th width="40%">{gettext('Permission')}</th>
<th width="15%"></th>
</tr>
</thead>
<tbody>
<tr>
<td>
<Input
type="text"
id="appName"
value={this.state.appName}
onChange={this.onInputChange}
onKeyDown={this.onKeyDown}
/>
</td>
<td>
<RepoAPITokenPermissionEditor
isTextMode={false}
isEditIconShow={false}
currentPermission={this.state.permission}
onPermissionChanged={this.setPermission}
/>
</td>
<td>
<Button onClick={this.addAPIToken} disabled={!this.state.isSubmitBtnActive}>{gettext('Submit')}</Button>
</td>
</tr>
</tbody>
</table>
{this.state.apiTokenList.length !== 0 &&
<div className='o-auto' style={{height: 'calc(100% - 91px)'}}>
<div className="h-100" style={{maxHeight: '18rem'}}>
<table>
<thead>
<tr>
<th width="22%">{gettext('App Name')}</th>
<th width="15%">{gettext('Permission')}</th>
<th width="53%">{gettext('Access Token')}</th>
<th width="5%"></th>
<th width="5%"></th>
</tr>
</thead>
<tbody>
{renderAPITokenList}
</tbody>
</table>
</div>
</div>
}
{this.state.loading &&
<Loading/>
}
</div>
}
</Fragment>
);
};
render() {
// let currentTable = this.props.currentTable;
// let name = currentTable.name;
let repo = this.repo;
return (
<Modal
isOpen={true} className="share-dialog" style={{maxWidth: '720px'}}
toggle={this.props.onRepoAPITokenToggle}
>
<ModalHeader toggle={this.props.onRepoAPITokenToggle}>
{gettext('API Token')} <span className="op-target" title={repo.repo_name}>{repo.repo_name}</span></ModalHeader>
<ModalBody className="share-dialog-content">
{this.renderContent()}
</ModalBody>
</Modal>
);
}
}
RepoAPITokenDialog.propTypes = propTypes;
export default RepoAPITokenDialog;

View File

@ -0,0 +1,81 @@
import React, { Fragment } from 'react';
import PropTypes from 'prop-types';
import Select from 'react-select';
import { gettext } from '../../utils/constants';
const propTypes = {
isTextMode: PropTypes.bool.isRequired,
isEditIconShow: PropTypes.bool.isRequired,
currentPermission: PropTypes.string.isRequired,
onPermissionChanged: PropTypes.func.isRequired,
};
class RepoAPITokenPermissionEditor extends React.Component {
constructor(props) {
super(props);
this.state = {
isEditing: false,
};
this.options = [
{ value: 'rw', label: <div>{gettext('Read-Write')}</div> },
{ value: 'r', label: <div>{gettext('Read-Only')}</div> }
];
}
componentDidMount() {
document.addEventListener('click', this.onHideSelect);
}
componentWillUnmount() {
document.removeEventListener('click', this.onHideSelect);
}
onHideSelect = () => {
this.setState({ isEditing: false });
}
onEditPermission = (e) => {
e.nativeEvent.stopImmediatePropagation();
this.setState({ isEditing: true });
}
onPermissionChanged = (e) => {
if (e.value !== this.props.currentPermission) {
this.props.onPermissionChanged(e.value);
}
this.setState({ isEditing: false });
}
onSelectHandler = (e) => {
e.nativeEvent.stopImmediatePropagation();
}
render() {
const { currentPermission, isTextMode } = this.props;
let optionTranslation = currentPermission === 'rw' ? gettext('Read-Write') : gettext('Read-Only');
return (
<div onClick={this.onSelectHandler}>
{(isTextMode && !this.state.isEditing) ?
<Fragment>
<span>{optionTranslation}</span>
{this.props.isEditIconShow &&
<span title={gettext('Edit')} className="fa fa-pencil-alt attr-action-icon" onClick={this.onEditPermission}/>
}
</Fragment>
:
<Select
options={this.options}
placeholder={optionTranslation}
onChange={this.onPermissionChanged}
captureMenuScroll={false}
/>
}
</div>
);
}
}
RepoAPITokenPermissionEditor.propTypes = propTypes;
export default RepoAPITokenPermissionEditor;

View File

@ -18,6 +18,7 @@ import LabelRepoStateDialog from '../../components/dialog/label-repo-state-dialo
import LibSubFolderPermissionDialog from '../../components/dialog/lib-sub-folder-permission-dialog';
import Rename from '../../components/rename';
import MylibRepoMenu from './mylib-repo-menu';
import RepoAPITokenDialog from "../../components/dialog/repo-api-token-dialog";
const propTypes = {
repo: PropTypes.object.isRequired,
@ -46,6 +47,7 @@ class MylibRepoListItem extends React.Component {
isResetPasswordDialogShow: false,
isLabelRepoStateDialogOpen: false,
isFolderPermissionDialogShow: false,
isAPITokenDialogShow: false,
};
}
@ -100,6 +102,9 @@ class MylibRepoListItem extends React.Component {
case 'Label Current State':
this.onLabelToggle();
break;
case 'API Token':
this.onAPITokenToggle();
break;
default:
break;
}
@ -171,6 +176,10 @@ class MylibRepoListItem extends React.Component {
this.setState({isFolderPermissionDialogShow: !this.state.isFolderPermissionDialogShow});
}
onAPITokenToggle = () => {
this.setState({isAPITokenDialogShow: !this.state.isAPITokenDialogShow});
}
onUnfreezedItem = () => {
this.setState({
highlight: false,
@ -406,6 +415,16 @@ class MylibRepoListItem extends React.Component {
/>
</ModalPortal>
)}
{this.state.isAPITokenDialogShow && (
<ModalPortal>
<RepoAPITokenDialog
repo={repo}
onRepoAPITokenToggle={this.onAPITokenToggle}
/>
</ModalPortal>
)}
</Fragment>
);
}

View File

@ -53,7 +53,7 @@ class MylibRepoMenu extends React.Component {
generatorOperations = () => {
let repo = this.props.repo;
let showResetPasswordMenuItem = repo.encrypted && enableResetEncryptedRepoPassword && isEmailConfigured;
let operations = ['Rename', 'Transfer', 'History Setting'];
let operations = ['Rename', 'Transfer', 'History Setting', 'API Token'];
if (repo.encrypted) {
operations.push('Change Password');
}
@ -105,6 +105,9 @@ class MylibRepoMenu extends React.Component {
case 'Label Current State':
translateResult = gettext('Label Current State');
break;
case 'API Token':
translateResult = gettext('API Token');
break;
default:
break;
}

View File

@ -6,9 +6,12 @@ from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import APIException
from seaserv import ccnet_api
from seahub.auth.models import AnonymousUser
from seahub.base.accounts import User
from seahub.api2.models import Token, TokenV2
from seahub.api2.utils import get_client_ip
from seahub.repo_api_tokens.models import RepoAPITokens
from seahub.utils import within_time_range
try:
from seahub.settings import MULTI_TENANCY
@ -134,3 +137,46 @@ class TokenAuthentication(BaseAuthentication):
logger.exception('error when save token v2:')
return (user, token)
class RepoAPITokenAuthentication(BaseAuthentication):
"""
Simple token based authentication.
Clients should authenticate by passing the token key in the "Authorization"
HTTP header, prepended with the string "token ". For example:
Authorization: token 401f7ac837da42b97f613d789819ff93537bee6a
A custom token model may be used, but must have the following properties.
* key -- The string identifying the token
* user -- The user to which the token belongs
"""
def authenticate(self, request):
"""
auth request from repo_api_token,
fill request.user with AnonymousUser temporarily,
return key from headers' token,
and set request.token_creator to person whom repo_api_token was generated by
:param request: request
:return: AnonymousUser, repo_api_token
"""
auth = request.META.get('HTTP_AUTHORIZATION', '').split()
if not auth or auth[0].lower() != 'token':
return None
if len(auth) == 1:
msg = 'Invalid token header. No credentials provided.'
raise AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid token header. Token string should not contain spaces.'
raise AuthenticationFailed(msg)
rat = RepoAPITokens.objects.filter(token=auth[1]).first()
if not rat:
raise AuthenticationFailed('Token inactive or deleted')
request.repo_api_token_obj = rat
return AnonymousUser(), auth[1]

View File

@ -0,0 +1,159 @@
import logging
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from django.utils.translation import ugettext as _
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error
from seaserv import seafile_api
from seahub.constants import PERMISSION_READ
from seahub.repo_api_tokens.models import RepoAPITokens
from seahub.repo_api_tokens.utils import permission_check_admin_owner
logger = logging.getLogger(__name__)
def _get_repo_token_info(repo_token_obj):
return {
'repo_id': repo_token_obj.repo_id,
'app_name': repo_token_obj.app_name,
'generated_by': repo_token_obj.generated_by,
'permission': repo_token_obj.permission,
'api_token': repo_token_obj.token
}
class RepoAPITokensView(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated,)
throttle_classes = (UserRateThrottle,)
def get(self, request, repo_id):
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = _('app_name invalid.')
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
username = request.user.username
if not permission_check_admin_owner(username, repo_id):
error_msg = _('Permission denied.')
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
rats = RepoAPITokens.objects.filter(repo_id=repo_id).order_by('-generated_at')
rat_infos = [_get_repo_token_info(rat) for rat in rats]
return Response({'repo_api_tokens': rat_infos})
def post(self, request, repo_id):
# arguments check
app_name = request.data.get('app_name')
if not app_name:
error_msg = _('app_name invalid.')
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
repo_permission = request.data.get('permission')
if repo_permission and repo_permission not in [perm[0] for perm in RepoAPITokens.PERMISSION_CHOICES]:
error_msg = _('permission invalid.')
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
repo_permission = repo_permission if repo_permission else PERMISSION_READ
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = _('Library %(repo_id)s not found.' % {'repo_id': repo_id})
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
username = request.user.username
if not permission_check_admin_owner(username, repo_id):
error_msg = _('Permission denied.')
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
rat = RepoAPITokens.objects.filter(app_name=app_name, repo_id=repo_id).first()
if rat:
error_msg = _('app: %(app)s token already exists.' % {'app': app_name})
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
try:
rat = RepoAPITokens.objects.create_token(app_name=app_name,
repo_id=repo_id,
username=username,
permission=repo_permission)
except Exception as e:
logger.error('user: %s create repo: %s\'s token error: %s', username, repo_id, e)
error_msg = _('Internal Server Error.')
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response(_get_repo_token_info(rat))
class RepoAPITokenView(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated,)
throttle_classes = (UserRateThrottle,)
def delete(self, request, repo_id, app_name):
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = _('Library %(repo_id)s not found.' % {'repo_id': repo_id})
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
username = request.user.username
# permission check
if not permission_check_admin_owner(username, repo_id):
error_msg = _('Permission denied.')
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
try:
rat = RepoAPITokens.objects.filter(repo_id=repo_id, app_name=app_name).first()
if not rat:
error_msg = _('api token not found')
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
rat.delete()
except Exception as e:
logger.error('user: %s delete repo: %s app_name: %s error: %s', username, repo_id, app_name, e)
error_msg = _('Internal Server Error.')
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response({'success': True})
def put(self, request, repo_id, app_name):
# arguments check
permission = request.data.get('permission')
if not permission or permission not in [perm[0] for perm in RepoAPITokens.PERMISSION_CHOICES]:
error_msg = 'permission invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = _('Library %(repo_id)s not found.' % {'repo_id': repo_id})
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
username = request.user.username
if not permission_check_admin_owner(username, repo_id):
error_msg = _('Permission denied.')
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
rat = RepoAPITokens.objects.filter(app_name=app_name, repo_id=repo_id).first()
if not rat:
error_msg = _('api token not found')
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
rat.permission = permission
rat.save()
except Exception as e:
logger.error('user: %s update repo: %s app_name: %s error: %s', username, repo_id, app_name, e)
error_msg = _('Internal Server Error.')
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response(_get_repo_token_info(rat))

View File

@ -0,0 +1,385 @@
import os
import json
import logging
import posixpath
from django.http import HttpResponse
from django.utils.translation import ugettext as _
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.reverse import reverse
from rest_framework.views import APIView
from urllib.parse import quote
from seahub.api2.authentication import RepoAPITokenAuthentication
from seahub.repo_api_tokens.utils import get_dir_file_info_list
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error, to_python_boolean
from seaserv import seafile_api, get_repo, check_quota
from pysearpc import SearpcError
from seahub.repo_api_tokens.utils import get_dir_file_recursively
from seahub.constants import PERMISSION_READ
from seahub.utils import normalize_dir_path, check_filename_with_rename, gen_file_upload_url, is_valid_dirent_name
from seahub.utils.timeutils import timestamp_to_isoformat_timestr
logger = logging.getLogger(__name__)
json_content_type = 'application/json; charset=utf-8'
HTTP_443_ABOVE_QUOTA = 443
HTTP_520_OPERATION_FAILED = 520
def check_folder_permission_by_repo_api(request, repo_id, path):
"""
Check repo/folder/file access permission of a repo_api_token.
:param request: request obj
:param repo_id: repo's id
:param path: repo path
:return:
"""
repo_status = seafile_api.get_repo_status(repo_id)
if repo_status == 1:
return PERMISSION_READ
return request.repo_api_token_obj.permission # and return repo_api_token's permission
class ViaRepoDirView(APIView):
authentication_classes = (RepoAPITokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated,)
throttle_classes = (UserRateThrottle,)
def get_dir_info(self, repo_id, dir_path):
dir_obj = seafile_api.get_dirent_by_path(repo_id, dir_path)
dir_info = {
'type': 'dir',
'repo_id': repo_id,
'parent_dir': os.path.dirname(dir_path.rstrip('/')),
'obj_name': dir_obj.obj_name,
'obj_id': dir_obj.obj_id,
'mtime': timestamp_to_isoformat_timestr(dir_obj.mtime),
}
return dir_info
def get(self, request, repo_id, format=None):
# argument check
recursive = request.GET.get('recursive', '0')
if recursive not in ('1', '0'):
error_msg = "If you want to get recursive dir entries, you should set 'recursive' argument as '1'."
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
request_type = request.GET.get('type', '')
if request_type and request_type not in ('f', 'd'):
error_msg = "'type should be 'f' or 'd'."
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
with_thumbnail = request.GET.get('with_thumbnail', 'false')
if with_thumbnail not in ('true', 'false'):
error_msg = 'with_thumbnail invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
with_thumbnail = to_python_boolean(with_thumbnail)
thumbnail_size = request.GET.get('thumbnail_size', 48)
try:
thumbnail_size = int(thumbnail_size)
except ValueError:
error_msg = 'thumbnail_size invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
with_parents = request.GET.get('with_parents', 'false')
if with_parents not in ('true', 'false'):
error_msg = 'with_parents invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
with_parents = to_python_boolean(with_parents)
# recource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = 'Library %s not found.' % repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
parent_dir = request.GET.get('path', '/')
parent_dir = normalize_dir_path(parent_dir)
dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir)
if not dir_id:
error_msg = 'Folder %s not found.' % parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
permission = check_folder_permission_by_repo_api(request, repo_id, parent_dir)
if not permission:
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# get dir/file list recursively
# username = request.user.username
username = seafile_api.get_repo_owner(repo_id)
if recursive == '1':
try:
dir_file_info_list = get_dir_file_recursively(repo_id, parent_dir, [])
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
response_dict = {}
response_dict['dirent_list'] = []
if request_type == 'f':
for item in dir_file_info_list:
if item['type'] == 'file':
response_dict['dirent_list'].append(item)
elif request_type == 'd':
for item in dir_file_info_list:
if item['type'] == 'dir':
response_dict['dirent_list'].append(item)
else:
response_dict['dirent_list'] = dir_file_info_list
return Response(response_dict)
parent_dir_list = []
if not with_parents:
# only return dirent list in current parent folder
parent_dir_list.append(parent_dir)
else:
# if value of 'path' parameter is '/a/b/c' add with_parents's is 'true'
# then return dirent list in '/', '/a', '/a/b' and '/a/b/c'.
if parent_dir == '/':
parent_dir_list.append(parent_dir)
else:
tmp_parent_dir = '/'
parent_dir_list.append(tmp_parent_dir)
for folder_name in parent_dir.strip('/').split('/'):
tmp_parent_dir = posixpath.join(tmp_parent_dir, folder_name)
tmp_parent_dir = normalize_dir_path(tmp_parent_dir)
parent_dir_list.append(tmp_parent_dir)
all_dir_info_list = []
all_file_info_list = []
try:
for parent_dir in parent_dir_list:
# get dir file info list
dir_info_list, file_info_list = get_dir_file_info_list(username,
request_type, repo, parent_dir, with_thumbnail,
thumbnail_size)
all_dir_info_list.extend(dir_info_list)
all_file_info_list.extend(file_info_list)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
response_dict = {}
response_dict["user_perm"] = permission
response_dict["dir_id"] = dir_id
if request_type == 'f':
response_dict['dirent_list'] = all_file_info_list
elif request_type == 'd':
response_dict['dirent_list'] = all_dir_info_list
else:
response_dict['dirent_list'] = all_dir_info_list + all_file_info_list
return Response(response_dict)
def post(self, request, repo_id, format=None):
# argument check
path = request.GET.get('path', None)
if not path or path[0] != '/':
error_msg = 'path invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
if path == '/':
error_msg = 'Can not operate root dir.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
operation = request.data.get('operation', None)
if not operation:
error_msg = 'operation invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
operation = operation.lower()
if operation not in ('mkdir', 'rename', 'revert'):
error_msg = "operation can only be 'mkdir', 'rename' or 'revert'."
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# resource check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = 'Library %s not found.' % repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
path = path.rstrip('/')
username = request.user.username
parent_dir = os.path.dirname(path)
if operation == 'mkdir':
# resource check
parent_dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir)
if not parent_dir_id:
error_msg = 'Folder %s not found.' % parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if check_folder_permission_by_repo_api(request, repo_id, parent_dir) != 'rw':
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
new_dir_name = os.path.basename(path)
if not is_valid_dirent_name(new_dir_name):
return api_error(status.HTTP_400_BAD_REQUEST,
'name invalid.')
retry_count = 0
while retry_count < 10:
new_dir_name = check_filename_with_rename(repo_id,
parent_dir, new_dir_name)
try:
seafile_api.post_dir(repo_id,
parent_dir, new_dir_name, username)
break
except SearpcError as e:
if str(e) == 'file already exists':
retry_count += 1
else:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR,
error_msg)
new_dir_path = posixpath.join(parent_dir, new_dir_name)
dir_info = self.get_dir_info(repo_id, new_dir_path)
resp = Response(dir_info)
return resp
if operation == 'rename':
# resource check
dir_id = seafile_api.get_dir_id_by_path(repo_id, path)
if not dir_id:
error_msg = 'Folder %s not found.' % path
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if check_folder_permission_by_repo_api(request, repo_id, path) != 'rw':
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
old_dir_name = os.path.basename(path)
new_dir_name = request.data.get('newname', None)
if not new_dir_name:
error_msg = 'newname invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
if not is_valid_dirent_name(new_dir_name):
return api_error(status.HTTP_400_BAD_REQUEST,
'name invalid.')
if new_dir_name == old_dir_name:
dir_info = self.get_dir_info(repo_id, path)
resp = Response(dir_info)
return resp
try:
# rename duplicate name
new_dir_name = check_filename_with_rename(repo_id, parent_dir, new_dir_name)
# rename dir
seafile_api.rename_file(repo_id, parent_dir, old_dir_name,
new_dir_name, username)
new_dir_path = posixpath.join(parent_dir, new_dir_name)
dir_info = self.get_dir_info(repo_id, new_dir_path)
resp = Response(dir_info)
return resp
except SearpcError as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
if operation == 'revert':
commit_id = request.data.get('commit_id', None)
if not commit_id:
error_msg = 'commit_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
if seafile_api.get_dir_id_by_path(repo_id, path):
# dir exists in repo
if check_folder_permission_by_repo_api(request, repo_id, path) != 'rw':
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
else:
# dir NOT exists in repo
if check_folder_permission_by_repo_api(request, repo_id, '/') != 'rw':
error_msg = 'Permission denied.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
try:
seafile_api.revert_dir(repo_id, commit_id, path, username)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response({'success': True})
class ViaRepoUploadLinkView(APIView):
authentication_classes = (RepoAPITokenAuthentication, SessionAuthentication)
permission_classes = (IsAuthenticated,)
throttle_classes = (UserRateThrottle,)
def get(self, request, repo_id, format=None):
# recourse check
repo = seafile_api.get_repo(repo_id)
if not repo:
error_msg = 'Library %s not found.' % repo_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
parent_dir = request.GET.get('path', '/')
dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir)
if not dir_id:
error_msg = 'Folder %s not found.' % parent_dir
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if check_folder_permission_by_repo_api(request, repo_id, parent_dir) != 'rw':
return api_error(status.HTTP_403_FORBIDDEN,
'You do not have permission to access this folder.')
if check_quota(repo_id) < 0:
return api_error(HTTP_443_ABOVE_QUOTA, _("Out of quota."))
token = seafile_api.get_fileserver_access_token(repo_id,
'dummy', 'upload', request.repo_api_token_obj.app_name,
use_onetime=False)
if not token:
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
req_from = request.GET.get('from', 'api')
if req_from == 'api':
try:
replace = to_python_boolean(request.GET.get('replace', '0'))
except ValueError:
replace = False
url = gen_file_upload_url(token, 'upload-api', replace)
elif req_from == 'web':
url = gen_file_upload_url(token, 'upload-aj')
else:
error_msg = 'from invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
return Response(url)

View File

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.23 on 2019-09-29 08:56
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='RepoAPITokens',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('repo_id', models.CharField(db_index=True, max_length=36)),
('app_name', models.CharField(db_index=True, max_length=255)),
('token', models.CharField(max_length=40, unique=True)),
('generated_at', models.DateTimeField(auto_now_add=True)),
('generated_by', models.CharField(max_length=255)),
('last_access', models.DateTimeField(auto_now=True)),
('permission', models.CharField(max_length=15)),
],
options={
'db_table': 'repo_api_tokens',
},
),
]

View File

@ -0,0 +1,45 @@
from _sha1 import sha1
import hmac
import uuid
from django.db import models
from seahub.constants import PERMISSION_READ, PERMISSION_READ_WRITE
class RepoAPITokensManager(models.Manager):
@staticmethod
def generate_key():
unique = str(uuid.uuid4())
return hmac.new(unique.encode('utf-8'), digestmod=sha1).hexdigest()
def create_token(self, app_name, repo_id, username, permission=PERMISSION_READ):
token = self.generate_key()
rat = super(RepoAPITokensManager, self).create(app_name=app_name,
repo_id=repo_id,
generated_by=username,
permission=permission,
token=token)
return rat
class RepoAPITokens(models.Model):
PERMISSION_CHOICES = (
(PERMISSION_READ, 'read'),
(PERMISSION_READ_WRITE, 'read and write')
)
repo_id = models.CharField(max_length=36, db_index=True)
app_name = models.CharField(max_length=255, db_index=True)
token = models.CharField(unique=True, max_length=40)
generated_at = models.DateTimeField(auto_now_add=True)
generated_by = models.CharField(max_length=255)
last_access = models.DateTimeField(auto_now=True)
permission = models.CharField(max_length=15)
objects = RepoAPITokensManager()
class Meta:
db_table = 'repo_api_tokens'

View File

@ -0,0 +1,210 @@
import os
import logging
import posixpath
import stat
from django.utils.http import urlquote
from seaserv import seafile_api
from seahub.base.models import UserStarredFiles
from seahub.base.templatetags.seahub_tags import email2nickname, email2contact_email
from seahub.settings import ENABLE_VIDEO_THUMBNAIL, THUMBNAIL_ROOT
from seahub.thumbnail.utils import get_thumbnail_src
from seahub.utils import is_pro_version, FILEEXT_TYPE_MAP, IMAGE, XMIND, VIDEO
from seahub.utils.file_tags import get_files_tags_in_dir
logger = logging.getLogger(__name__)
json_content_type = 'application/json; charset=utf-8'
HTTP_520_OPERATION_FAILED = 520
def permission_check_admin_owner(username, repo_id): # maybe add more complex logic in the future
return username == seafile_api.get_repo_owner(repo_id)
def get_dir_file_recursively(repo_id, path, all_dirs):
is_pro = is_pro_version()
path_id = seafile_api.get_dir_id_by_path(repo_id, path)
dirs = seafile_api.list_dir_by_path(repo_id, path, -1, -1)
for dirent in dirs:
entry = {}
if stat.S_ISDIR(dirent.mode):
entry["type"] = 'dir'
else:
entry["type"] = 'file'
entry['modifier_email'] = dirent.modifier
entry["size"] = dirent.size
if is_pro:
entry["is_locked"] = dirent.is_locked
entry["lock_owner"] = dirent.lock_owner
if dirent.lock_owner:
entry["lock_owner_name"] = email2nickname(dirent.lock_owner)
entry["lock_time"] = dirent.lock_time
entry["parent_dir"] = path
entry["id"] = dirent.obj_id
entry["name"] = dirent.obj_name
entry["mtime"] = dirent.mtime
all_dirs.append(entry)
# Use dict to reduce memcache fetch cost in large for-loop.
file_list = [item for item in all_dirs if item['type'] == 'file']
contact_email_dict = {}
nickname_dict = {}
modifiers_set = {x['modifier_email'] for x in file_list}
for e in modifiers_set:
if e not in contact_email_dict:
contact_email_dict[e] = email2contact_email(e)
if e not in nickname_dict:
nickname_dict[e] = email2nickname(e)
for e in file_list:
e['modifier_contact_email'] = contact_email_dict.get(e['modifier_email'], '')
e['modifier_name'] = nickname_dict.get(e['modifier_email'], '')
if stat.S_ISDIR(dirent.mode):
sub_path = posixpath.join(path, dirent.obj_name)
get_dir_file_recursively(repo_id, sub_path, all_dirs)
return all_dirs
def get_dir_file_info_list(username, request_type, repo_obj, parent_dir,
with_thumbnail, thumbnail_size):
repo_id = repo_obj.id
dir_info_list = []
file_info_list = []
# get dirent(folder and file) list
parent_dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir)
dir_file_list = seafile_api.list_dir_with_perm(repo_id,
parent_dir, parent_dir_id, username, -1, -1)
try:
starred_items = UserStarredFiles.objects.filter(email=username,
repo_id=repo_id, path__startswith=parent_dir, org_id=-1)
starred_item_path_list = [f.path.rstrip('/') for f in starred_items]
except Exception as e:
logger.error(e)
starred_item_path_list = []
# only get dir info list
if not request_type or request_type == 'd':
dir_list = [dirent for dirent in dir_file_list if stat.S_ISDIR(dirent.mode)]
for dirent in dir_list:
dir_info = {}
dir_info["type"] = "dir"
dir_info["id"] = dirent.obj_id
dir_info["name"] = dirent.obj_name
dir_info["mtime"] = dirent.mtime
dir_info["permission"] = dirent.permission
dir_info["parent_dir"] = parent_dir
dir_info_list.append(dir_info)
# get star info
dir_info['starred'] = False
dir_path = posixpath.join(parent_dir, dirent.obj_name)
if dir_path.rstrip('/') in starred_item_path_list:
dir_info['starred'] = True
# only get file info list
if not request_type or request_type == 'f':
file_list = [dirent for dirent in dir_file_list if not stat.S_ISDIR(dirent.mode)]
# Use dict to reduce memcache fetch cost in large for-loop.
nickname_dict = {}
contact_email_dict = {}
modifier_set = {x.modifier for x in file_list}
lock_owner_set = {x.lock_owner for x in file_list}
for e in modifier_set | lock_owner_set:
if e not in nickname_dict:
nickname_dict[e] = email2nickname(e)
if e not in contact_email_dict:
contact_email_dict[e] = email2contact_email(e)
try:
files_tags_in_dir = get_files_tags_in_dir(repo_id, parent_dir)
except Exception as e:
logger.error(e)
files_tags_in_dir = {}
for dirent in file_list:
file_name = dirent.obj_name
file_path = posixpath.join(parent_dir, file_name)
file_obj_id = dirent.obj_id
file_info = {}
file_info["type"] = "file"
file_info["id"] = file_obj_id
file_info["name"] = file_name
file_info["mtime"] = dirent.mtime
file_info["permission"] = dirent.permission
file_info["parent_dir"] = parent_dir
file_info["size"] = dirent.size
modifier_email = dirent.modifier
file_info['modifier_email'] = modifier_email
file_info['modifier_name'] = nickname_dict.get(modifier_email, '')
file_info['modifier_contact_email'] = contact_email_dict.get(modifier_email, '')
# get lock info
if is_pro_version():
file_info["is_locked"] = dirent.is_locked
file_info["lock_time"] = dirent.lock_time
lock_owner_email = dirent.lock_owner or ''
file_info["lock_owner"] = lock_owner_email
file_info['lock_owner_name'] = nickname_dict.get(lock_owner_email, '')
file_info['lock_owner_contact_email'] = contact_email_dict.get(lock_owner_email, '')
if username == lock_owner_email:
file_info["locked_by_me"] = True
else:
file_info["locked_by_me"] = False
# get star info
file_info['starred'] = False
if file_path.rstrip('/') in starred_item_path_list:
file_info['starred'] = True
# get tag info
file_tags = files_tags_in_dir.get(file_name, [])
if file_tags:
file_info['file_tags'] = []
for file_tag in file_tags:
file_info['file_tags'].append(file_tag)
# get thumbnail info
if with_thumbnail and not repo_obj.encrypted:
# used for providing a way to determine
# if send a request to create thumbnail.
fileExt = os.path.splitext(file_name)[1][1:].lower()
file_type = FILEEXT_TYPE_MAP.get(fileExt)
if file_type in (IMAGE, XMIND) or \
file_type == VIDEO and ENABLE_VIDEO_THUMBNAIL:
# if thumbnail has already been created, return its src.
# Then web browser will use this src to get thumbnail instead of
# recreating it.
thumbnail_file_path = os.path.join(THUMBNAIL_ROOT,
str(thumbnail_size), file_obj_id)
if os.path.exists(thumbnail_file_path):
src = get_thumbnail_src(repo_id, thumbnail_size, file_path)
file_info['encoded_thumbnail_src'] = urlquote(src)
file_info_list.append(file_info)
dir_info_list.sort(key=lambda x: x['name'].lower())
file_info_list.sort(key=lambda x: x['name'].lower())
return dir_info_list, file_info_list

View File

@ -254,6 +254,7 @@ INSTALLED_APPS = (
'seahub.related_files',
'seahub.work_weixin',
'seahub.file_participants',
'seahub.repo_api_tokens',
)
# Enable or disable view File Scan

View File

@ -90,6 +90,8 @@ from seahub.api2.endpoints.starred_items import StarredItems
from seahub.api2.endpoints.markdown_lint import MarkdownLintView
from seahub.api2.endpoints.public_repos_search import PublishedRepoSearchView
from seahub.api2.endpoints.recent_added_files import RecentAddedFilesView
from seahub.api2.endpoints.repo_api_tokens import RepoAPITokensView, RepoAPITokenView
from seahub.api2.endpoints.via_repo_token import ViaRepoDirView, ViaRepoUploadLinkView
# Admin
@ -368,6 +370,14 @@ urlpatterns = [
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/file/participant/$', FileParticipantView.as_view(), name='api-v2.1-file-participant'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/related-users/$', RepoRelatedUsersView.as_view(), name='api-v2.1-related-user'),
## user:: repo-api-tokens
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/repo-api-tokens/$', RepoAPITokensView.as_view(), name='api-v2.1-repo-api-tokens'),
url(r'^api/v2.1/repos/(?P<repo_id>[-0-9a-f]{36})/repo-api-tokens/(?P<app_name>.*)/$', RepoAPITokenView.as_view(), name='api-v2.1-repo-api-token'),
## access repo from repo_api_tokens
url(r'^api/v2.1/via-repo-token/(?P<repo_id>[-0-9a-f]{36})/dir/$', ViaRepoDirView.as_view(), name='via-repo-dir'),
url(r'^api/v2.1/via-repo-token/(?P<repo_id>[-0-9a-f]{36})/upload-link/$', ViaRepoUploadLinkView.as_view(), name='via-upload-link'),
# user::related-files
url(r'^api/v2.1/related-files/$', RelatedFilesView.as_view(), name='api-v2.1-related-files'),
url(r'^api/v2.1/related-files/(?P<related_id>\d+)/$', RelatedFileView.as_view(), name='api-v2.1-related-file'),

View File

@ -0,0 +1,127 @@
import json
from django.core.urlresolvers import reverse
from seahub.repo_api_tokens.models import RepoAPITokens
from seahub.test_utils import BaseTestCase
class RepoAPITokensTest(BaseTestCase):
def setUp(self):
self.login_as(self.user)
# create repo
repo_id = self.create_repo(name='test-repo',
desc='',
username=self.user.username,
passwd=None)
self.repo_id = repo_id
self.app_name = 'wow'
self.permission = 'rw'
self.url = reverse('api-v2.1-repo-api-tokens', args=[repo_id])
def tearDown(self):
RepoAPITokens.objects.filter(repo_id=self.repo_id).delete()
self.remove_repo(self.repo_id)
def test_generate_token_by_owner(self):
resp = self.client.post(self.url, {'app_name': self.app_name, 'permission': self.permission})
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
self.assertEqual(self.repo_id, json_resp['repo_id'])
self.assertEqual(self.app_name, json_resp['app_name'])
self.assertEqual(self.user.username, json_resp['generated_by'])
self.assertEqual(self.permission, json_resp['permission'])
def test_generate_token_by_other(self):
self.logout()
self.login_as(self.admin)
resp = self.client.post(self.url, {'app_name': self.app_name, 'permission': self.permission})
self.assertEqual(403, resp.status_code)
def _create_repo_api_token_obj(self, app_name, permission):
username = self.user.username
return RepoAPITokens.objects.create_token(app_name, self.repo_id, username, permission=permission)
def test_get_tokens_by_owner(self):
# create
apps = ['first', 'second']
permissions = ['r', 'rw']
for app, permission in zip(apps, permissions):
self._create_repo_api_token_obj(app, permission)
# GET request
resp = self.client.get(self.url)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
for repo_api_token_json, app, permission in zip(json_resp['repo_api_tokens'][::-1], apps, permissions):
self.assertEqual(self.repo_id, repo_api_token_json['repo_id'])
self.assertEqual(app, repo_api_token_json['app_name'])
self.assertEqual(self.user.username, repo_api_token_json['generated_by'])
self.assertEqual(permission, repo_api_token_json['permission'])
class RepoAPITokenTest(BaseTestCase):
def setUp(self):
self.login_as(self.user)
# create repo
repo_id = self.create_repo(name='test-repo',
desc='',
username=self.user.username,
passwd=None)
self.repo_id = repo_id
# set user
self.user_app_name = 'user-app'
self.user_url = reverse('api-v2.1-repo-api-token', args=[self.repo_id, self.user_app_name])
# set admin
self.admin_app_name = 'admin-app'
self.admin_url = reverse('api-v2.1-repo-api-token', args=[self.repo_id, self.admin_app_name])
self.share_repo_to_admin_with_admin_permission()
self.share_repo_to_group_with_admin_permission()
def tearDown(self):
RepoAPITokens.objects.filter(repo_id=self.repo_id).delete()
self.remove_repo(self.repo_id)
def _create_repo_api_token_obj(self, admin=False):
app_name = self.user_app_name if not admin else self.admin_app_name
username = self.user.username if not admin else self.admin.username
return RepoAPITokens.objects.create_token(app_name, self.repo_id, username, permission='r')
def test_put_token_by_owner(self):
rat = self._create_repo_api_token_obj()
data = 'permission=rw'
resp = self.client.put(self.user_url, data, 'application/x-www-form-urlencoded')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
self.assertEqual(self.repo_id, json_resp['repo_id'])
self.assertEqual(self.user_app_name, json_resp['app_name'])
self.assertEqual(self.user.username, json_resp['generated_by'])
self.assertEqual('rw', json_resp['permission'])
self.assertEqual(rat.token, json_resp['api_token'])
def test_put_token_by_other(self):
rat = self._create_repo_api_token_obj()
data = 'permission=rw'
self.logout()
self.login_as(self.admin)
resp = self.client.put(self.user_url, data, 'application/x-www-form-urlencoded')
self.assertEqual(403, resp.status_code)
def test_delete_token_by_self(self):
rat = self._create_repo_api_token_obj()
resp = self.client.delete(self.user_url)
self.assertEqual(200, resp.status_code)
def test_delete_token_by_other(self):
rat = self._create_repo_api_token_obj()
self.logout()
self.login_as(self.admin)
resp = self.client.delete(self.user_url)
self.assertEqual(403, resp.status_code)

View File

@ -0,0 +1,132 @@
from _sha1 import sha1
import hmac
import os
import json
import uuid
from django.core.urlresolvers import reverse
from seahub.repo_api_tokens.models import RepoAPITokens
from seahub.test_utils import BaseTestCase
class ViaRepoDirTest(BaseTestCase):
def _create_repo_api_token_obj(self, app_name, permission):
username = self.user.username
return RepoAPITokens.objects.create_token(app_name, self.repo_id, username, permission=permission)
def setUp(self):
self.login_as(self.user)
self.repo_id = self.repo.id
self.file_path = self.file
self.file_name = os.path.basename(self.file_path.rstrip('/'))
self.folder_path = self.folder
self.folder_name = os.path.basename(self.folder_path)
self.r_app_name, permission = 'app_name', 'r'
self.repo_r_api_token_obj = self._create_repo_api_token_obj(self.r_app_name, permission)
self.rw_app_name, permission = 'rw_app_name', 'rw'
self.repo_rw_api_token_obj = self._create_repo_api_token_obj(self.rw_app_name, permission)
self.url = reverse('via-repo-dir', args=[self.repo_id])
self.logout()
def tearDown(self):
RepoAPITokens.objects.filter(repo_id=self.repo_id).delete()
self.remove_repo(self.repo_id)
def test_read_repo_from_valid_token(self):
headers = {'HTTP_AUTHORIZATION': 'token ' + self.repo_r_api_token_obj.token}
resp = self.client.get(self.url, **headers)
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert len(json_resp['dirent_list']) == 2
assert self.folder_name == json_resp['dirent_list'][0]['name']
assert self.file_name == json_resp['dirent_list'][1]['name']
assert len(json_resp['dirent_list'][1]['modifier_name']) > 0
assert len(json_resp['dirent_list'][1]['modifier_contact_email']) > 0
def test_read_repo_from_invalid_token(self):
unique = str(uuid.uuid4())
token = hmac.new(unique.encode('utf-8'), digestmod=sha1).hexdigest()
headers = {'HTTP_AUTHORIZATION': 'token ' + token}
resp = self.client.get(self.url, **headers)
assert resp.status_code in (401, 403)
def test_mkdir_repo_from_valid_r_token(self):
data = {
'operation': 'mkdir',
}
headers = {'HTTP_AUTHORIZATION': 'token ' + self.repo_r_api_token_obj.token}
url = self.url + '?path=/new'
resp = self.client.post(url, data=data, **headers)
self.assertEqual(403, resp.status_code)
def test_mkdir_repo_from_valid_rw_token(self):
data = {
'operation': 'mkdir',
}
headers = {'HTTP_AUTHORIZATION': 'token ' + self.repo_rw_api_token_obj.token}
url = self.url + '?path=/new'
resp = self.client.post(url, data=data, **headers)
self.assertEqual(200, resp.status_code)
class ViaUploadLinkTest(BaseTestCase):
def _create_repo_api_token_obj(self, app_name, permission):
username = self.user.username
return RepoAPITokens.objects.create_token(app_name, self.repo_id, username, permission=permission)
def setUp(self):
self.login_as(self.user)
repo_id = self.create_repo(name='test-repo',
desc='',
username=self.user.username,
passwd=None)
self.repo_id = repo_id
self.folder_name = os.path.basename(self.create_folder(repo_id=self.repo_id,
parent_dir='/',
dirname='folder',
username='test@test.com'))
self.file_name = os.path.basename(self.create_file(repo_id=self.repo_id,
parent_dir='/',
filename='test.txt',
username='test@test.com'))
self.r_app_name, permission = 'app_name', 'r'
self.repo_r_api_token_obj = self._create_repo_api_token_obj(self.r_app_name, permission)
self.rw_app_name, permission = 'rw_app_name', 'rw'
self.repo_rw_api_token_obj = self._create_repo_api_token_obj(self.rw_app_name, permission)
self.url = reverse('via-upload-link', args=[self.repo_id])
self.logout()
def tearDown(self):
RepoAPITokens.objects.filter(repo_id=self.repo_id).delete()
self.remove_repo(self.repo_id)
def test_get_upload_link_from_r_token(self):
data = {
'path': '/',
}
headers = {'HTTP_AUTHORIZATION': 'token ' + self.repo_r_api_token_obj.token}
resp = self.client.get(self.url, data=data, **headers)
self.assertEqual(403, resp.status_code)
def test_get_upload_link_from_rw_token(self):
data = {
'path': '/',
}
headers = {'HTTP_AUTHORIZATION': 'token ' + self.repo_rw_api_token_obj.token}
resp = self.client.get(self.url, data=data, **headers)
self.assertEqual(200, resp.status_code)
assert resp.content