1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-05 08:53:14 +00:00

sysadmin institutions api (#4203)

* sysadmin institutions api
This commit is contained in:
Leo
2019-11-02 13:16:49 +08:00
committed by lian
parent 3cff7b045f
commit 5dae23e52b
11 changed files with 687 additions and 2 deletions

View File

@@ -0,0 +1,251 @@
import logging
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework.views import APIView
from django.utils.translation import ugettext as _
from seaserv import seafile_api, ccnet_api
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error
from seahub.base.templatetags.seahub_tags import email2nickname, email2contact_email
from seahub.base.accounts import User
from seahub.base.models import UserLastLogin
from seahub.profile.models import Profile
from seahub.utils import is_valid_username
from seahub.utils.timeutils import timestamp_to_isoformat_timestr, datetime_to_isoformat_timestr
from seahub.institutions.models import Institution, InstitutionAdmin
from seahub.api2.endpoints.utils import get_user_quota_usage_and_total
from seahub.institutions.utils import is_institution_admin
logger = logging.getLogger(__name__)
def get_institution_user_info(user_obj, institution):
info = {}
info['email'] = user_obj.email
info['name'] = email2nickname(user_obj.email)
info['contact_email'] = email2contact_email(user_obj.email)
info['quota_usage'], info['quota_total'] = get_user_quota_usage_and_total(user_obj.email)
info['create_time'] = timestamp_to_isoformat_timestr(user_obj.ctime)
info['is_active'] = user_obj.is_active
info['is_institution_admin'] = is_institution_admin(user_obj.email, institution)
last_login_obj = UserLastLogin.objects.get_by_username(user_obj.email)
info['last_login'] = datetime_to_isoformat_timestr(last_login_obj.last_login) if last_login_obj else ''
return info
class AdminInstitutionUsers(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAdminUser, )
throttle_classes = (UserRateThrottle, )
def get(self, request, institution_id):
"""List users of an Institution
"""
try:
institution = Institution.objects.get(id=institution_id)
except Institution.DoesNotExist:
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
is_institution_admin = request.GET.get('is_institution_admin', '')
# is_institution_admin = '', return all users, filter by page
# is_institution_admin = true, return admin users
# is_institution_admin = false, return none admin users
if not is_institution_admin:
try:
current_page = int(request.GET.get('page', '1'))
per_page = int(request.GET.get('per_page', '100'))
except ValueError:
current_page = 1
per_page = 100
start = (current_page - 1) * per_page
profiles = Profile.objects.filter(institution=institution.name)[start:start + per_page]
emails = [x.user for x in profiles]
else:
is_institution_admin = is_institution_admin.lower()
if is_institution_admin not in ('true', 'false'):
error_msg = 'is_institution_admin %s invalid' % is_institution_admin
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
admin_emails = [user.user for user in InstitutionAdmin.objects.filter(institution=institution)]
if is_institution_admin == 'true':
emails = admin_emails
elif is_institution_admin == 'false':
profiles = Profile.objects.filter(institution=institution.name)
emails = [x.user for x in profiles if x.user not in admin_emails]
user_objs = []
for email in emails:
try:
user_obj = User.objects.get(email=email)
user_objs.append(user_obj)
except User.DoesNotExist:
continue
user_info = []
for user in user_objs:
user_info.append(get_institution_user_info(user, institution))
resp = {
'user_list': user_info,
'total_count': len(user_objs),
}
return Response(resp)
def post(self, request, institution_id):
"""Add users to Institution
"""
try:
institution = Institution.objects.get(id=institution_id)
except Institution.DoesNotExist:
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# argument check
emails = request.POST.getlist('email', None)
if not emails:
error_msg = 'email invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
result = {}
result['failed'] = []
result['success'] = []
for email in emails:
if not is_valid_username(email):
result['failed'].append({
'email': email,
'error_msg': 'email %s invalid.' % email
})
continue
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
result['failed'].append({
'email': email,
'error_msg': 'email %s not found.' % email
})
continue
profile = Profile.objects.get_profile_by_user(email)
if not profile:
profile = Profile.objects.add_or_update(username=email)
if profile.institution:
if profile.institution != institution.name:
result['failed'].append({
'email': email,
'error_msg': _("Failed to add %s to the institution: user already belongs to an institution") % email})
continue
else:
result['failed'].append({
'email': email,
'error_msg': _("Failed to add %s to the institution: user already belongs to this institution") % email})
continue
else:
profile.institution = institution.name
profile.save()
result['success'].append(get_institution_user_info(user, institution))
return Response(result)
class AdminInstitutionUser(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAdminUser, )
throttle_classes = (UserRateThrottle, )
def put(self, request, institution_id, email):
""" Update user of an institution
"""
try:
institution = Institution.objects.get(id=institution_id)
except Institution.DoesNotExist:
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
error_msg = "user %s not found." % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
profile = Profile.objects.get_profile_by_user(email)
if not profile or profile.institution != institution.name:
error_msg = 'email %s invalid' % email
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
if user.is_staff:
error_msg = _('Can not assign institutional administration roles to global administrators')
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
is_inst_admin = request.data.get('is_institution_admin')
if is_inst_admin:
is_inst_admin = is_inst_admin.lower()
if is_inst_admin not in ('true', 'false'):
error_msg = 'is_institution_admin %s invalid' % is_inst_admin
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
try:
if is_inst_admin == 'true':
# if user is already inst admin, cannot set to institution admin
if is_institution_admin(email, institution):
error_msg = 'user %s is already admin' % email
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
else:
InstitutionAdmin.objects.create(institution=institution, user=email)
elif is_inst_admin == 'false':
InstitutionAdmin.objects.filter(institution=institution, user=email).delete()
except Exception as e:
logging.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response(get_institution_user_info(user, institution))
def delete(self, request, institution_id, email):
""" Delete user from an institution
"""
try:
institution = Institution.objects.get(id=institution_id)
except Institution.DoesNotExist:
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
User.objects.get(email=email)
except User.DoesNotExist:
error_msg = "user %s not found." % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
profile = Profile.objects.get_profile_by_user(email)
if not profile or profile.institution != institution.name:
error_msg = "email %s invalid." % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
Profile.objects.add_or_update(email, institution='')
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
# if email is admin, delete from InstitutionAdmin table
InstitutionAdmin.objects.filter(user=email).delete()
return Response({'success': True})

View File

@@ -0,0 +1,168 @@
import logging
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework.views import APIView
from seaserv import seafile_api, ccnet_api
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error
from seahub.profile.models import Profile
from seahub.utils.file_size import get_file_size_unit
from seahub.utils.timeutils import datetime_to_isoformat_timestr
from seahub.institutions.models import Institution, InstitutionQuota, InstitutionAdmin
from seahub.institutions.utils import get_institution_space_usage
from seahub.signals import institution_deleted
logger = logging.getLogger(__name__)
class AdminInstitutions(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAdminUser, )
throttle_classes = (UserRateThrottle, )
def get(self, request):
"""List all Institutions
"""
try:
current_page = int(request.GET.get('page', '1'))
per_page = int(request.GET.get('per_page', '100'))
except ValueError:
current_page = 1
per_page = 100
start = per_page * (current_page - 1)
institutions = Institution.objects.all()[start:start + per_page]
count = Institution.objects.count()
institutions_info = []
for institution in institutions:
data = {}
data['id'] = institution.id
data['name'] = institution.name
data['ctime'] = datetime_to_isoformat_timestr(institution.create_time)
institutions_info.append(data)
resp = {
'institution_list': institutions_info,
'total_count': count,
}
return Response(resp)
def post(self, request):
"""Create an Institution
"""
name = request.data.get('name', '').strip()
if not name:
error_msg = 'name invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
try:
institution = Institution.objects.add_institution(name=name)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
info = {}
info['id'] = institution.id
info['name'] = institution.name
info['ctime'] = datetime_to_isoformat_timestr(institution.create_time)
return Response(info)
class AdminInstitution(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
permission_classes = (IsAdminUser, )
throttle_classes = (UserRateThrottle, )
def get(self, request, institution_id):
"""Get an Institution's info
"""
try:
institution = Institution.objects.get(id=institution_id)
except Exception as e:
logger.error(e)
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
info = {}
info['id'] = institution.id
info['name'] = institution.name
info['user_count'] = Profile.objects.filter(institution=institution.name).count()
info['quota_total'] = InstitutionQuota.objects.get_or_none(institution=institution)
info['quota_used'] = get_institution_space_usage(institution)
return Response(info)
def put(self, request, institution_id):
"""Update (quota) of institution
"""
try:
institution = Institution.objects.get(id=institution_id)
except Exception as e:
logger.error(e)
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
quota_mb = request.data.get('quota', '')
try:
quota_mb = int(quota_mb)
except Exception as e:
logger.error(e)
error_msg = 'quota invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
quota = quota_mb * get_file_size_unit('MB')
try:
InstitutionQuota.objects.update_or_create(
institution=institution,
defaults={'quota': quota},
)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
info = {}
info['id'] = institution.id
info['name'] = institution.name
info['user_count'] = Profile.objects.filter(institution=institution.name).count()
info['quota_total'] = InstitutionQuota.objects.get_or_none(institution=institution)
info['quota_used'] = get_institution_space_usage(institution)
return Response(info)
def delete(self, request, institution_id):
"""Delete an Institution
"""
try:
institution = Institution.objects.get(id=institution_id)
except Exception as e:
logger.error(e)
error_msg = "institution %s not found." % institution_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
institution.delete()
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
# delete user and admin in institution
institution_deleted.send(sender=None, inst_name=institution.name)
return Response({'success': True})

View File

@@ -47,6 +47,7 @@ from seahub.admin_log.signals import admin_operation
from seahub.admin_log.models import USER_DELETE, USER_ADD
from seahub.api2.endpoints.group_owned_libraries import get_group_id_by_repo_owner
from seahub.group.utils import group_id_to_name
from seahub.institutions.models import InstitutionAdmin
from seahub.options.models import UserOptions
from seahub.share.models import FileShare, UploadLinkShare
@@ -186,6 +187,8 @@ def update_user_info(request, user, password, is_active, is_staff, role,
if institution_name is not None:
Profile.objects.add_or_update(email, institution=institution_name)
if institution_name == '':
InstitutionAdmin.objects.filter(user=email).delete()
if quota_total_mb:
quota_total = int(quota_total_mb) * get_file_size_unit('MB')

View File

@@ -189,3 +189,24 @@ def generate_links_header_for_paginator(base_url, page, per_page, total_count, o
(next_page_url, last_page_url, first_page_url, prev_page_url)
return links_header
def get_user_quota_usage_and_total(email, org_id=''):
try:
if org_id:
quota_usage = seafile_api.get_org_user_quota_usage(org_id, email)
quota_total = seafile_api.get_org_user_quota(org_id, email)
else:
orgs = ccnet_api.get_orgs_by_user(email)
if orgs:
org_id = orgs[0].org_id
quota_usage = seafile_api.get_org_user_quota_usage(org_id, email)
quota_total = seafile_api.get_org_user_quota(org_id, email)
else:
quota_usage = seafile_api.get_user_self_usage(email)
quota_total = seafile_api.get_user_quota(email)
except Exception as e:
logger.error(e)
quota_usage = -1
quota_total = -1
return quota_usage, quota_total

View File

@@ -11,6 +11,7 @@ from django.db.models.manager import EmptyManager
from django.contrib.contenttypes.models import ContentType
from django.utils.encoding import smart_str
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
logger = logging.getLogger(__name__)
UNUSABLE_PASSWORD = '!' # This will never be a valid hash
@@ -170,3 +171,7 @@ from registration.signals import user_deleted
def user_deleted_cb(sender, **kwargs):
username = kwargs['username']
SocialAuthUser.objects.filter(username=username).delete()
# if user is institution admin, delete recored in InstitutionAdmin
if getattr(settings, 'MULTI_INSTITUTION', False):
from seahub.institutions.models import InstitutionAdmin
InstitutionAdmin.objects.filter(user=username).delete()

View File

@@ -4,11 +4,17 @@ from django.utils import timezone
from seahub.base.fields import LowerCaseCharField
class InstitutionManager(models.Manager):
def add_institution(self, name):
institution = super(InstitutionManager, self).create(name=name)
institution.save()
return institution
class Institution(models.Model):
name = models.CharField(max_length=200)
create_time = models.DateTimeField(default=timezone.now)
objects = InstitutionManager()
class InstitutionAdmin(models.Model):
institution = models.ForeignKey(Institution)

View File

@@ -1,7 +1,7 @@
# Copyright (c) 2012-2016 Seafile Ltd.
from seaserv import seafile_api
from seahub.profile.models import Profile
from seahub.institutions.models import InstitutionQuota
from seahub.institutions.models import InstitutionQuota, InstitutionAdmin
def get_institution_space_usage(inst):
@@ -23,3 +23,11 @@ def get_institution_available_quota(inst):
allocated += seafile_api.get_user_quota(user)
return 0 if allocated >= inst_quota else inst_quota - allocated
def is_institution_admin(email, institution=None):
# if institution_name is given, return if email is admin in institution
# else, return if email is admin in all institutions
if institution:
admins = InstitutionAdmin.objects.filter(user=email, institution=institution)
return len(admins) >= 1

View File

@@ -12,6 +12,7 @@ from seahub.profile.settings import EMAIL_ID_CACHE_PREFIX, EMAIL_ID_CACHE_TIMEOU
from seahub.institutions.models import Institution
from registration.signals import user_registered
from seahub.signals import institution_deleted
from seahub.institutions.models import InstitutionAdmin
# Get an instance of a logger
logger = logging.getLogger(__name__)
@@ -238,4 +239,5 @@ def update_profile_cache(sender, instance, **kwargs):
def remove_user_for_inst_deleted(sender, **kwargs):
inst_name = kwargs.get("inst_name", "")
Profile.objects.filter(institution=inst_name).update(institution="")
InstitutionAdmin.objects.filter(institution__name=inst_name).delete()

View File

@@ -129,6 +129,8 @@ from seahub.api2.endpoints.admin.users_batch import AdminUsersBatch, AdminAdminU
AdminImportUsers
from seahub.api2.endpoints.admin.operation_logs import AdminOperationLogs
from seahub.api2.endpoints.admin.organizations import AdminOrganizations, AdminOrganization
from seahub.api2.endpoints.admin.institutions import AdminInstitutions, AdminInstitution
from seahub.api2.endpoints.admin.institution_users import AdminInstitutionUsers, AdminInstitutionUser
from seahub.api2.endpoints.admin.org_users import AdminOrgUsers, AdminOrgUser
from seahub.api2.endpoints.admin.org_groups import AdminOrgGroups
from seahub.api2.endpoints.admin.org_repos import AdminOrgRepos
@@ -545,6 +547,12 @@ urlpatterns = [
url(r'^api/v2.1/admin/organizations/(?P<org_id>\d+)/repos/$', AdminOrgRepos.as_view(),name='api-v2.1-admin-org-repos'),
url(r'^api/v2.1/admin/organizations/(?P<org_id>\d+)/statistics/traffic/$', AdminOrgStatsTraffic.as_view(), name='api-v2.1-admin-org-stats-traffic'),
## admin::institutions
url(r'^api/v2.1/admin/institutions/$', AdminInstitutions.as_view(), name='api-v2.1-admin-institutions'),
url(r'^api/v2.1/admin/institutions/(?P<institution_id>\d+)/$', AdminInstitution.as_view(), name='api-v2.1-admin-institution'),
url(r'^api/v2.1/admin/institutions/(?P<institution_id>\d+)/users/$', AdminInstitutionUsers.as_view(), name='api-v2.1-admin-institution-users'),
url(r'^api/v2.1/admin/institutions/(?P<institution_id>\d+)/users/(?P<email>[^/]+)/$', AdminInstitutionUser.as_view(), name='api-v2.1-admin-institution-user'),
## admin::logo
url(r'^api/v2.1/admin/logo/$', AdminLogo.as_view(), name='api-v2.1-admin-logo'),
url(r'^api/v2.1/admin/favicon/$', AdminFavicon.as_view(), name='api-v2.1-admin-favicon'),

View File

@@ -0,0 +1,104 @@
import json
import logging
from django.core.urlresolvers import reverse
from seahub.test_utils import BaseTestCase
from tests.common.utils import randstring
from seahub.institutions.models import Institution, InstitutionAdmin
from seahub.profile.models import Profile
logger = logging.getLogger(__name__)
class AdminInstitutionUsersTest(BaseTestCase):
def setUp(self):
pass
def _add_institution(self, name=''):
return Institution.objects.create(name=name)
def _delete_institution(self, name=''):
try:
institution = Institution.objects.get(name=name)
institution.delete()
except Exception as e:
logger.error(e)
def test_can_get(self):
self.login_as(self.admin)
inst = self._add_institution('int1')
url = reverse('api-v2.1-admin-institution-users', args=[inst.id])
resp = self.client.get(url)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert type(json_resp['user_list']) is list
inst.delete()
def test_can_create(self):
self.login_as(self.admin)
inst = self._add_institution('int1')
url = reverse('api-v2.1-admin-institution-users', args=[inst.id])
data = {
'email': 'invalid_email_string',
}
resp = self.client.post(url, data)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert type(json_resp['success']) is list
assert type(json_resp['failed']) is list
class AdminInstitutionUserTest(BaseTestCase):
def setUp(self):
pass
def _add_institution(self, name=''):
return Institution.objects.create(name=name)
def _delete_institution(self, name=''):
try:
institution = Institution.objects.get(name=name)
institution.delete()
except Exception as e:
logger.error(e)
def _add_user_in_institution(self, email, inst_name):
profile = Profile.objects.get_profile_by_user(email)
if not profile:
profile = Profile.objects.add_or_update(username=email, institution=inst_name)
else:
profile.institution = inst_name
profile.save()
def test_can_update(self):
self.login_as(self.admin)
inst = self._add_institution('int1')
self._add_user_in_institution(self.user.email, inst.name)
url = reverse('api-v2.1-admin-institution-user', args=[inst.id, self.user.email])
data = 'is_institution_admin=True'
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['is_institution_admin'] is True
inst.delete()
def test_can_delete(self):
self.login_as(self.admin)
inst = self._add_institution('int1')
self._add_user_in_institution(self.user.email, inst.name)
url = reverse('api-v2.1-admin-institution-user', args=[inst.id, self.user.email])
resp = self.client.delete(url)
self.assertEqual(200, resp.status_code)
inst.delete()

View File

@@ -0,0 +1,109 @@
import json
import logging
from django.core.urlresolvers import reverse
from seahub.test_utils import BaseTestCase
from tests.common.utils import randstring
from seahub.institutions.models import Institution
logger = logging.getLogger(__name__)
class InstitutionsTest(BaseTestCase):
def setUp(self):
pass
def _add_institution(self, name=''):
return Institution.objects.create(name=name)
def _delete_institution(self, name=''):
try:
institution = Institution.objects.get(name=name)
institution.delete()
except Exception as e:
logger.error(e)
def test_can_get(self):
self.login_as(self.admin)
inst = self._add_institution('int1')
url = reverse('api-v2.1-admin-institutions')
resp = self.client.get(url)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['institution_list'][0]['name'] == 'int1'
assert json_resp['total_count'] == 1
inst.delete()
def test_can_create(self):
self.login_as(self.admin)
url = reverse('api-v2.1-admin-institutions')
institution_name = randstring(10)
data = {
'name': institution_name,
}
resp = self.client.post(url, data)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['name'] == institution_name
self._delete_institution(institution_name)
class InstitutionTest(BaseTestCase):
def setUp(self):
pass
def _add_institution(self, name=''):
return Institution.objects.create(name=name)
def _delete_institution(self, name=''):
try:
institution = Institution.objects.get(name=name)
institution.delete()
except Exception as e:
logger.error(e)
def test_can_get(self):
self.login_as(self.admin)
institution_name = randstring(10)
inst = self._add_institution(institution_name)
url = reverse('api-v2.1-admin-institution', args=[inst.id])
resp = self.client.get(url)
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['id'] == inst.id
assert json_resp['name'] == institution_name
inst.delete()
def test_can_update(self):
self.login_as(self.admin)
institution_name = randstring(10)
inst = self._add_institution(institution_name)
url = reverse('api-v2.1-admin-institution', args=[inst.id])
data = 'quota=1024'
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
self.assertEqual(200, resp.status_code)
json_resp = json.loads(resp.content)
assert json_resp['quota_total'] == 1024 * 1000 * 1000
inst.delete()
def test_can_delete(self):
self.login_as(self.admin)
institution_name = randstring(10)
inst = self._add_institution(institution_name)
url = reverse('api-v2.1-admin-institution', args=[inst.id])
resp = self.client.delete(url)
self.assertEqual(200, resp.status_code)