mirror of
https://github.com/haiwen/seahub.git
synced 2025-08-22 08:47:22 +00:00
commit
cd62d8d572
@ -74,7 +74,7 @@ def _get_login_failed_attempts(username=None, ip=None):
|
|||||||
|
|
||||||
return max(username_attempts, ip_attempts)
|
return max(username_attempts, ip_attempts)
|
||||||
|
|
||||||
def _incr_login_faied_attempts(username=None, ip=None):
|
def _incr_login_failed_attempts(username=None, ip=None):
|
||||||
"""Increase login failed attempts by 1 for both username and ip.
|
"""Increase login failed attempts by 1 for both username and ip.
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
@ -109,8 +109,11 @@ def _clear_login_failed_attempts(request):
|
|||||||
"""
|
"""
|
||||||
username = request.user.username
|
username = request.user.username
|
||||||
ip = get_remote_ip(request)
|
ip = get_remote_ip(request)
|
||||||
cache.delete(LOGIN_ATTEMPT_PREFIX + username)
|
cache.delete(LOGIN_ATTEMPT_PREFIX + urlquote(username))
|
||||||
cache.delete(LOGIN_ATTEMPT_PREFIX + ip)
|
cache.delete(LOGIN_ATTEMPT_PREFIX + ip)
|
||||||
|
p = Profile.objects.get_profile_by_user(username)
|
||||||
|
if p and p.login_id:
|
||||||
|
cache.delete(LOGIN_ATTEMPT_PREFIX + urlquote(p.login_id))
|
||||||
|
|
||||||
def _handle_login_form_valid(request, user, redirect_to, remember_me):
|
def _handle_login_form_valid(request, user, redirect_to, remember_me):
|
||||||
if UserOptions.objects.passwd_change_required(
|
if UserOptions.objects.passwd_change_required(
|
||||||
@ -135,76 +138,71 @@ def login(request, template_name='registration/login.html',
|
|||||||
|
|
||||||
redirect_to = request.REQUEST.get(redirect_field_name, '')
|
redirect_to = request.REQUEST.get(redirect_field_name, '')
|
||||||
ip = get_remote_ip(request)
|
ip = get_remote_ip(request)
|
||||||
failed_attempt = _get_login_failed_attempts(ip=ip)
|
|
||||||
|
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
username = urlquote(request.REQUEST.get('username', '').strip())
|
login = urlquote(request.REQUEST.get('login', '').strip())
|
||||||
|
failed_attempt = _get_login_failed_attempts(username=login, ip=ip)
|
||||||
remember_me = True if request.REQUEST.get('remember_me',
|
remember_me = True if request.REQUEST.get('remember_me',
|
||||||
'') == 'on' else False
|
'') == 'on' else False
|
||||||
|
|
||||||
|
# check the form
|
||||||
|
used_captcha_already = False
|
||||||
|
if bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True:
|
||||||
|
form = authentication_form(data=request.POST)
|
||||||
|
else:
|
||||||
|
if failed_attempt >= config.LOGIN_ATTEMPT_LIMIT:
|
||||||
|
form = CaptchaAuthenticationForm(data=request.POST)
|
||||||
|
used_captcha_already = True
|
||||||
|
else:
|
||||||
|
form = authentication_form(data=request.POST)
|
||||||
|
|
||||||
|
if form.is_valid():
|
||||||
|
return _handle_login_form_valid(request, form.get_user(),
|
||||||
|
redirect_to, remember_me)
|
||||||
|
|
||||||
|
# form is invalid
|
||||||
|
failed_attempt = _incr_login_failed_attempts(username=login,
|
||||||
|
ip=ip)
|
||||||
|
|
||||||
if failed_attempt >= config.LOGIN_ATTEMPT_LIMIT:
|
if failed_attempt >= config.LOGIN_ATTEMPT_LIMIT:
|
||||||
if bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True:
|
if bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True:
|
||||||
# log user in if password is valid otherwise freeze account
|
# log user in if password is valid otherwise freeze account
|
||||||
form = authentication_form(data=request.POST)
|
logger.warn('Login attempt limit reached, try freeze the user, email/username: %s, ip: %s, attemps: %d' %
|
||||||
if form.is_valid():
|
(login, ip, failed_attempt))
|
||||||
return _handle_login_form_valid(request, form.get_user(),
|
|
||||||
redirect_to, remember_me)
|
|
||||||
else:
|
|
||||||
# freeze user account anyway
|
|
||||||
login = request.REQUEST.get('login', '')
|
login = request.REQUEST.get('login', '')
|
||||||
email = Profile.objects.get_username_by_login_id(login)
|
email = Profile.objects.get_username_by_login_id(login)
|
||||||
if email is None:
|
if email is None:
|
||||||
email = login
|
email = login
|
||||||
|
|
||||||
try:
|
try:
|
||||||
user = User.objects.get(email)
|
user = User.objects.get(email)
|
||||||
if user.is_active:
|
if user.is_active:
|
||||||
user.freeze_user(notify_admins=True)
|
user.freeze_user(notify_admins=True)
|
||||||
|
logger.warn('Login attempt limit reached, freeze the user email/username: %s, ip: %s, attemps: %d' %
|
||||||
|
(login, ip, failed_attempt))
|
||||||
except User.DoesNotExist:
|
except User.DoesNotExist:
|
||||||
|
logger.warn('Login attempt limit reached with invalid email/username: %s, ip: %s, attemps: %d' %
|
||||||
|
(login, ip, failed_attempt))
|
||||||
pass
|
pass
|
||||||
form.errors['freeze_account'] = _('This account has been frozen due to too many failed login attempts.')
|
form.errors['freeze_account'] = _('This account has been frozen due to too many failed login attempts.')
|
||||||
else:
|
else:
|
||||||
# log user in if password is valid otherwise show captcha
|
# use a new form with Captcha
|
||||||
form = CaptchaAuthenticationForm(data=request.POST)
|
logger.warn('Login attempt limit reached, show Captcha, email/username: %s, ip: %s, attemps: %d' %
|
||||||
if form.is_valid():
|
|
||||||
return _handle_login_form_valid(request, form.get_user(),
|
|
||||||
redirect_to, remember_me)
|
|
||||||
else:
|
|
||||||
# show page with captcha and increase failed login attempts
|
|
||||||
_incr_login_faied_attempts(username=username, ip=ip)
|
|
||||||
else:
|
|
||||||
# login failed attempts < limit
|
|
||||||
form = authentication_form(data=request.POST)
|
|
||||||
if form.is_valid():
|
|
||||||
return _handle_login_form_valid(request, form.get_user(),
|
|
||||||
redirect_to, remember_me)
|
|
||||||
else:
|
|
||||||
# increase failed attempts
|
|
||||||
login = urlquote(request.REQUEST.get('login', '').strip())
|
|
||||||
failed_attempt = _incr_login_faied_attempts(username=login,
|
|
||||||
ip=ip)
|
|
||||||
|
|
||||||
if failed_attempt >= config.LOGIN_ATTEMPT_LIMIT:
|
|
||||||
logger.warn('Login attempt limit reached, email/username: %s, ip: %s, attemps: %d' %
|
|
||||||
(login, ip, failed_attempt))
|
(login, ip, failed_attempt))
|
||||||
|
if not used_captcha_already:
|
||||||
if bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True:
|
|
||||||
form = authentication_form(data=request.POST)
|
|
||||||
else:
|
|
||||||
form = CaptchaAuthenticationForm()
|
form = CaptchaAuthenticationForm()
|
||||||
else:
|
|
||||||
form = authentication_form(data=request.POST)
|
|
||||||
else:
|
else:
|
||||||
### GET
|
### GET
|
||||||
|
failed_attempt = _get_login_failed_attempts(ip=ip)
|
||||||
if failed_attempt >= config.LOGIN_ATTEMPT_LIMIT:
|
if failed_attempt >= config.LOGIN_ATTEMPT_LIMIT:
|
||||||
logger.warn('Login attempt limit reached, ip: %s, attempts: %d' %
|
|
||||||
(ip, failed_attempt))
|
|
||||||
if bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True:
|
if bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True:
|
||||||
form = authentication_form(data=request.POST)
|
form = authentication_form()
|
||||||
else:
|
else:
|
||||||
|
logger.warn('Login attempt limit reached, show Captcha, ip: %s, attempts: %d' %
|
||||||
|
(ip, failed_attempt))
|
||||||
form = CaptchaAuthenticationForm()
|
form = CaptchaAuthenticationForm()
|
||||||
else:
|
else:
|
||||||
form = authentication_form(request)
|
form = authentication_form()
|
||||||
|
|
||||||
request.session.set_test_cookie()
|
request.session.set_test_cookie()
|
||||||
|
|
||||||
|
@ -145,8 +145,15 @@ class BaseTestCase(TestCase, Fixtures):
|
|||||||
self.remove_repo(self.repo.id)
|
self.remove_repo(self.repo.id)
|
||||||
|
|
||||||
def login_as(self, user):
|
def login_as(self, user):
|
||||||
|
if isinstance(user, basestring):
|
||||||
|
login = user
|
||||||
|
elif isinstance(user, User):
|
||||||
|
login = user.username
|
||||||
|
else:
|
||||||
|
assert False
|
||||||
|
|
||||||
return self.client.post(
|
return self.client.post(
|
||||||
reverse('auth_login'), {'login': user.username,
|
reverse('auth_login'), {'login': login,
|
||||||
'password': self.user_password}
|
'password': self.user_password}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1,9 +1,15 @@
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.core.cache import cache
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
|
from django.utils.http import urlquote
|
||||||
|
|
||||||
from constance import config
|
from constance import config
|
||||||
|
|
||||||
|
from seahub.base.accounts import User
|
||||||
|
from seahub.auth.forms import AuthenticationForm, CaptchaAuthenticationForm
|
||||||
|
from seahub.auth.views import LOGIN_ATTEMPT_PREFIX
|
||||||
from seahub.options.models import UserOptions
|
from seahub.options.models import UserOptions
|
||||||
|
from seahub.profile.models import Profile
|
||||||
from seahub.test_utils import BaseTestCase
|
from seahub.test_utils import BaseTestCase
|
||||||
|
|
||||||
|
|
||||||
@ -17,6 +23,20 @@ class LoginTest(BaseTestCase):
|
|||||||
self.assertEqual(302, resp.status_code)
|
self.assertEqual(302, resp.status_code)
|
||||||
self.assertRegexpMatches(resp['Location'], r'http://testserver%s' % settings.LOGIN_REDIRECT_URL)
|
self.assertRegexpMatches(resp['Location'], r'http://testserver%s' % settings.LOGIN_REDIRECT_URL)
|
||||||
|
|
||||||
|
def test_can_login_with_login_id(self):
|
||||||
|
p = Profile.objects.add_or_update(self.user.username, 'nickname')
|
||||||
|
login_id = 'test_login_id'
|
||||||
|
p.login_id = login_id
|
||||||
|
p.save()
|
||||||
|
assert Profile.objects.get_username_by_login_id(login_id) == self.user.username
|
||||||
|
|
||||||
|
resp = self.client.post(
|
||||||
|
reverse('auth_login'), {'login': login_id,
|
||||||
|
'password': self.user_password}
|
||||||
|
)
|
||||||
|
self.assertEqual(302, resp.status_code)
|
||||||
|
self.assertRegexpMatches(resp['Location'], r'http://testserver%s' % settings.LOGIN_REDIRECT_URL)
|
||||||
|
|
||||||
def test_redirect_to_after_success_login(self):
|
def test_redirect_to_after_success_login(self):
|
||||||
resp = self.client.post(
|
resp = self.client.post(
|
||||||
reverse('auth_login') + '?next=/foo/',
|
reverse('auth_login') + '?next=/foo/',
|
||||||
@ -62,17 +82,15 @@ class LoginTest(BaseTestCase):
|
|||||||
self.assertEqual(resp.context['force_passwd_change'], True)
|
self.assertEqual(resp.context['force_passwd_change'], True)
|
||||||
|
|
||||||
|
|
||||||
class LoginCaptchaTest(BaseTestCase):
|
class LoginTestMixin():
|
||||||
def setUp(self):
|
"""Utility methods for login test.
|
||||||
config.LOGIN_ATTEMPT_LIMIT = 1
|
"""
|
||||||
config.FREEZE_USER_ON_LOGIN_FAILED = False
|
def _bad_passwd_login(self, user=None):
|
||||||
|
if user is None:
|
||||||
|
user = self.user
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self.clear_cache()
|
|
||||||
|
|
||||||
def _bad_passwd_login(self):
|
|
||||||
resp = self.client.post(
|
resp = self.client.post(
|
||||||
reverse('auth_login'), {'login': self.user.username,
|
reverse('auth_login'), {'login': user.username,
|
||||||
'password': 'badpassword'}
|
'password': 'badpassword'}
|
||||||
)
|
)
|
||||||
return resp
|
return resp
|
||||||
@ -81,17 +99,115 @@ class LoginCaptchaTest(BaseTestCase):
|
|||||||
resp = self.client.get(reverse('auth_login'))
|
resp = self.client.get(reverse('auth_login'))
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
def _get_user_login_failed_attempt(self, username):
|
||||||
|
return cache.get(LOGIN_ATTEMPT_PREFIX + urlquote(username), 0)
|
||||||
|
|
||||||
|
|
||||||
|
class LoginCaptchaTest(BaseTestCase, LoginTestMixin):
|
||||||
|
def setUp(self):
|
||||||
|
self.clear_cache() # make sure cache is clean
|
||||||
|
|
||||||
|
config.LOGIN_ATTEMPT_LIMIT = 3
|
||||||
|
config.FREEZE_USER_ON_LOGIN_FAILED = False
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.clear_cache()
|
||||||
|
|
||||||
def test_can_show_captcha(self):
|
def test_can_show_captcha(self):
|
||||||
resp = self._bad_passwd_login()
|
|
||||||
print resp.context['form']
|
|
||||||
|
|
||||||
resp = self._bad_passwd_login()
|
|
||||||
print resp.context['form']
|
|
||||||
|
|
||||||
resp = self._bad_passwd_login()
|
|
||||||
print resp.context['form']
|
|
||||||
|
|
||||||
print '-------------'
|
|
||||||
resp = self._login_page()
|
resp = self._login_page()
|
||||||
print resp.context['form']
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 0
|
||||||
|
|
||||||
|
# first failed login
|
||||||
|
resp = self._bad_passwd_login()
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 1
|
||||||
|
|
||||||
|
# second failed login
|
||||||
|
resp = self._bad_passwd_login()
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 2
|
||||||
|
|
||||||
|
# third failed login, and show the captha
|
||||||
|
resp = self._bad_passwd_login()
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is True
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 3
|
||||||
|
|
||||||
|
def test_can_clear_failed_attempt_after_login(self):
|
||||||
|
resp = self._login_page()
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 0
|
||||||
|
|
||||||
|
# first failed login
|
||||||
|
resp = self._bad_passwd_login()
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 1
|
||||||
|
|
||||||
|
# successful login
|
||||||
|
self.login_as(self.user)
|
||||||
|
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 0
|
||||||
|
|
||||||
|
def test_login_with_login_id(self):
|
||||||
|
p = Profile.objects.add_or_update(self.user.username, 'nickname')
|
||||||
|
login_id = 'test_login_id'
|
||||||
|
p.login_id = login_id
|
||||||
|
p.save()
|
||||||
|
assert Profile.objects.get_username_by_login_id(login_id) == self.user.username
|
||||||
|
|
||||||
|
# first failed login
|
||||||
|
resp = self._bad_passwd_login()
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 1
|
||||||
|
|
||||||
|
# successful login using login id
|
||||||
|
self.login_as(login_id)
|
||||||
|
|
||||||
|
assert self._get_user_login_failed_attempt(self.user.username) == 0
|
||||||
|
|
||||||
|
|
||||||
|
class FreezeUserOnLoginFailedTest(BaseTestCase, LoginTestMixin):
|
||||||
|
def setUp(self):
|
||||||
|
self.clear_cache() # make sure cache is clean
|
||||||
|
|
||||||
|
config.LOGIN_ATTEMPT_LIMIT = 3
|
||||||
|
config.FREEZE_USER_ON_LOGIN_FAILED = True
|
||||||
|
|
||||||
|
self.tmp_user = self.create_user()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.clear_cache()
|
||||||
|
self.remove_user(self.tmp_user.username)
|
||||||
|
|
||||||
|
def test_can_freeze(self):
|
||||||
|
assert bool(config.FREEZE_USER_ON_LOGIN_FAILED) is True
|
||||||
|
|
||||||
|
resp = self._login_page()
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert self._get_user_login_failed_attempt(self.tmp_user.username) == 0
|
||||||
|
|
||||||
|
# first failed login
|
||||||
|
resp = self._bad_passwd_login(user=self.tmp_user)
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.tmp_user.username) == 1
|
||||||
|
assert User.objects.get(self.tmp_user.username).is_active is True
|
||||||
|
|
||||||
|
# second failed login
|
||||||
|
resp = self._bad_passwd_login(user=self.tmp_user)
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.tmp_user.username) == 2
|
||||||
|
assert User.objects.get(self.tmp_user.username).is_active is True
|
||||||
|
|
||||||
|
# third failed login, and freeze user instead of showing captha
|
||||||
|
resp = self._bad_passwd_login(user=self.tmp_user)
|
||||||
|
assert isinstance(resp.context['form'], AuthenticationForm) is True
|
||||||
|
assert isinstance(resp.context['form'], CaptchaAuthenticationForm) is False
|
||||||
|
assert self._get_user_login_failed_attempt(self.tmp_user.username) == 3
|
||||||
|
|
||||||
|
assert User.objects.get(self.tmp_user.username).is_active is False
|
||||||
|
Loading…
Reference in New Issue
Block a user