mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-05 08:53:14 +00:00
sysadmin reconstruct user api (#4031)
This commit is contained in:
@@ -108,6 +108,26 @@ class AdminShareLink(APIView):
|
|||||||
link_info = get_share_link_info(sharelink)
|
link_info = get_share_link_info(sharelink)
|
||||||
return Response(link_info)
|
return Response(link_info)
|
||||||
|
|
||||||
|
def delete(self, request, token):
|
||||||
|
""" Remove a special share link.
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. only admin can perform this action.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
fs = FileShare.objects.get(token=token)
|
||||||
|
except FileShare.DoesNotExist:
|
||||||
|
return Response({'success': True})
|
||||||
|
|
||||||
|
try:
|
||||||
|
fs.delete()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
return Response({'success': True})
|
||||||
|
|
||||||
|
|
||||||
class AdminShareLinkDirents(APIView):
|
class AdminShareLinkDirents(APIView):
|
||||||
|
|
||||||
|
@@ -86,6 +86,26 @@ class AdminUploadLink(APIView):
|
|||||||
link_info = get_upload_link_info(uploadlink)
|
link_info = get_upload_link_info(uploadlink)
|
||||||
return Response(link_info)
|
return Response(link_info)
|
||||||
|
|
||||||
|
def delete(self, request, token):
|
||||||
|
""" Remove a special upload link.
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. only admin can perform this action.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
fs = UploadLinkShare.objects.get(token=token)
|
||||||
|
except UploadLinkShare.DoesNotExist:
|
||||||
|
return Response({'success': True})
|
||||||
|
|
||||||
|
try:
|
||||||
|
fs.delete()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
return Response({'success': True})
|
||||||
|
|
||||||
|
|
||||||
class AdminUploadLinkUpload(APIView):
|
class AdminUploadLinkUpload(APIView):
|
||||||
|
|
||||||
|
@@ -1,5 +1,8 @@
|
|||||||
# Copyright (c) 2012-2016 Seafile Ltd.
|
# Copyright (c) 2012-2016 Seafile Ltd.
|
||||||
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
from types import FunctionType
|
||||||
|
from constance import config
|
||||||
|
|
||||||
from rest_framework import status
|
from rest_framework import status
|
||||||
from rest_framework.authentication import SessionAuthentication
|
from rest_framework.authentication import SessionAuthentication
|
||||||
@@ -7,7 +10,6 @@ from rest_framework.permissions import IsAdminUser
|
|||||||
from rest_framework.response import Response
|
from rest_framework.response import Response
|
||||||
from rest_framework.views import APIView
|
from rest_framework.views import APIView
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from django.core.urlresolvers import reverse
|
|
||||||
from django.utils.translation import ugettext as _
|
from django.utils.translation import ugettext as _
|
||||||
|
|
||||||
from seaserv import seafile_api, ccnet_api
|
from seaserv import seafile_api, ccnet_api
|
||||||
@@ -15,26 +17,91 @@ from seaserv import seafile_api, ccnet_api
|
|||||||
from seahub.api2.authentication import TokenAuthentication
|
from seahub.api2.authentication import TokenAuthentication
|
||||||
from seahub.api2.throttling import UserRateThrottle
|
from seahub.api2.throttling import UserRateThrottle
|
||||||
from seahub.api2.utils import api_error, to_python_boolean
|
from seahub.api2.utils import api_error, to_python_boolean
|
||||||
from seahub.api2.endpoints.utils import generate_links_header_for_paginator
|
|
||||||
|
|
||||||
from seahub.settings import SEND_EMAIL_ON_ADDING_SYSTEM_MEMBER
|
from seahub.settings import SEND_EMAIL_ON_ADDING_SYSTEM_MEMBER, INIT_PASSWD, \
|
||||||
|
SEND_EMAIL_ON_RESETTING_USER_PASSWD
|
||||||
|
from seahub.base.templatetags.seahub_tags import email2nickname
|
||||||
from seahub.base.accounts import User
|
from seahub.base.accounts import User
|
||||||
from seahub.base.templatetags.seahub_tags import email2nickname, \
|
from seahub.base.models import UserLastLogin
|
||||||
email2contact_email
|
from seahub.two_factor.models import default_device
|
||||||
from seahub.profile.models import Profile, DetailedProfile
|
from seahub.profile.models import Profile, DetailedProfile
|
||||||
from seahub.profile.settings import CONTACT_CACHE_TIMEOUT, CONTACT_CACHE_PREFIX
|
from seahub.profile.settings import CONTACT_CACHE_TIMEOUT, CONTACT_CACHE_PREFIX
|
||||||
from seahub.utils import is_valid_username, is_org_context, \
|
from seahub.utils import is_valid_username, is_org_context, \
|
||||||
is_pro_version, normalize_cache_key, is_valid_email, \
|
is_pro_version, normalize_cache_key, is_valid_email, \
|
||||||
IS_EMAIL_CONFIGURED, send_html_email, get_site_name
|
IS_EMAIL_CONFIGURED, send_html_email, get_site_name
|
||||||
from seahub.utils.timeutils import timestamp_to_isoformat_timestr
|
|
||||||
from seahub.utils.file_size import get_file_size_unit
|
from seahub.utils.file_size import get_file_size_unit
|
||||||
|
from seahub.utils.timeutils import timestamp_to_isoformat_timestr
|
||||||
|
from seahub.utils.repo import normalize_repo_status_code
|
||||||
|
from seahub.constants import DEFAULT_ADMIN
|
||||||
|
from seahub.role_permissions.models import AdminRole
|
||||||
from seahub.role_permissions.utils import get_available_roles
|
from seahub.role_permissions.utils import get_available_roles
|
||||||
from seahub.utils.licenseparse import user_number_over_limit
|
from seahub.utils.licenseparse import user_number_over_limit
|
||||||
|
|
||||||
|
from seahub.options.models import UserOptions
|
||||||
|
from seahub.share.models import FileShare, UploadLinkShare
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
json_content_type = 'application/json; charset=utf-8'
|
json_content_type = 'application/json; charset=utf-8'
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_upload_link_info(uls):
|
||||||
|
data = {}
|
||||||
|
|
||||||
|
repo_id = uls.repo_id
|
||||||
|
try:
|
||||||
|
repo = seafile_api.get_repo(repo_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
repo = None
|
||||||
|
|
||||||
|
path = uls.path
|
||||||
|
if path:
|
||||||
|
obj_name = '/' if path == '/' else os.path.basename(path.rstrip('/'))
|
||||||
|
else:
|
||||||
|
obj_name = ''
|
||||||
|
|
||||||
|
data['repo_name'] = repo.repo_name if repo else ''
|
||||||
|
data['path'] = path
|
||||||
|
data['obj_name'] = obj_name
|
||||||
|
data['view_cnt'] = uls.view_cnt
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_share_link_info(fileshare):
|
||||||
|
data = {}
|
||||||
|
|
||||||
|
repo_id = fileshare.repo_id
|
||||||
|
try:
|
||||||
|
repo = seafile_api.get_repo(repo_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
repo = None
|
||||||
|
|
||||||
|
path = fileshare.path
|
||||||
|
if path:
|
||||||
|
obj_name = '/' if path == '/' else os.path.basename(path.rstrip('/'))
|
||||||
|
else:
|
||||||
|
obj_name = ''
|
||||||
|
|
||||||
|
data['repo_name'] = repo.repo_name if repo else ''
|
||||||
|
|
||||||
|
data['path'] = path
|
||||||
|
data['obj_name'] = obj_name
|
||||||
|
data['is_dir'] = True if fileshare.s_type == 'd' else False
|
||||||
|
|
||||||
|
data['view_cnt'] = fileshare.view_cnt
|
||||||
|
|
||||||
|
if fileshare.s_type == 'f':
|
||||||
|
obj_id = seafile_api.get_file_id_by_path(repo_id, path)
|
||||||
|
data['size'] = seafile_api.get_file_size(repo.store_id,
|
||||||
|
repo.version, obj_id)
|
||||||
|
else:
|
||||||
|
data['size'] = ''
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
def update_user_info(request, user):
|
def update_user_info(request, user):
|
||||||
|
|
||||||
# update basic user info
|
# update basic user info
|
||||||
@@ -51,6 +118,12 @@ def update_user_info(request, user):
|
|||||||
if is_active:
|
if is_active:
|
||||||
is_active = to_python_boolean(is_active)
|
is_active = to_python_boolean(is_active)
|
||||||
user.is_active = is_active
|
user.is_active = is_active
|
||||||
|
if user.is_active:
|
||||||
|
try:
|
||||||
|
send_html_email(_(u'Your account on %s is activated') % get_site_name(),
|
||||||
|
'sysadmin/user_activation_email.html', {'username': user.email}, None, [user.email])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
|
||||||
# update user
|
# update user
|
||||||
user.save()
|
user.save()
|
||||||
@@ -119,7 +192,6 @@ def get_user_info(email):
|
|||||||
|
|
||||||
info['is_staff'] = user.is_staff
|
info['is_staff'] = user.is_staff
|
||||||
info['is_active'] = user.is_active
|
info['is_active'] = user.is_active
|
||||||
info['create_time'] = user.ctime
|
|
||||||
info['reference_id'] = user.reference_id if user.reference_id else ''
|
info['reference_id'] = user.reference_id if user.reference_id else ''
|
||||||
|
|
||||||
info['department'] = d_profile.department if d_profile else ''
|
info['department'] = d_profile.department if d_profile else ''
|
||||||
@@ -129,12 +201,67 @@ def get_user_info(email):
|
|||||||
|
|
||||||
info['create_time'] = timestamp_to_isoformat_timestr(user.ctime)
|
info['create_time'] = timestamp_to_isoformat_timestr(user.ctime)
|
||||||
|
|
||||||
|
info['has_default_device'] = True if default_device(user) else False
|
||||||
|
info['is_force_2fa'] = UserOptions.objects.is_force_2fa(email)
|
||||||
|
|
||||||
if is_pro_version():
|
if is_pro_version():
|
||||||
info['role'] = user.role
|
info['role'] = user.role
|
||||||
|
|
||||||
return info
|
return info
|
||||||
|
|
||||||
|
|
||||||
|
class AdminAdminUsers(APIView):
|
||||||
|
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
permission_classes = (IsAdminUser, )
|
||||||
|
throttle_classes = (UserRateThrottle, )
|
||||||
|
|
||||||
|
def get(self, request):
|
||||||
|
"""List all admins from database and ldap imported
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
db_users = ccnet_api.get_emailusers('DB', -1, -1)
|
||||||
|
ldap_imported_users = ccnet_api.get_emailusers('LDAPImport', -1, -1)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
admin_users = []
|
||||||
|
for user in db_users + ldap_imported_users:
|
||||||
|
if user.is_staff is True:
|
||||||
|
admin_users.append(user)
|
||||||
|
|
||||||
|
admin_users_info = []
|
||||||
|
for user in admin_users:
|
||||||
|
user_info = {}
|
||||||
|
profile = Profile.objects.get_profile_by_user(user.email)
|
||||||
|
user_info['email'] = user.email
|
||||||
|
user_info['name'] = email2nickname(user.email)
|
||||||
|
user_info['contact_email'] = profile.contact_email if profile and profile.contact_email else ''
|
||||||
|
user_info['login_id'] = profile.login_id if profile and profile.login_id else ''
|
||||||
|
|
||||||
|
user_info['is_staff'] = user.is_staff
|
||||||
|
user_info['is_active'] = user.is_active
|
||||||
|
|
||||||
|
user_info['quota_total'] = seafile_api.get_user_quota(user.email)
|
||||||
|
user_info['quota_usage'] = seafile_api.get_user_self_usage(user.email)
|
||||||
|
|
||||||
|
user_info['create_time'] = timestamp_to_isoformat_timestr(user.ctime)
|
||||||
|
user_info['last_login'] = UserLastLogin.objects.get_by_username(user.email).last_login if UserLastLogin.objects.get_by_username(user.email) else ''
|
||||||
|
|
||||||
|
try:
|
||||||
|
admin_role = AdminRole.objects.get_admin_role(user.email)
|
||||||
|
user_info['admin_role'] = admin_role.role
|
||||||
|
except AdminRole.DoesNotExist:
|
||||||
|
user_info['admin_role'] = DEFAULT_ADMIN
|
||||||
|
admin_users_info.append(user_info)
|
||||||
|
|
||||||
|
result = {
|
||||||
|
'admin_user_list': admin_users_info,
|
||||||
|
}
|
||||||
|
return Response(result)
|
||||||
|
|
||||||
class AdminUsers(APIView):
|
class AdminUsers(APIView):
|
||||||
|
|
||||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
@@ -151,26 +278,37 @@ class AdminUsers(APIView):
|
|||||||
per_page = 25
|
per_page = 25
|
||||||
|
|
||||||
start = (page - 1) * per_page
|
start = (page - 1) * per_page
|
||||||
end = page * per_page + 1
|
end = start + per_page
|
||||||
users = ccnet_api.get_emailusers('DB', start, end)
|
users = ccnet_api.get_emailusers('DB', start, end)
|
||||||
total_count = ccnet_api.count_emailusers('DB') + \
|
total_count = ccnet_api.count_emailusers('DB') + \
|
||||||
ccnet_api.count_inactive_emailusers('DB')
|
ccnet_api.count_inactive_emailusers('DB')
|
||||||
|
|
||||||
data = []
|
data = []
|
||||||
for user in users:
|
for user in users:
|
||||||
user_info = get_user_info(user.email)
|
profile = Profile.objects.get_profile_by_user(user.email)
|
||||||
data.append(user_info)
|
|
||||||
|
info = {}
|
||||||
|
info['email'] = user.email
|
||||||
|
info['name'] = email2nickname(user.email)
|
||||||
|
info['contact_email'] = profile.contact_email if profile and profile.contact_email else ''
|
||||||
|
info['login_id'] = profile.login_id if profile and profile.login_id else ''
|
||||||
|
|
||||||
|
info['is_staff'] = user.is_staff
|
||||||
|
info['is_active'] = user.is_active
|
||||||
|
|
||||||
|
info['quota_total'] = seafile_api.get_user_quota(user.email)
|
||||||
|
info['quota_usage'] = seafile_api.get_user_self_usage(user.email)
|
||||||
|
|
||||||
|
info['create_time'] = timestamp_to_isoformat_timestr(user.ctime)
|
||||||
|
info['last_login'] = UserLastLogin.objects.get_by_username(user.email).last_login if UserLastLogin.objects.get_by_username(user.email) else ''
|
||||||
|
|
||||||
|
if is_pro_version():
|
||||||
|
info['role'] = user.role
|
||||||
|
|
||||||
|
data.append(info)
|
||||||
|
|
||||||
result = {'data': data, 'total_count': total_count}
|
result = {'data': data, 'total_count': total_count}
|
||||||
resp = Response(result)
|
return Response(result)
|
||||||
|
|
||||||
## generate `Links` header for paginator
|
|
||||||
base_url = reverse('api-v2.1-admin-users')
|
|
||||||
links_header = generate_links_header_for_paginator(base_url,
|
|
||||||
page, per_page, total_count)
|
|
||||||
resp['Links'] = links_header
|
|
||||||
|
|
||||||
return resp
|
|
||||||
|
|
||||||
def post(self, request):
|
def post(self, request):
|
||||||
|
|
||||||
@@ -425,3 +563,238 @@ class AdminUser(APIView):
|
|||||||
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
return Response({'success': True})
|
return Response({'success': True})
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserResetPassword(APIView):
|
||||||
|
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
permission_classes = (IsAdminUser, )
|
||||||
|
throttle_classes = (UserRateThrottle, )
|
||||||
|
|
||||||
|
def put(self, request, email):
|
||||||
|
"""Reset password for user
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. only admin can perform this action.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not is_valid_username(email):
|
||||||
|
error_msg = 'email invalid'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
user = User.objects.get(email=email)
|
||||||
|
except User.DoesNotExist as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'email invalid.'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
if isinstance(INIT_PASSWD, FunctionType):
|
||||||
|
new_password = INIT_PASSWD()
|
||||||
|
else:
|
||||||
|
new_password = INIT_PASSWD
|
||||||
|
user.set_password(new_password)
|
||||||
|
user.save()
|
||||||
|
|
||||||
|
if config.FORCE_PASSWORD_CHANGE:
|
||||||
|
UserOptions.objects.set_force_passwd_change(user.username)
|
||||||
|
|
||||||
|
if IS_EMAIL_CONFIGURED and SEND_EMAIL_ON_RESETTING_USER_PASSWD:
|
||||||
|
c = {'email': email, 'password': new_password}
|
||||||
|
try:
|
||||||
|
send_html_email(_(u'Password has been reset on %s') % get_site_name(),
|
||||||
|
'sysadmin/user_reset_email.html', c, None, [email])
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
return Response({'new_password': new_password})
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserGroups(APIView):
|
||||||
|
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
permission_classes = (IsAdminUser, )
|
||||||
|
throttle_classes = (UserRateThrottle, )
|
||||||
|
|
||||||
|
def get(self, request, email):
|
||||||
|
""" return all groups user joined
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. Admin user;
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not is_valid_username(email):
|
||||||
|
error_msg = 'email invalid.'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
User.objects.get(email=email)
|
||||||
|
except User.DoesNotExist as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'User %s not found.' % email
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
groups_info = []
|
||||||
|
try:
|
||||||
|
groups = ccnet_api.get_personal_groups_by_user(email)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
# Use dict to reduce memcache fetch cost in large for-loop.
|
||||||
|
nickname_dict = {}
|
||||||
|
creator_name_set = set([g.creator_name for g in groups])
|
||||||
|
for e in creator_name_set:
|
||||||
|
if e not in nickname_dict:
|
||||||
|
nickname_dict[e] = email2nickname(e)
|
||||||
|
|
||||||
|
for group in groups:
|
||||||
|
isoformat_timestr = timestamp_to_isoformat_timestr(group.timestamp)
|
||||||
|
group_info = {
|
||||||
|
"id": group.id,
|
||||||
|
"name": group.group_name,
|
||||||
|
"owner_email": group.creator_name,
|
||||||
|
"owner_name": nickname_dict.get(group.creator_name, ''),
|
||||||
|
"created_at": isoformat_timestr,
|
||||||
|
"parent_group_id": group.parent_group_id if is_pro_version() else 0
|
||||||
|
}
|
||||||
|
groups_info.append(group_info)
|
||||||
|
|
||||||
|
try:
|
||||||
|
is_group_staff = ccnet_api.check_group_staff(group.id, email)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
if email == group.creator_name:
|
||||||
|
group_info['role'] = 'Owner'
|
||||||
|
elif is_group_staff:
|
||||||
|
group_info['role'] = 'Admin'
|
||||||
|
else:
|
||||||
|
group_info['role'] = 'Member'
|
||||||
|
return Response({'group_list': groups_info})
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserShareLinks(APIView):
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
permission_classes = (IsAdminUser,)
|
||||||
|
throttle_classes = (UserRateThrottle,)
|
||||||
|
|
||||||
|
def get(self, request, email):
|
||||||
|
""" Get all shared download links of a user.
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. only admin can perform this action.
|
||||||
|
"""
|
||||||
|
if not is_valid_username(email):
|
||||||
|
error_msg = 'email invalid.'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
User.objects.get(email=email)
|
||||||
|
except User.DoesNotExist as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'User %s not found.' % email
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
share_links = FileShare.objects.filter(username=email)
|
||||||
|
|
||||||
|
links_info = []
|
||||||
|
for fs in share_links:
|
||||||
|
link_info = get_user_share_link_info(fs)
|
||||||
|
links_info.append(link_info)
|
||||||
|
|
||||||
|
return Response({'share_link_list': links_info})
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserUploadLinks(APIView):
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
permission_classes = (IsAdminUser,)
|
||||||
|
throttle_classes = (UserRateThrottle,)
|
||||||
|
|
||||||
|
def get(self, request, email):
|
||||||
|
""" Get all shared upload links of a user.
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. only admin can perform this action.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not is_valid_username(email):
|
||||||
|
error_msg = 'email invalid.'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
User.objects.get(email=email)
|
||||||
|
except User.DoesNotExist as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'User %s not found.' % email
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
upload_links = UploadLinkShare.objects.filter(username=email)
|
||||||
|
|
||||||
|
links_info = []
|
||||||
|
for fs in upload_links:
|
||||||
|
link_info = get_user_upload_link_info(fs)
|
||||||
|
links_info.append(link_info)
|
||||||
|
|
||||||
|
return Response({'upload_link_list': links_info})
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserBeSharedRepos(APIView):
|
||||||
|
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
throttle_classes = (UserRateThrottle,)
|
||||||
|
permission_classes = (IsAdminUser,)
|
||||||
|
|
||||||
|
def get(self, request, email):
|
||||||
|
""" List 'all' libraries shared to a user
|
||||||
|
|
||||||
|
Permission checking:
|
||||||
|
1. only admin can perform this action.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not is_valid_username(email):
|
||||||
|
error_msg = 'email invalid.'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
User.objects.get(email=email)
|
||||||
|
except User.DoesNotExist as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'User %s not found.' % email
|
||||||
|
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
beshared_repos = seafile_api.get_share_in_repo_list(email, -1, -1)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
error_msg = 'Internal Server Error'
|
||||||
|
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
|
||||||
|
|
||||||
|
# Use dict to reduce memcache fetch cost in large for-loop.
|
||||||
|
nickname_dict = {}
|
||||||
|
owner_set = set([x.user for x in beshared_repos])
|
||||||
|
for e in owner_set:
|
||||||
|
if e not in nickname_dict:
|
||||||
|
nickname_dict[e] = email2nickname(e)
|
||||||
|
|
||||||
|
repos_info = []
|
||||||
|
for repo in beshared_repos:
|
||||||
|
repo_info = {}
|
||||||
|
repo_info['id'] = repo.repo_id
|
||||||
|
repo_info['name'] = repo.repo_name
|
||||||
|
repo_info['owner_email'] = repo.user
|
||||||
|
repo_info['owner_name'] = nickname_dict.get(repo.user, '')
|
||||||
|
repo_info['size'] = repo.size
|
||||||
|
repo_info['encrypted'] = repo.encrypted
|
||||||
|
repo_info['file_count'] = repo.file_count
|
||||||
|
repo_info['status'] = normalize_repo_status_code(repo.status)
|
||||||
|
repo_info['last_modify'] = timestamp_to_isoformat_timestr(repo.last_modify)
|
||||||
|
|
||||||
|
repos_info.append(repo_info)
|
||||||
|
|
||||||
|
return Response({'repo_list': repos_info})
|
||||||
|
@@ -19,12 +19,75 @@ from seahub.base.accounts import User
|
|||||||
from seahub.profile.models import Profile
|
from seahub.profile.models import Profile
|
||||||
from seahub.institutions.models import Institution
|
from seahub.institutions.models import Institution
|
||||||
from seahub.utils.file_size import get_file_size_unit
|
from seahub.utils.file_size import get_file_size_unit
|
||||||
|
from seahub.utils import string2list, is_valid_username
|
||||||
from seahub.admin_log.models import USER_DELETE
|
from seahub.admin_log.models import USER_DELETE
|
||||||
from seahub.admin_log.signals import admin_operation
|
from seahub.admin_log.signals import admin_operation
|
||||||
|
from seahub.utils.timeutils import timestamp_to_isoformat_timestr
|
||||||
|
from seahub.constants import DEFAULT_ADMIN
|
||||||
|
from seahub.role_permissions.models import AdminRole
|
||||||
|
from seahub.base.templatetags.seahub_tags import email2nickname
|
||||||
|
from seahub.base.models import UserLastLogin
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class AdminAdminUsersBatch(APIView):
|
||||||
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
|
throttle_classes = (UserRateThrottle,)
|
||||||
|
permission_classes = (IsAdminUser,)
|
||||||
|
|
||||||
|
def post(self, request):
|
||||||
|
""" Add admin in batch
|
||||||
|
"""
|
||||||
|
|
||||||
|
emails = request.data.get('emails', None)
|
||||||
|
if not emails:
|
||||||
|
error_msg = 'emails invalid.'
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
emails = string2list(emails)
|
||||||
|
new_admin_info = []
|
||||||
|
|
||||||
|
for email in emails:
|
||||||
|
if not is_valid_username(email):
|
||||||
|
error_msg = 'email %s invalid.' % email
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
try:
|
||||||
|
user = User.objects.get(email=email)
|
||||||
|
except User.DoesNotExist:
|
||||||
|
error_msg = 'email %s invalid.' % email
|
||||||
|
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
|
||||||
|
|
||||||
|
user.is_staff = True
|
||||||
|
user.save()
|
||||||
|
|
||||||
|
profile = Profile.objects.get_profile_by_user(user.email)
|
||||||
|
user_info = {}
|
||||||
|
user_info['email'] = user.email
|
||||||
|
user_info['name'] = email2nickname(user.email)
|
||||||
|
user_info['contact_email'] = profile.contact_email if profile and profile.contact_email else ''
|
||||||
|
user_info['login_id'] = profile.login_id if profile and profile.login_id else ''
|
||||||
|
|
||||||
|
user_info['is_staff'] = user.is_staff
|
||||||
|
user_info['is_active'] = user.is_active
|
||||||
|
|
||||||
|
user_info['quota_total'] = seafile_api.get_user_quota(user.email)
|
||||||
|
user_info['quota_usage'] = seafile_api.get_user_self_usage(user.email)
|
||||||
|
|
||||||
|
user_info['create_time'] = timestamp_to_isoformat_timestr(user.ctime)
|
||||||
|
user_info['last_login'] = UserLastLogin.objects.get_by_username(user.email).last_login if UserLastLogin.objects.get_by_username(user.email) else ''
|
||||||
|
|
||||||
|
try:
|
||||||
|
admin_role = AdminRole.objects.get_admin_role(user.email)
|
||||||
|
user_info['admin_role'] = admin_role.role
|
||||||
|
except AdminRole.DoesNotExist:
|
||||||
|
user_info['admin_role'] = DEFAULT_ADMIN
|
||||||
|
new_admin_info.append(user_info)
|
||||||
|
|
||||||
|
return Response({'new_admin_user_list': new_admin_info})
|
||||||
|
|
||||||
|
|
||||||
class AdminUsersBatch(APIView):
|
class AdminUsersBatch(APIView):
|
||||||
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
authentication_classes = (TokenAuthentication, SessionAuthentication)
|
||||||
throttle_classes = (UserRateThrottle,)
|
throttle_classes = (UserRateThrottle,)
|
||||||
|
@@ -18,6 +18,12 @@
|
|||||||
enable_work_weixin: {% if enable_work_weixin %} true {% else %} false {% endif %},
|
enable_work_weixin: {% if enable_work_weixin %} true {% else %} false {% endif %},
|
||||||
enableSysAdminViewRepo: {% if enable_sys_admin_view_repo %} true {% else %} false {% endif %},
|
enableSysAdminViewRepo: {% if enable_sys_admin_view_repo %} true {% else %} false {% endif %},
|
||||||
trashReposExpireDays: {{ trash_repos_expire_days }},
|
trashReposExpireDays: {{ trash_repos_expire_days }},
|
||||||
|
is_email_configured: {% if is_email_configured %} true {% else %} false {% endif %},
|
||||||
|
send_email_on_resetting_user_passwd: {% if send_email_on_resetting_user_passwd %} true {% else %} false {% endif %},
|
||||||
|
send_email_on_adding_system_member: {% if send_email_on_adding_system_member %} true {% else %} false {% endif %},
|
||||||
|
enable_two_factor_auth: {% if enable_two_factor_auth %} true {% else %} false {% endif %},
|
||||||
|
available_roles: '{{ available_roles | escapejs }}' ,
|
||||||
|
available_admin_roles: '{{ available_admin_roles | escapejs }}' ,
|
||||||
admin_permissions: {
|
admin_permissions: {
|
||||||
"can_view_system_info": {% if user.admin_permissions.can_view_system_info %} true {% else %} false {% endif %},
|
"can_view_system_info": {% if user.admin_permissions.can_view_system_info %} true {% else %} false {% endif %},
|
||||||
"can_view_statistic": {% if user.admin_permissions.can_view_statistic %} true {% else %} false {% endif %},
|
"can_view_statistic": {% if user.admin_permissions.can_view_statistic %} true {% else %} false {% endif %},
|
||||||
|
@@ -107,7 +107,8 @@ from seahub.api2.endpoints.admin.statistics import (
|
|||||||
)
|
)
|
||||||
from seahub.api2.endpoints.admin.devices import AdminDevices
|
from seahub.api2.endpoints.admin.devices import AdminDevices
|
||||||
from seahub.api2.endpoints.admin.device_errors import AdminDeviceErrors
|
from seahub.api2.endpoints.admin.device_errors import AdminDeviceErrors
|
||||||
from seahub.api2.endpoints.admin.users import AdminUsers, AdminUser
|
from seahub.api2.endpoints.admin.users import AdminUsers, AdminUser, AdminUserResetPassword, AdminAdminUsers, \
|
||||||
|
AdminUserGroups, AdminUserShareLinks, AdminUserUploadLinks, AdminUserBeSharedRepos
|
||||||
from seahub.api2.endpoints.admin.device_trusted_ip import AdminDeviceTrustedIP
|
from seahub.api2.endpoints.admin.device_trusted_ip import AdminDeviceTrustedIP
|
||||||
from seahub.api2.endpoints.admin.libraries import AdminLibraries, AdminLibrary
|
from seahub.api2.endpoints.admin.libraries import AdminLibraries, AdminLibrary
|
||||||
from seahub.api2.endpoints.admin.library_dirents import AdminLibraryDirents, AdminLibraryDirent
|
from seahub.api2.endpoints.admin.library_dirents import AdminLibraryDirents, AdminLibraryDirent
|
||||||
@@ -124,7 +125,7 @@ from seahub.api2.endpoints.admin.share_links import AdminShareLink, \
|
|||||||
AdminShareLinkDirents
|
AdminShareLinkDirents
|
||||||
from seahub.api2.endpoints.admin.upload_links import AdminUploadLink, \
|
from seahub.api2.endpoints.admin.upload_links import AdminUploadLink, \
|
||||||
AdminUploadLinkUpload, AdminUploadLinkCheckPassword
|
AdminUploadLinkUpload, AdminUploadLinkCheckPassword
|
||||||
from seahub.api2.endpoints.admin.users_batch import AdminUsersBatch
|
from seahub.api2.endpoints.admin.users_batch import AdminUsersBatch, AdminAdminUsersBatch
|
||||||
from seahub.api2.endpoints.admin.operation_logs import AdminOperationLogs
|
from seahub.api2.endpoints.admin.operation_logs import AdminOperationLogs
|
||||||
from seahub.api2.endpoints.admin.organizations import AdminOrganizations, AdminOrganization
|
from seahub.api2.endpoints.admin.organizations import AdminOrganizations, AdminOrganization
|
||||||
from seahub.api2.endpoints.admin.org_users import AdminOrgUsers, AdminOrgUser
|
from seahub.api2.endpoints.admin.org_users import AdminOrgUsers, AdminOrgUser
|
||||||
@@ -451,6 +452,13 @@ urlpatterns = [
|
|||||||
# [^...] Matches any single character not in brackets
|
# [^...] Matches any single character not in brackets
|
||||||
# + Matches between one and unlimited times, as many times as possible
|
# + Matches between one and unlimited times, as many times as possible
|
||||||
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/$', AdminUser.as_view(), name='api-v2.1-admin-user'),
|
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/$', AdminUser.as_view(), name='api-v2.1-admin-user'),
|
||||||
|
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/reset-password/$', AdminUserResetPassword.as_view(), name='api-v2.1-admin-user-reset-password'),
|
||||||
|
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/groups/$', AdminUserGroups.as_view(), name='api-v2.1-admin-user-groups'),
|
||||||
|
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/share-links/$', AdminUserShareLinks.as_view(), name='api-v2.1-admin-user-share-links'),
|
||||||
|
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/upload-links/$', AdminUserUploadLinks.as_view(), name='api-v2.1-admin-user-upload-links'),
|
||||||
|
url(r'^api/v2.1/admin/users/(?P<email>[^/]+@[^/]+)/beshared-repos/$', AdminUserBeSharedRepos.as_view(), name='api-v2.1-admin-user-beshared-repos'),
|
||||||
|
|
||||||
|
url(r'^api/v2.1/admin/admin-users/$', AdminAdminUsers.as_view(), name='api-v2.1-admin-admin-users'),
|
||||||
|
|
||||||
## admin::devices
|
## admin::devices
|
||||||
url(r'^api/v2.1/admin/devices/$', AdminDevices.as_view(), name='api-v2.1-admin-devices'),
|
url(r'^api/v2.1/admin/devices/$', AdminDevices.as_view(), name='api-v2.1-admin-devices'),
|
||||||
@@ -509,6 +517,7 @@ urlpatterns = [
|
|||||||
AdminUploadLinkCheckPassword.as_view(), name='api-v2.1-admin-upload-link-check-password'),
|
AdminUploadLinkCheckPassword.as_view(), name='api-v2.1-admin-upload-link-check-password'),
|
||||||
|
|
||||||
## admin::users
|
## admin::users
|
||||||
|
url(r'^api/v2.1/admin/admin-users/batch/$', AdminAdminUsersBatch.as_view(), name='api-v2.1-admin-users-batch'),
|
||||||
url(r'^api/v2.1/admin/users/batch/$', AdminUsersBatch.as_view(), name='api-v2.1-admin-users-batch'),
|
url(r'^api/v2.1/admin/users/batch/$', AdminUsersBatch.as_view(), name='api-v2.1-admin-users-batch'),
|
||||||
|
|
||||||
## admin::admin-role
|
## admin::admin-role
|
||||||
@@ -649,6 +658,10 @@ urlpatterns = [
|
|||||||
url(r'^sys/groups/$', sysadmin_react_fake_view, name="sys_groups"),
|
url(r'^sys/groups/$', sysadmin_react_fake_view, name="sys_groups"),
|
||||||
url(r'^sys/groups/(?P<group_id>\d+)/libraries/$', sysadmin_react_fake_view, name="sys_group_libraries"),
|
url(r'^sys/groups/(?P<group_id>\d+)/libraries/$', sysadmin_react_fake_view, name="sys_group_libraries"),
|
||||||
url(r'^sys/groups/(?P<group_id>\d+)/members/$', sysadmin_react_fake_view, name="sys_group_members"),
|
url(r'^sys/groups/(?P<group_id>\d+)/members/$', sysadmin_react_fake_view, name="sys_group_members"),
|
||||||
|
url(r'^sys/users/$', sysadmin_react_fake_view, name="sys_users"),
|
||||||
|
url(r'^sys/users-all/$', sysadmin_react_fake_view, name="sys_users_all"),
|
||||||
|
url(r'^sys/users-admin/$', sysadmin_react_fake_view, name="sys_users_admin"),
|
||||||
|
url(r'^sys/user-info/(?P<email>[^/]+)/$', sysadmin_react_fake_view, name="sys_users"),
|
||||||
url(r'^sys/work-weixin/$', sysadmin_react_fake_view, name="sys_work_weixin"),
|
url(r'^sys/work-weixin/$', sysadmin_react_fake_view, name="sys_work_weixin"),
|
||||||
url(r'^sys/work-weixin/departments/$', sysadmin_react_fake_view, name="sys_work_weixin_departments"),
|
url(r'^sys/work-weixin/departments/$', sysadmin_react_fake_view, name="sys_work_weixin_departments"),
|
||||||
|
|
||||||
|
@@ -76,7 +76,7 @@ import seahub.settings as settings
|
|||||||
from seahub.settings import INIT_PASSWD, SITE_ROOT, \
|
from seahub.settings import INIT_PASSWD, SITE_ROOT, \
|
||||||
SEND_EMAIL_ON_ADDING_SYSTEM_MEMBER, SEND_EMAIL_ON_RESETTING_USER_PASSWD, \
|
SEND_EMAIL_ON_ADDING_SYSTEM_MEMBER, SEND_EMAIL_ON_RESETTING_USER_PASSWD, \
|
||||||
ENABLE_SYS_ADMIN_VIEW_REPO, ENABLE_GUEST_INVITATION, \
|
ENABLE_SYS_ADMIN_VIEW_REPO, ENABLE_GUEST_INVITATION, \
|
||||||
ENABLE_LIMIT_IPADDRESS
|
ENABLE_LIMIT_IPADDRESS, ENABLE_TWO_FACTOR_AUTH
|
||||||
try:
|
try:
|
||||||
from seahub.settings import ENABLE_TRIAL_ACCOUNT
|
from seahub.settings import ENABLE_TRIAL_ACCOUNT
|
||||||
except:
|
except:
|
||||||
@@ -150,6 +150,9 @@ def sysadmin_react_fake_view(request, **kwargs):
|
|||||||
'constance_enabled': dj_settings.CONSTANCE_ENABLED,
|
'constance_enabled': dj_settings.CONSTANCE_ENABLED,
|
||||||
'multi_tenancy': MULTI_TENANCY,
|
'multi_tenancy': MULTI_TENANCY,
|
||||||
'multi_institution': getattr(dj_settings, 'MULTI_INSTITUTION', False),
|
'multi_institution': getattr(dj_settings, 'MULTI_INSTITUTION', False),
|
||||||
|
'is_email_configured': IS_EMAIL_CONFIGURED,
|
||||||
|
'send_email_on_resetting_user_passwd': SEND_EMAIL_ON_RESETTING_USER_PASSWD,
|
||||||
|
'send_email_on_adding_system_member': SEND_EMAIL_ON_ADDING_SYSTEM_MEMBER,
|
||||||
'sysadmin_extra_enabled': ENABLE_SYSADMIN_EXTRA,
|
'sysadmin_extra_enabled': ENABLE_SYSADMIN_EXTRA,
|
||||||
'enable_guest_invitation': ENABLE_GUEST_INVITATION,
|
'enable_guest_invitation': ENABLE_GUEST_INVITATION,
|
||||||
'enable_terms_and_conditions': config.ENABLE_TERMS_AND_CONDITIONS,
|
'enable_terms_and_conditions': config.ENABLE_TERMS_AND_CONDITIONS,
|
||||||
@@ -157,6 +160,9 @@ def sysadmin_react_fake_view(request, **kwargs):
|
|||||||
'enable_work_weixin': ENABLE_WORK_WEIXIN,
|
'enable_work_weixin': ENABLE_WORK_WEIXIN,
|
||||||
'enable_sys_admin_view_repo': ENABLE_SYS_ADMIN_VIEW_REPO,
|
'enable_sys_admin_view_repo': ENABLE_SYS_ADMIN_VIEW_REPO,
|
||||||
'trash_repos_expire_days': expire_days if expire_days > 0 else 30,
|
'trash_repos_expire_days': expire_days if expire_days > 0 else 30,
|
||||||
|
'enable_two_factor_auth': ENABLE_TWO_FACTOR_AUTH,
|
||||||
|
'available_roles': get_available_roles(),
|
||||||
|
'available_admin_roles': get_available_admin_roles()
|
||||||
})
|
})
|
||||||
|
|
||||||
@login_required
|
@login_required
|
||||||
|
@@ -9,6 +9,7 @@ from seahub.test_utils import BaseTestCase
|
|||||||
from seahub.base.templatetags.seahub_tags import email2nickname, \
|
from seahub.base.templatetags.seahub_tags import email2nickname, \
|
||||||
email2contact_email
|
email2contact_email
|
||||||
from seahub.profile.models import DetailedProfile
|
from seahub.profile.models import DetailedProfile
|
||||||
|
from seahub.share.models import FileShare, UploadLinkShare
|
||||||
from seahub.utils.file_size import get_file_size_unit
|
from seahub.utils.file_size import get_file_size_unit
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -33,19 +34,14 @@ class AdminUsersTest(BaseTestCase):
|
|||||||
|
|
||||||
json_resp = json.loads(resp.content)
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
assert json_resp['total_count'] > 0
|
|
||||||
assert len(json_resp['data']) == json_resp['total_count']
|
|
||||||
|
|
||||||
assert 'email' in json_resp['data'][0]
|
assert 'email' in json_resp['data'][0]
|
||||||
assert 'name' in json_resp['data'][0]
|
assert 'name' in json_resp['data'][0]
|
||||||
assert 'contact_email' in json_resp['data'][0]
|
assert 'contact_email' in json_resp['data'][0]
|
||||||
assert 'is_staff' in json_resp['data'][0]
|
assert 'is_staff' in json_resp['data'][0]
|
||||||
assert 'is_active' in json_resp['data'][0]
|
assert 'is_active' in json_resp['data'][0]
|
||||||
assert 'create_time' in json_resp['data'][0]
|
assert 'create_time' in json_resp['data'][0]
|
||||||
assert 'department' in json_resp['data'][0]
|
|
||||||
assert 'quota_total' in json_resp['data'][0]
|
assert 'quota_total' in json_resp['data'][0]
|
||||||
assert 'quota_usage' in json_resp['data'][0]
|
assert 'quota_usage' in json_resp['data'][0]
|
||||||
assert 'create_time' in json_resp['data'][0]
|
|
||||||
|
|
||||||
def test_get_with_invalid_user_permission(self):
|
def test_get_with_invalid_user_permission(self):
|
||||||
self.login_as(self.user)
|
self.login_as(self.user)
|
||||||
@@ -382,3 +378,101 @@ class AdminUserTest(BaseTestCase):
|
|||||||
data = {"email": self.tmp_email, "reference_id": ''}
|
data = {"email": self.tmp_email, "reference_id": ''}
|
||||||
resp = self.client.put(self.url, json.dumps(data),
|
resp = self.client.put(self.url, json.dumps(data),
|
||||||
'application/json')
|
'application/json')
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserShareLinksTest(BaseTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.repo_id = self.repo.id
|
||||||
|
self.file_path= self.file
|
||||||
|
self.folder_path= self.folder
|
||||||
|
self.invalid_token = '00000000000000000000'
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.remove_repo()
|
||||||
|
|
||||||
|
def _add_file_share_link(self, password=None):
|
||||||
|
fs = FileShare.objects.create_file_link(
|
||||||
|
self.user.username, self.repo.id, self.file, password, None)
|
||||||
|
|
||||||
|
return fs.token
|
||||||
|
|
||||||
|
def _add_dir_share_link(self, password=None):
|
||||||
|
fs = FileShare.objects.create_dir_link(
|
||||||
|
self.user.username, self.repo.id, self.folder, password, None)
|
||||||
|
|
||||||
|
return fs.token
|
||||||
|
|
||||||
|
def _remove_share_link(self, token):
|
||||||
|
link = FileShare.objects.get(token=token)
|
||||||
|
link.delete()
|
||||||
|
|
||||||
|
def test_get_file_share_links(self):
|
||||||
|
self.login_as(self.admin)
|
||||||
|
token = self._add_file_share_link()
|
||||||
|
|
||||||
|
url = reverse('api-v2.1-admin-user-share-links', args=[self.admin.username])
|
||||||
|
|
||||||
|
resp = self.client.get(url)
|
||||||
|
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
assert len(json_resp) > 0
|
||||||
|
|
||||||
|
self._remove_share_link(token)
|
||||||
|
|
||||||
|
|
||||||
|
class AdminUserUploadLinksTest(BaseTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.repo_id = self.repo.id
|
||||||
|
self.folder_path= self.folder
|
||||||
|
self.invalid_token = '00000000000000000000'
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.remove_repo()
|
||||||
|
|
||||||
|
def _add_upload_link(self, password=None):
|
||||||
|
fs = UploadLinkShare.objects.create_upload_link_share(
|
||||||
|
self.user.username, self.repo.id, self.folder_path, password, None)
|
||||||
|
|
||||||
|
return fs.token
|
||||||
|
|
||||||
|
def _remove_upload_link(self, token):
|
||||||
|
link = UploadLinkShare.objects.get(token=token)
|
||||||
|
link.delete()
|
||||||
|
|
||||||
|
def test_get_file_share_links(self):
|
||||||
|
self.login_as(self.admin)
|
||||||
|
token = self._add_upload_link()
|
||||||
|
|
||||||
|
url = reverse('api-v2.1-admin-user-upload-links', args=[self.admin.username])
|
||||||
|
resp = self.client.get(url)
|
||||||
|
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
assert len(json_resp) > 0
|
||||||
|
|
||||||
|
self._remove_upload_link(token)
|
||||||
|
|
||||||
|
|
||||||
|
class AdminAdminUsersTest(BaseTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.url = reverse('api-v2.1-admin-admin-users')
|
||||||
|
self.tmp_email = '%s@email.com' % randstring(10)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.remove_user(self.tmp_email)
|
||||||
|
|
||||||
|
def test_get_admin_users(self):
|
||||||
|
self.login_as(self.admin)
|
||||||
|
|
||||||
|
resp = self.client.get(self.url)
|
||||||
|
self.assertEqual(200, resp.status_code)
|
||||||
|
|
||||||
|
json_resp = json.loads(resp.content)
|
||||||
|
|
||||||
|
for admin_user in json_resp['admin_user_list']:
|
||||||
|
assert admin_user['is_staff'] == True
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user