1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-18 00:00:00 +00:00

add api of amdin manage org user

This commit is contained in:
lian
2017-03-09 16:42:06 +08:00
parent 4cf035986c
commit 8286f73331
7 changed files with 680 additions and 33 deletions

View File

@@ -0,0 +1,239 @@
# Copyright (c) 2012-2016 Seafile Ltd.
import logging
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework import status
from constance import config
from seaserv import ccnet_api
from seahub.utils import clear_token, is_valid_email
from seahub.utils.licenseparse import user_number_over_limit
from seahub.base.accounts import User
from seahub.base.templatetags.seahub_tags import email2nickname
from seahub.profile.models import Profile
from seahub.options.models import UserOptions
from seahub.api2.authentication import TokenAuthentication
from seahub.api2.throttling import UserRateThrottle
from seahub.api2.utils import api_error
from seahub.api2.permissions import IsProVersion
try:
from seahub.settings import ORG_MEMBER_QUOTA_ENABLED
except ImportError:
ORG_MEMBER_QUOTA_ENABLED= False
logger = logging.getLogger(__name__)
def get_org_user_info(org_id, user_obj):
user_info = {}
email = user_obj.username
user_info['org_id'] = org_id
user_info['active'] = user_obj.is_active
user_info['email'] = email
user_info['name'] = email2nickname(email)
user_info['contact_email'] = Profile.objects.get_contact_email_by_user(email)
return user_info
class AdminOrgUsers(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
throttle_classes = (UserRateThrottle,)
permission_classes = (IsAdminUser, IsProVersion)
def post(self, request, org_id):
""" Add new user to org.
Permission checking:
1. only admin can perform this action.
"""
# argument check
org_id = int(org_id)
if org_id == 0:
error_msg = 'org_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
org = ccnet_api.get_org_by_id(org_id)
if not org:
error_msg = 'Organization %d not found.' % org_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
email = request.POST.get('email', None)
if not email or not is_valid_email(email):
error_msg = 'email invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
password = request.POST.get('password', None)
if not password:
error_msg = 'password invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
try:
User.objects.get(email=email)
user_exists = True
except User.DoesNotExist:
user_exists = False
if user_exists:
error_msg = 'User %s already exists.' % email
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
# check user number limit by license
if user_number_over_limit():
error_msg = 'The number of users exceeds the limit.'
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# check user number limit by org member quota
org_members = len(ccnet_api.get_org_emailusers(org.url_prefix, -1, -1))
if ORG_MEMBER_QUOTA_ENABLED:
from seahub_extra.organizations.models import OrgMemberQuota
org_members_quota = OrgMemberQuota.objects.get_quota(org_id)
if org_members_quota is not None and org_members >= org_members_quota:
error_msg = 'Failed. You can only invite %d members.' % org_members_quota
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# create user
try:
user = User.objects.create_user(email,
password, is_staff=False, is_active=True)
except User.DoesNotExist as e:
logger.error(e)
error_msg = 'Fail to add user %s.' % email
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
# add user to org
# set `is_staff` parameter as `0`
try:
ccnet_api.add_org_user(org_id, email, 0)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
name = request.POST.get('name', None)
if name:
Profile.objects.add_or_update(email, name)
if config.FORCE_PASSWORD_CHANGE:
UserOptions.objects.set_force_passwd_change(email)
user_info = get_org_user_info(org_id, user)
return Response(user_info)
class AdminOrgUser(APIView):
authentication_classes = (TokenAuthentication, SessionAuthentication)
throttle_classes = (UserRateThrottle,)
permission_classes = (IsAdminUser, IsProVersion)
def put(self, request, org_id, email):
""" update active of a org user
Permission checking:
1. only admin can perform this action.
"""
# argument check
org_id = int(org_id)
if org_id == 0:
error_msg = 'org_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
try:
org = ccnet_api.get_org_by_id(org_id)
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
if not org:
error_msg = 'Organization %d not found.' % org_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
error_msg = 'User %s not found.' % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
active = request.data.get('active', None)
if not active:
error_msg = 'active invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
if active.lower() not in ('true', 'false'):
error_msg = "active invalid, should be 'true' or 'false'."
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
active_user = True if active.lower() == 'true' else False
try:
if active_user:
user.is_active = True
else:
user.is_active = False
# update user status
result_code = user.save()
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
if result_code == -1:
error_msg = 'Fail to add user %s.' % email
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
# clear web api and repo sync token
# when inactive an user
try:
if not active_user:
clear_token(email)
except Exception as e:
logger.error(e)
user_info = get_org_user_info(org_id, user)
return Response(user_info)
def delete(self, request, org_id, email):
""" Delete an user from org
Permission checking:
1. only admin can perform this action.
"""
# argument check
org_id = int(org_id)
if org_id == 0:
error_msg = 'org_id invalid.'
return api_error(status.HTTP_400_BAD_REQUEST, error_msg)
org = ccnet_api.get_org_by_id(org_id)
if not org:
error_msg = 'Organization %d not found.' % org_id
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
error_msg = 'User %s not found.' % email
return api_error(status.HTTP_404_NOT_FOUND, error_msg)
# permission check
if org.creator == email:
error_msg = 'Failed to delete: %s is an organization creator.' % email
return api_error(status.HTTP_403_FORBIDDEN, error_msg)
try:
ccnet_api.remove_org_user(org_id, email)
user.delete()
except Exception as e:
logger.error(e)
error_msg = 'Internal Server Error'
return api_error(status.HTTP_500_INTERNAL_SERVER_ERROR, error_msg)
return Response({'success': True})

View File

@@ -8,6 +8,7 @@ from rest_framework.permissions import BasePermission
from django.conf import settings
from seaserv import check_permission, is_repo_owner, ccnet_api
from seahub.utils import is_pro_version
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
@@ -65,14 +66,24 @@ class CanInviteGuest(BasePermission):
return settings.ENABLE_GUEST_INVITATION and \
request.user.permissions.can_invite_guest()
class CanGenerateShareLink(BasePermission):
"""Check user has permission to generate share link.
"""
def has_permission(self, request, *args, **kwargs):
return request.user.permissions.can_generate_share_link()
class CanGenerateUploadLink(BasePermission):
"""Check user has permission to generate upload link.
"""
def has_permission(self, request, *args, **kwargs):
return request.user.permissions.can_generate_upload_link()
class IsProVersion(BasePermission):
"""
Check whether Seafile is pro version
"""
def has_permission(self, request, *args, **kwargs):
return is_pro_version()

View File

@@ -16,11 +16,10 @@ from constance import config
from registration import signals
from seahub.auth import login
from seahub.constants import DEFAULT_USER
from seahub.profile.models import Profile, DetailedProfile
from seahub.role_permissions.utils import get_enabled_role_permissions_by_role
from seahub.utils import is_user_password_strong, \
clear_token, get_system_admins
clear_token, get_system_admins, is_pro_version
from seahub.utils.mail import send_html_email_with_dj_template, MAIL_PRIORITY
from seahub.utils.licenseparse import user_number_over_limit
@@ -215,6 +214,9 @@ class User(object):
source = "LDAP"
username = self.username
orgs = []
if is_pro_version():
orgs = ccnet_threaded_rpc.get_orgs_by_user(username)
# remove owned repos

View File

@@ -9,6 +9,9 @@
<li class="tabnav-tab"><a href="{% url 'sys_org_info_library' org.org_id %}">{% trans "Libraries" %}</a></li>
<li class="tabnav-tab"><a href="{% url 'sys_org_info_setting' org.org_id %}">{% trans "Settings" %}</a></li>
</ul>
<div class="fright">
<button id="add-user-btn">{% trans "Add user" %}</button>
</div>
</div>
<table>
@@ -30,11 +33,11 @@
{% else %}
<span class="user-status-cur-value">{% trans "Inactive" %}</span>
{% endif %}
<span title="{% trans "Edit"%}" class="user-status-edit-icon op-icon sf2-icon-edit vh"></span>
<span title="{% trans "Edit" %}" class="user-status-edit-icon op-icon sf2-icon-edit vh"></span>
</div>
<select name="permission" class="user-status-select hide">
<option value="1" {%if user.is_active %}selected="selected"{% endif %}>{% trans "Active" %}</option>
<option value="0" {%if not user.is_active %}selected="selected"{% endif %}>{% trans "Inactive"%}</option>
<option value="true" {% if user.is_active %}selected="selected"{% endif %}>{% trans "Active" %}</option>
<option value="false" {% if not user.is_active %}selected="selected"{% endif %}>{% trans "Inactive" %}</option>
</select>
</td>
<td>
@@ -45,7 +48,7 @@
</td>
<td>
{% if not user.is_self %}
<a href="#" class="remove-user-btn op vh" data-url="{% url 'user_remove' user.email %}" data-target="{{ user.email }}">{% trans "Delete" %}</a>
<a href="#" class="remove-user-btn op vh" data-userid="{{ user.email }}">{% trans "Delete" %}</a>
<a href="#" class="reset-user-btn op vh" data-url="{% url 'user_reset' user.email %}" data-target="{{ user.email }}">{% trans "ResetPwd" %}</a>
{% endif %}
</td>
@@ -57,21 +60,154 @@
<p>{% trans "Activating..., please wait" %}</p>
</div>
<form id="add-user-form" action="" method="post" class="hide">{% csrf_token %}
<h3>{% trans "Add user" %}</h3>
<label for="id_email">{% trans "Email" %}</label><br />
<input type="text" name="email" id="id_email" class="input" /><br />
<label for="id_name">{% trans "Name(optional)" %}</label><br />
<input type="text" name="name" id="id_name" class="input" /><br />
<label for="id_password1">{% trans "Password" %}</label>
<div class="passwd-wrapper">
<input type="password" name="password1" id="id_password1" class="passwd input" />
<span title="{% trans "Show" %}" class="icon-eye show-or-hide-password cspt"></span>
<span title="{% trans "Generate a random password" %}" class="icon-magic generate-random-password cspt"></span>
</div>
<label for="id_password2">{% trans "Confirm Password" %}</label><br />
<input type="password" name="password2" id="id_password2" class="input" /><br />
<p class="error hide"></p>
<button type="submit" class="submit">{% trans "Submit" %}</button>
</form>
{% endblock %}
{% block extra_script %}
<script type="text/javascript">
{% include 'sysadmin/sys_org_set_quota_js.html' %}
addConfirmTo($('.remove-user-btn'), {
'title':"{% trans "Delete User" %}",
'con':"{% trans "Are you sure you want to delete %s ?" %}",
'post': true // post request
$('#add-user-btn').click(function() {
$('#add-user-form').modal();
$('#simplemodal-container').css({'width':'auto', 'height':'auto'});
});
$('#add-user-form .show-or-hide-password').click(function() {
var icon = $(this),
passwd_input = $('input[name=password1], input[name=password2]', $('#add-user-form'));
icon.toggleClass('icon-eye icon-eye-slash');
if (icon.hasClass('icon-eye')) {
icon.attr('title', "{% trans "Show" %}");
passwd_input.prop('type', 'password');
} else {
icon.attr('title', "{% trans "Hide" %}");
passwd_input.prop('type', 'text');
}
});
$('#add-user-form .generate-random-password').click(function() {
var form = $('#add-user-form');
var random_password = '';
var possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz0123456789';
for (var i = 0; i < 8; i++) {
random_password += possible.charAt(Math.floor(Math.random() * possible.length));
}
$('input[name=password1], input[name=password2]', form).prop('type', 'text').val(random_password);
$('.show-or-hide-password', form)
.attr('title', "{% trans "Hide" %}")
.removeClass('icon-eye').addClass('icon-eye-slash');
});
$('#add-user-form').submit(function() {
var form = $(this),
form_id = $(this).attr('id'),
org_id = {{ org.org_id }},
email = $.trim(form.children('[name="email"]').val()),
name = $.trim($('[name="name"]', form).val()),
pwd1 = $.trim(form.find('[name="password1"]').val()),
pwd2 = $.trim(form.children('[name="password2"]').val());
if (!email) {
apply_form_error(form_id, "{% trans "Email cannot be blank" %}");
return false;
}
if (!pwd1) {
apply_form_error(form_id, "{% trans "Password cannot be blank" %}");
return false;
}
if (!pwd2) {
apply_form_error(form_id, "{% trans "Please enter the password again" %}");
return false;
}
if (pwd1 != pwd2) {
apply_form_error(form_id, "{% trans "Passwords do not match" %}");
return false;
}
var submit_btn = $(this).find('[type="submit"]');
var url = "{{ SITE_ROOT }}api/v2.1/admin/organizations/" + org_id + "/users/";
disable(submit_btn);
$.ajax({
url: url,
type: 'POST',
datatype: 'json',
cache: false,
beforeSend: prepareCSRFToken,
data: {
'email': email,
'name': name,
'password': pwd1
},
success: function(data) {
location.reload(true);
},
error: function(jqXHR, textStatus, errorThrown) {
if (jqXHR.responseText) {
apply_form_error(form_id, $.parseJSON(jqXHR.responseText).error_msg);
} else {
apply_form_error(form_id, "{% trans "Failed. Please check the network." %}");
}
enable(submit_btn);
}
});
return false;
});
$('.remove-user-btn').click(function() {
var _this = $(this),
uid = _this.attr('data-userid'),
org_id = {{ org.org_id }},
url = "{{ SITE_ROOT }}api/v2.1/admin/organizations/" + org_id + "/users/" + uid + "/",
popupTitle = "{% trans "Delete User" %}",
popupContent = "{% trans "Are you sure you want to delete %s ?" %}".replace('%s', '<span class="op-target ellipsis ellipsis-op-target" title="' + HTMLescape(uid) + '">' + HTMLescape(uid) + '</span>');
var yesCallback = function() {
$.ajax({
url: url,
type: 'DELETE',
dataType: 'json',
cache: false,
beforeSend: prepareCSRFToken,
success: function() {
_this.closest('tr').remove();
feedback("{% trans "Successfully deleted 1 item." %}", 'success');
},
error: function(xhr, textStatus, errorThrown) {
ajaxErrorHandler(xhr, textStatus, errorThrown);
},
complete: function() {
$.modal.close();
}
});
};
showConfirm(popupTitle, popupContent, yesCallback);
return false;
});
addConfirmTo($('.reset-user-btn'), {
'title':"{% trans "Password Reset" %}",
'con':"{% trans "Are you sure you want to reset the password of %s ?" %}",
'post': true // post request
});
$('tr:gt(0)').hover(
function() {
$(this).find('.user-status-edit-icon').removeClass('vh');
@@ -80,49 +216,49 @@ $('tr:gt(0)').hover(
$(this).find('.user-status-edit-icon').addClass('vh');
}
);
$('.user-status-edit-icon').click(function() {
$(this).parent().addClass('hide');
$(this).parent().next().removeClass('hide'); // show 'select'
});
$('.user-status-select').change(function() {
var select = $(this),
select_val = select.val(),
uid = select.parents('tr').attr('data-userid'),
url = "{{ SITE_ROOT }}useradmin/toggle_status/" + uid + "/?s=" + select_val;
$select_prev = $(this).prev('.user-status'), // .user-status, .user-role
org_id = {{ org.org_id }},
url = "{{ SITE_ROOT }}api/v2.1/admin/organizations/" + org_id + "/users/" + uid + "/";
$.ajax({
url: url,
type: 'GET',
dataType: 'json',
cache: false,
beforeSend: function() {
if (select_val == 1) {
if (select_val == 'true') {
// show activating popup
$('#activate-msg').modal();
$('#simplemodal-container').css({'height':'auto'});
}
},
$.ajax({
url: url,
type: 'PUT',
data: {'active': select_val},
dataType: 'json',
cache: false,
beforeSend: prepareCSRFToken,
success: function(data) {
if (data['email_sent']) {
feedback("{% trans "Edit succeeded, an email has been sent." %}", 'success');
} else if (data['email_sent'] === false) {
feedback("{% trans "Edit succeeded, but failed to send email, please check your email configuration." %}", 'success');
} else {
feedback("{% trans "Edit succeeded" %}", 'success');
}
select.prev().children('span').html(select.children('option[value="' +select.val() + '"]').text());
$('.user-status-cur-value', $select_prev).html(select.children('option[value="' +select.val() + '"]').text());
select.addClass('hide');
select.prev().removeClass('hide');
$select_prev.removeClass('hide');
$.modal.close();
},
error: function() {
feedback("{% trans "Edit failed." %}", 'error');
feedback("{% trans 'Edit failed.' %}", 'error');
select.addClass('hide');
select.prev().removeClass('hide');
$.modal.close();
}
});
});
$(document).click(function(e) {
var target = e.target || event.srcElement;
// target can't be edit-icon
@@ -131,5 +267,6 @@ $(document).click(function(e) {
$('.user-status-select').addClass('hide');
}
});
</script>
{% endblock %}

View File

@@ -51,6 +51,7 @@ from seahub.api2.endpoints.admin.groups import AdminGroups, AdminGroup
from seahub.api2.endpoints.admin.group_libraries import AdminGroupLibraries, AdminGroupLibrary
from seahub.api2.endpoints.admin.group_members import AdminGroupMembers, AdminGroupMember
from seahub.api2.endpoints.admin.shares import AdminShares
from seahub.api2.endpoints.admin.org_users import AdminOrgUsers, AdminOrgUser
# Uncomment the next two lines to enable the admin:
#from django.contrib import admin
@@ -226,6 +227,9 @@ urlpatterns = patterns(
url(r'^api/v2.1/admin/trash-libraries/(?P<repo_id>[-0-9a-f]{36})/$', AdminTrashLibrary.as_view(), name='api-v2.1-admin-trash-library'),
url(r'^api/v2.1/admin/shares/$', AdminShares.as_view(), name='api-v2.1-admin-shares'),
url(r'^api/v2.1/admin/organizations/(?P<org_id>\d+)/users/$', AdminOrgUsers.as_view(), name='api-v2.1-admin-org-users'),
url(r'^api/v2.1/admin/organizations/(?P<org_id>\d+)/users/(?P<email>[^/]+)/$', AdminOrgUser.as_view(), name='api-v2.1-admin-org-user'),
(r'^avatar/', include('seahub.avatar.urls')),
(r'^notification/', include('seahub.notifications.urls')),
(r'^contacts/', include('seahub.contacts.urls')),

View File

@@ -48,6 +48,7 @@ def parse_license():
return ret
def user_number_over_limit(new_users = 0):
logger = logging.getLogger(__name__)
if is_pro_version():
try:
# get license user limit
@@ -60,11 +61,16 @@ def user_number_over_limit(new_users = 0):
active_users = active_db_users + active_ldap_users if \
active_ldap_users > 0 else active_db_users
return active_users + new_users >= max_users
if new_users < 0:
logger.debug('`new_users` must be greater or equal to 0.')
return False
elif new_users == 0:
return active_users >= max_users
else:
return active_users + new_users > max_users
except Exception as e:
logger = logging.getLogger(__name__)
logger.error(e)
return False
else:
return False

View File

@@ -0,0 +1,248 @@
import json
from mock import patch
from seaserv import ccnet_api
from django.core.urlresolvers import reverse
from seahub.test_utils import BaseTestCase
from tests.common.utils import randstring
from seaserv import seafserv_threaded_rpc
try:
from seahub.settings import LOCAL_PRO_DEV_ENV
except ImportError:
LOCAL_PRO_DEV_ENV = False
def remove_org(org_id):
org_id = int(org_id)
org = ccnet_api.get_org_by_id(org_id)
if org:
users =ccnet_api.get_org_emailusers(org.url_prefix, -1, -1)
for u in users:
ccnet_api.remove_org_user(org_id, u.email)
groups = ccnet_api.get_org_groups(org.org_id, -1, -1)
for g in groups:
ccnet_api.remove_org_group(org_id, g.gid)
# remove org repos
seafserv_threaded_rpc.remove_org_repo_by_org_id(org_id)
# remove org
ccnet_api.remove_org(org_id)
class OrgUsersTest(BaseTestCase):
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
if LOCAL_PRO_DEV_ENV:
self.org_name = randstring(6)
self.org_url_prefix = randstring(6)
tmp_user = self.create_user(email='%s@%s.com' % (randstring(6), randstring(6)))
self.org_creator = tmp_user.username
self.org_id = ccnet_api.create_org(self.org_name,
self.org_url_prefix, self.org_creator)
self.org_users_url = reverse('api-v2.1-admin-org-users',
args=[self.org_id])
def tearDown(self):
self.remove_group()
self.remove_repo()
if LOCAL_PRO_DEV_ENV:
remove_org(self.org_id)
self.remove_user(self.org_creator)
def test_can_create(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
email = '%s@%s.com' % (randstring(6), randstring(6))
data = {'email': email, 'password': randstring(6)}
resp = self.client.post(self.org_users_url, data)
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['email'] == email
def test_can_not_create_if_not_admin(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.user)
email = '%s@%s.com' % (randstring(6), randstring(6))
data = {'email': email, 'password': randstring(6)}
resp = self.client.post(self.org_users_url, data)
self.assertEqual(403, resp.status_code)
def test_create_with_invalid_org_id(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
invalid_org_users_url = reverse('api-v2.1-admin-org-users', args=[0])
email = '%s@%s.com' % (randstring(6), randstring(6))
data = {'email': email, 'password': randstring(6)}
resp = self.client.post(invalid_org_users_url, data)
self.assertEqual(400, resp.status_code)
def test_create_with_existed_user(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
data = {'email': self.admin_name, 'password': randstring(6)}
resp = self.client.post(self.org_users_url, data)
self.assertEqual(400, resp.status_code)
@patch('seahub.api2.endpoints.admin.org_users.user_number_over_limit')
def test_create_with_user_number_over_limit(self, mock_user_number_over_limit):
if not LOCAL_PRO_DEV_ENV:
return
mock_user_number_over_limit.return_value = True
self.login_as(self.admin)
email = '%s@%s.com' % (randstring(6), randstring(6))
data = {'email': email, 'password': randstring(6)}
resp = self.client.post(self.org_users_url, data)
self.assertEqual(403, resp.status_code)
class OrgUserTest(BaseTestCase):
def setUp(self):
self.user_name = self.user.username
self.admin_name = self.admin.username
if LOCAL_PRO_DEV_ENV:
self.org_name = randstring(6)
self.org_url_prefix = randstring(6)
tmp_user = self.create_user(email='%s@%s.com' % (randstring(6), randstring(6)))
self.org_creator = tmp_user.username
self.org_id = ccnet_api.create_org(self.org_name,
self.org_url_prefix, self.org_creator)
self.org_users_url = reverse('api-v2.1-admin-org-users',
args=[self.org_id])
def tearDown(self):
self.remove_group()
self.remove_repo()
if LOCAL_PRO_DEV_ENV:
remove_org(self.org_id)
self.remove_user(self.org_creator)
def test_can_delete(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
resp = self.client.delete(url)
self.assertEqual(200, resp.status_code)
assert ccnet_api.org_user_exists(self.org_id, email) == 0
def test_can_not_delete_if_not_admin(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
self.login_as(self.user)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
resp = self.client.delete(url)
self.assertEqual(403, resp.status_code)
def test_delete_org_creator(self):
if not LOCAL_PRO_DEV_ENV:
return
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id,
self.org_creator])
resp = self.client.delete(url)
self.assertEqual(403, resp.status_code)
def test_delete_invalid_user(self):
if not LOCAL_PRO_DEV_ENV:
return
not_existed_user = '%s@%s.com' % (randstring(6), randstring(6))
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id,
not_existed_user])
resp = self.client.delete(url)
self.assertEqual(404, resp.status_code)
def test_can_update(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
tmp_user = self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
assert tmp_user.is_active
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
status = 'false'
data = 'active=%s' % status
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
json_resp = json.loads(resp.content)
self.assertEqual(200, resp.status_code)
assert json_resp['active'] is False
def test_update_with_invalid_args(self):
if not LOCAL_PRO_DEV_ENV:
return
email = '%s@%s.com' % (randstring(6), randstring(6))
tmp_user = self.create_user(email=email)
ccnet_api.add_org_user(self.org_id, email, 0)
assert ccnet_api.org_user_exists(self.org_id, email) == 1
assert tmp_user.is_active
self.login_as(self.admin)
url = reverse('api-v2.1-admin-org-user', args=[self.org_id, email])
status = 'fals'
data = 'active=%s' % status
resp = self.client.put(url, data, 'application/x-www-form-urlencoded')
self.assertEqual(400, resp.status_code)