mirror of
https://github.com/haiwen/seahub.git
synced 2025-09-18 08:16:07 +00:00
[auth & api] Refactor login
This commit is contained in:
@@ -65,20 +65,23 @@ class AuthTokenSerializer(serializers.Serializer):
|
||||
else:
|
||||
raise serializers.ValidationError('invalid params')
|
||||
|
||||
# convert login id or contact email to username if any
|
||||
username = Profile.objects.convert_login_str_to_username(login_id)
|
||||
|
||||
p_id = ccnet_api.get_primary_id(username)
|
||||
if p_id is not None:
|
||||
username = p_id
|
||||
|
||||
if username and password:
|
||||
user = authenticate(username=username, password=password)
|
||||
if login_id and password:
|
||||
user = authenticate(username=login_id, password=password)
|
||||
if user:
|
||||
if not user.is_active:
|
||||
raise serializers.ValidationError('User account is disabled.')
|
||||
else:
|
||||
raise serializers.ValidationError('Unable to login with provided credentials.')
|
||||
"""try login id/contact email/primary id"""
|
||||
# convert login id or contact email to username if any
|
||||
username = Profile.objects.convert_login_str_to_username(login_id)
|
||||
# convert username to primary id if any
|
||||
p_id = ccnet_api.get_primary_id(username)
|
||||
if p_id is not None:
|
||||
username = p_id
|
||||
|
||||
user = authenticate(username=username, password=password)
|
||||
if user is None:
|
||||
raise serializers.ValidationError('Unable to login with provided credentials.')
|
||||
else:
|
||||
raise serializers.ValidationError('Must include "username" and "password"')
|
||||
|
||||
@@ -98,10 +101,10 @@ class AuthTokenSerializer(serializers.Serializer):
|
||||
else:
|
||||
logger.info('%s: unrecognized device' % login_id)
|
||||
|
||||
token = get_token_v2(self.context['request'], username, platform,
|
||||
token = get_token_v2(self.context['request'], user.username, platform,
|
||||
device_id, device_name, client_version, platform_version)
|
||||
else:
|
||||
token = get_token_v1(username)
|
||||
token = get_token_v1(user.username)
|
||||
|
||||
return token.key
|
||||
|
||||
|
@@ -47,19 +47,25 @@ class AuthenticationForm(forms.Form):
|
||||
return self.cleaned_data['login'].strip()
|
||||
|
||||
def clean(self):
|
||||
login = self.cleaned_data.get('login')
|
||||
username = self.cleaned_data.get('login')
|
||||
password = self.cleaned_data.get('password')
|
||||
|
||||
# convert login id or contact email to username if any
|
||||
username = Profile.objects.convert_login_str_to_username(login)
|
||||
|
||||
username = self.get_primary_id_by_username(username)
|
||||
if username and password:
|
||||
self.user_cache = authenticate(username=username,
|
||||
password=password)
|
||||
if self.user_cache is None:
|
||||
raise forms.ValidationError(_("Please enter a correct email/username and password. Note that both fields are case-sensitive."))
|
||||
elif not self.user_cache.is_active:
|
||||
"""then try login id/contact email/primary id"""
|
||||
# convert login id or contact email to username if any
|
||||
username = Profile.objects.convert_login_str_to_username(username)
|
||||
# convert username to primary id if any
|
||||
username = self.get_primary_id_by_username(username)
|
||||
|
||||
self.user_cache = authenticate(username=username, password=password)
|
||||
if self.user_cache is None:
|
||||
raise forms.ValidationError(_("Please enter a correct email/username and password. Note that both fields are case-sensitive."))
|
||||
|
||||
# user found for login string but inactive
|
||||
if not self.user_cache.is_active:
|
||||
if settings.ACTIVATE_AFTER_FIRST_LOGIN and \
|
||||
not UserOptions.objects.is_user_logged_in(username):
|
||||
"""Activate user on first login."""
|
||||
|
@@ -1,10 +1,26 @@
|
||||
from mock import patch
|
||||
from seaserv import ccnet_api
|
||||
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.api2.serializers import AuthTokenSerializer
|
||||
from seahub.profile.models import Profile
|
||||
|
||||
class AuthTokenSerializerTest(BaseTestCase):
|
||||
def setUp(self):
|
||||
self.inactive_user = self.create_user('inactive@test.com', is_active=False)
|
||||
Profile.objects.add_or_update(self.user.username,
|
||||
login_id='user_login_id',
|
||||
contact_email='contact@test.com')
|
||||
|
||||
ccnet_api.set_reference_id(self.user.username, 'another_email@test.com')
|
||||
|
||||
def assertSuccess(self, s):
|
||||
assert s.is_valid() is True
|
||||
assert s.errors == {}
|
||||
|
||||
def assertFailed(self, s):
|
||||
assert s.is_valid() is False
|
||||
assert 'Unable to login with provided credentials.' in s.errors['non_field_errors']
|
||||
|
||||
def test_validate(self):
|
||||
d = {
|
||||
@@ -47,3 +63,77 @@ class AuthTokenSerializerTest(BaseTestCase):
|
||||
'device_name': 'test',
|
||||
}, context={'request': self.fake_request})
|
||||
assert s.is_valid() is True
|
||||
|
||||
def test_invalid_user(self):
|
||||
d = {
|
||||
'username': 'test_does_not_exist',
|
||||
'password': '123',
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertFailed(s)
|
||||
|
||||
def test_inactive_user(self):
|
||||
d = {
|
||||
'username': self.inactive_user.username,
|
||||
'password': 'secret',
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
assert s.is_valid() is False
|
||||
assert 'User account is disabled.' in s.errors['non_field_errors']
|
||||
|
||||
def test_inactive_user_incorrect_password(self):
|
||||
"""An invalid login doesn't leak the inactive status of a user."""
|
||||
d = {
|
||||
'username': self.inactive_user.username,
|
||||
'password': 'incorrect'
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertFailed(s)
|
||||
|
||||
def test_login_failed(self):
|
||||
d = {
|
||||
'username': self.user.username,
|
||||
'password': 'incorrect',
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertFailed(s)
|
||||
|
||||
def test_login_success(self):
|
||||
d = {
|
||||
'username': self.user.username,
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertSuccess(s)
|
||||
|
||||
def test_login_id(self):
|
||||
d = {
|
||||
'username': 'user_login_id',
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertSuccess(s)
|
||||
|
||||
def test_contact_email(self):
|
||||
d = {
|
||||
'username': 'contact@test.com',
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertSuccess(s)
|
||||
|
||||
def test_primary_id(self):
|
||||
d = {
|
||||
'username': 'another_email@test.com',
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
s = AuthTokenSerializer(data=d, context={'request': self.fake_request})
|
||||
self.assertSuccess(s)
|
||||
|
0
tests/seahub/auth/forms/__init__.py
Normal file
0
tests/seahub/auth/forms/__init__.py
Normal file
99
tests/seahub/auth/forms/test_authentication.py
Normal file
99
tests/seahub/auth/forms/test_authentication.py
Normal file
@@ -0,0 +1,99 @@
|
||||
from seaserv import ccnet_api
|
||||
|
||||
from seahub.test_utils import BaseTestCase
|
||||
from seahub.auth.forms import AuthenticationForm
|
||||
from seahub.profile.models import Profile
|
||||
|
||||
|
||||
class AuthenticationFormTest(BaseTestCase):
|
||||
def setUp(self):
|
||||
self.inactive_user = self.create_user('inactive@test.com', is_active=False)
|
||||
Profile.objects.add_or_update(self.user.username,
|
||||
login_id='user_login_id',
|
||||
contact_email='contact@test.com')
|
||||
|
||||
ccnet_api.set_reference_id(self.user.username, 'another_email@test.com')
|
||||
|
||||
def tearDown(self):
|
||||
self.remove_user(self.inactive_user.username)
|
||||
|
||||
def assertSuccess(self, f):
|
||||
assert f.is_valid() is True
|
||||
assert f.non_field_errors() == []
|
||||
|
||||
def assertFailed(self, f):
|
||||
assert f.is_valid() is False
|
||||
assert 'Please enter a correct email/username and password. Note that both fields are case-sensitive.' in f.non_field_errors()
|
||||
|
||||
def test_invalid_user(self):
|
||||
data = {
|
||||
'login': 'test_does_not_exist',
|
||||
'password': '123'
|
||||
}
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertFailed(form)
|
||||
|
||||
def test_inactive_user(self):
|
||||
data = {
|
||||
'login': self.inactive_user.username,
|
||||
'password': 'secret'
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
assert form.is_valid() is False
|
||||
assert 'This account is inactive.' in form.non_field_errors()
|
||||
|
||||
def test_inactive_user_incorrect_password(self):
|
||||
"""An invalid login doesn't leak the inactive status of a user."""
|
||||
data = {
|
||||
'login': self.inactive_user.username,
|
||||
'password': 'incorrect'
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertFailed(form)
|
||||
|
||||
def test_login_success(self):
|
||||
data = {
|
||||
'login': self.user.username,
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertSuccess(form)
|
||||
|
||||
def test_login_failed(self):
|
||||
data = {
|
||||
'login': self.user.username,
|
||||
'password': 'incorrect',
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertFailed(form)
|
||||
|
||||
def test_login_id(self):
|
||||
data = {
|
||||
'login': 'user_login_id',
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertSuccess(form)
|
||||
|
||||
def test_contact_email(self):
|
||||
data = {
|
||||
'login': 'contact@test.com',
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertSuccess(form)
|
||||
|
||||
def test_primary_id(self):
|
||||
data = {
|
||||
'login': 'another_email@test.com',
|
||||
'password': self.user_password,
|
||||
}
|
||||
|
||||
form = AuthenticationForm(None, data)
|
||||
self.assertSuccess(form)
|
Reference in New Issue
Block a user