1
0
mirror of https://github.com/haiwen/seahub.git synced 2025-09-13 05:39:59 +00:00

[api2] Improve account api

This commit is contained in:
zhengxie
2015-04-14 16:01:32 +08:00
parent 629687c865
commit cd8be43149
4 changed files with 192 additions and 38 deletions

View File

@@ -551,3 +551,13 @@ def get_token_v2(request, username, platform, device_id, device_name,
return TokenV2.objects.get_or_create_token( return TokenV2.objects.get_or_create_token(
username, platform, device_id, device_name, username, platform, device_id, device_name,
client_version, platform_version, get_client_ip(request)) client_version, platform_version, get_client_ip(request))
def to_python_boolean(string):
"""Convert a string to boolean.
"""
string = string.lower()
if string in ('t', 'true', '1'):
return True
if string in ('f', 'false', '0'):
return False
raise ValueError("Invalid boolean value: '%s'" % string)

View File

@@ -6,6 +6,7 @@ import json
import datetime import datetime
import urllib2 import urllib2
import re import re
from dateutil.relativedelta import relativedelta
from urllib2 import unquote, quote from urllib2 import unquote, quote
from PIL import Image from PIL import Image
from StringIO import StringIO from StringIO import StringIO
@@ -26,6 +27,7 @@ from django.http import HttpResponse, Http404
from django.template import RequestContext from django.template import RequestContext
from django.template.loader import render_to_string from django.template.loader import render_to_string
from django.shortcuts import render_to_response from django.shortcuts import render_to_response
from django.utils import timezone
from .throttling import ScopedRateThrottle from .throttling import ScopedRateThrottle
from .authentication import TokenAuthentication from .authentication import TokenAuthentication
@@ -35,7 +37,7 @@ from .utils import is_repo_writable, is_repo_accessible, calculate_repo_info, \
get_groups, get_group_and_contacts, prepare_events, \ get_groups, get_group_and_contacts, prepare_events, \
get_person_msgs, api_group_check, get_email, get_timestamp, \ get_person_msgs, api_group_check, get_email, get_timestamp, \
get_group_message_json, get_group_msgs, get_group_msgs_json, get_diff_details, \ get_group_message_json, get_group_msgs, get_group_msgs_json, get_diff_details, \
json_response json_response, to_python_boolean
from seahub.avatar.templatetags.avatar_tags import api_avatar_url from seahub.avatar.templatetags.avatar_tags import api_avatar_url
from seahub.avatar.templatetags.group_avatar_tags import api_grp_avatar_url from seahub.avatar.templatetags.group_avatar_tags import api_grp_avatar_url
from seahub.base.accounts import User from seahub.base.accounts import User
@@ -232,55 +234,126 @@ class Account(APIView):
return Response(info) return Response(info)
def put(self, request, email, format=None): def _update_account_profile(self, request, email):
if not is_valid_username(email): name = request.DATA.get("name", None)
return api_error(status.HTTP_404_NOT_FOUND, 'User not found.') note = request.DATA.get("note", None)
# create or update account if name is None and note is None:
return
profile = Profile.objects.get_profile_by_user(email)
if profile is None:
profile = Profile(user=email)
if name is not None:
# if '/' in name:
# return api_error(status.HTTP_400_BAD_REQUEST, "Nickname should not include '/'")
profile.nickname = name
if note is not None:
profile.intro = note
profile.save()
def _update_account_quota(self, request, email):
storage = request.DATA.get("storage", None)
sharing = request.DATA.get("sharing", None)
if storage is None and sharing is None:
return
if storage is not None:
seafile_api.set_user_quota(email, int(storage))
if sharing is not None:
seafile_api.set_user_share_quota(email, int(sharing))
def _create_account(self, request, email):
copy = request.DATA.copy() copy = request.DATA.copy()
copy.update({'email': email}) copy['email'] = email
serializer = AccountSerializer(data=copy) serializer = AccountSerializer(data=copy)
if serializer.is_valid(): if serializer.is_valid():
try:
User.objects.get(email=serializer.object['email'])
update = True
except User.DoesNotExist:
update = False
user = User.objects.create_user(serializer.object['email'], user = User.objects.create_user(serializer.object['email'],
serializer.object['password'], serializer.object['password'],
serializer.object['is_staff'], serializer.object['is_staff'],
serializer.object['is_active']) serializer.object['is_active'])
name = request.DATA.get("name", None) self._update_account_profile(request, user.username)
note = request.DATA.get("note", None)
if name or note:
try:
profile = Profile.objects.get(user=user.username)
except Profile.DoesNotExist:
profile = Profile()
profile.user = user.username resp = Response('success', status=status.HTTP_201_CREATED)
resp['Location'] = reverse('api2-account', args=[email])
if name:
if '/' in name:
return api_error(status.HTTP_400_BAD_REQUEST, "Nickname should not include '/'")
else:
profile.nickname = name
if note:
profile.intro = note
profile.save()
if update:
resp = Response('success')
else:
resp = Response('success', status=status.HTTP_201_CREATED)
resp['Location'] = reverse('api2-account', args=[email])
return resp return resp
else: else:
return api_error(status.HTTP_400_BAD_REQUEST, serializer.errors) return api_error(status.HTTP_400_BAD_REQUEST, serializer.errors)
def _update_account(self, request, user):
password = request.DATA.get("password", None)
is_staff = request.DATA.get("is_staff", None)
if is_staff is not None:
try:
is_staff = to_python_boolean(is_staff)
except ValueError:
return api_error(status.HTTP_400_BAD_REQUEST,
'%s is not a valid value' % is_staff)
is_active = request.DATA.get("is_active", None)
if is_active is not None:
try:
is_active = to_python_boolean(is_active)
except ValueError:
return api_error(status.HTTP_400_BAD_REQUEST,
'%s is not a valid value' % is_active)
if password is not None:
user.set_password(password)
if is_staff is not None:
user.is_staff = is_staff
if is_active is not None:
user.is_active = is_active
user.save()
self._update_account_profile(request, user.username)
try:
self._update_account_quota(request, user.username)
except SearpcError as e:
logger.error(e)
return api_error(HTTP_520_OPERATION_FAILED, 'Failed to set account quota')
is_trial = request.DATA.get("is_trial", None)
if is_trial is not None:
try:
from seahub_extra.trialaccount.models import TrialAccount
except ImportError:
pass
else:
try:
is_trial = to_python_boolean(is_trial)
except ValueError:
return api_error(status.HTTP_400_BAD_REQUEST,
'%s is not a valid value' % is_trial)
if is_trial is True:
expire_date = timezone.now() + relativedelta(days=7)
TrialAccount.object.create_or_update(user.username,
expire_date)
else:
TrialAccount.objects.filter(user_or_org=user.username).delete()
return Response('success')
def put(self, request, email, format=None):
if not is_valid_username(email):
return api_error(status.HTTP_404_NOT_FOUND, 'User not found.')
try:
user = User.objects.get(email=email)
return self._update_account(request, user)
except User.DoesNotExist:
return self._create_account(request, email)
def delete(self, request, email, format=None): def delete(self, request, email, format=None):
if not is_valid_username(email): if not is_valid_username(email):
return api_error(status.HTTP_404_NOT_FOUND, 'User not found.') return api_error(status.HTTP_404_NOT_FOUND, 'User not found.')

View File

@@ -139,8 +139,8 @@ class ApiTestBase(unittest.TestCase):
""" """
Context manager to create a tmp user, and automatically delete it after use Context manager to create a tmp user, and automatically delete it after use
with self.tmp_repo() as repo: with self.get_tmp_user() as user:
self.get(repo.file_url + '?p=/') ...
""" """
user = self.create_user() user = self.create_user()
try: try:

View File

@@ -41,6 +41,77 @@ class AccountsApiTest(ApiTestBase):
# check the user is really deleted # check the user is really deleted
self.admin_get(test_account_url, expected=404) self.admin_get(test_account_url, expected=404)
def test_update_account_passwd(self):
with self.get_tmp_user() as user:
data = {'password': 'new_password'}
self.admin_put(user.user_url, data=data, expected=200)
def test_set_account_to_staff(self):
with self.get_tmp_user() as user:
self.assertEqual(self.admin_get(user.user_url).json()['is_staff'],
False)
data = {'is_staff': 'true'}
self.admin_put(user.user_url, data=data, expected=200)
self.assertEqual(self.admin_get(user.user_url).json()['is_staff'],
True)
def test_set_account_inactive(self):
with self.get_tmp_user() as user:
self.assertEqual(self.admin_get(user.user_url).json()['is_active'],
True)
data = {'is_active': 'false'}
self.admin_put(user.user_url, data=data, expected=200)
self.assertEqual(self.admin_get(user.user_url).json()['is_active'],
False)
def test_set_account_inactive_with_wrong_arg(self):
with self.get_tmp_user() as user:
self.assertEqual(self.admin_get(user.user_url).json()['is_active'],
True)
data = {'is_active': 'fals'}
self.admin_put(user.user_url, data=data, expected=400)
def test_set_account_inactive_with_empty_arg(self):
with self.get_tmp_user() as user:
self.assertEqual(self.admin_get(user.user_url).json()['is_active'],
True)
data = {'is_active': ''}
self.admin_put(user.user_url, data=data, expected=400)
def test_update_account_nickname(self):
with self.get_tmp_user() as user:
data = {'name': 'new nick name'}
self.admin_put(user.user_url, data=data, expected=200)
# def test_update_account_nickname_with_slash(self):
# with self.get_tmp_user() as user:
# data = {'name': 'new /nick name'}
# self.admin_put(user.user_url, data=data, expected=400)
def test_update_account_intro(self):
with self.get_tmp_user() as user:
data = {'note': 'hello, my name is foo'}
self.admin_put(user.user_url, data=data, expected=200)
def test_update_account_storage_quota(self):
with self.get_tmp_user() as user:
data = {'storage': 1024} # 1KB
self.admin_put(user.user_url, data=data, expected=200)
self.assertEqual(self.admin_get(user.user_url).json()['total'],
1024)
# def test_update_account_sharing_quota(self):
# with self.get_tmp_user() as user:
# data = {'sharing': 1024} # 1KB
# self.admin_put(user.user_url, data=data, expected=200)
# self.assertEqual(self.admin_get(user.user_url).json()['sharing'],
# 1024)
def test_unset_trial_account(self):
with self.get_tmp_user() as user:
data = {'is_trial': 'false'}
self.admin_put(user.user_url, data=data, expected=200)
def test_auth_ping(self): def test_auth_ping(self):
res = self.get(AUTH_PING_URL) res = self.get(AUTH_PING_URL)
self.assertRegexpMatches(res.text, u'"pong"') self.assertRegexpMatches(res.text, u'"pong"')