mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-06 09:51:00 +00:00
[Feature] 支持otp
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
# ~*~ coding: utf-8 ~*~
|
||||
#
|
||||
from __future__ import unicode_literals
|
||||
import os
|
||||
import pyotp
|
||||
import base64
|
||||
import logging
|
||||
import uuid
|
||||
@@ -17,7 +19,6 @@ from common.tasks import send_mail_async
|
||||
from common.utils import reverse, get_object_or_none
|
||||
from .models import User, LoginLog
|
||||
|
||||
|
||||
logger = logging.getLogger('jumpserver')
|
||||
|
||||
|
||||
@@ -163,7 +164,7 @@ def generate_token(request, user):
|
||||
remote_addr = request.META.get('REMOTE_ADDR', '')
|
||||
if not isinstance(remote_addr, bytes):
|
||||
remote_addr = remote_addr.encode("utf-8")
|
||||
remote_addr = base64.b16encode(remote_addr) #.replace(b'=', '')
|
||||
remote_addr = base64.b16encode(remote_addr) # .replace(b'=', '')
|
||||
token = cache.get('%s_%s' % (user.id, remote_addr))
|
||||
if not token:
|
||||
token = uuid.uuid4().hex
|
||||
@@ -181,6 +182,16 @@ def validate_ip(ip):
|
||||
return False
|
||||
|
||||
|
||||
def get_login_ip(request):
|
||||
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '').split(',')
|
||||
if x_forwarded_for and x_forwarded_for[0]:
|
||||
login_ip = x_forwarded_for[0]
|
||||
else:
|
||||
login_ip = request.META.get('REMOTE_ADDR', '')
|
||||
|
||||
return login_ip
|
||||
|
||||
|
||||
def write_login_log(username, type='', ip='', user_agent=''):
|
||||
if not (ip and validate_ip(ip)):
|
||||
ip = ip[:15]
|
||||
@@ -211,3 +222,35 @@ def get_ip_city(ip, timeout=10):
|
||||
except ValueError:
|
||||
pass
|
||||
return city
|
||||
|
||||
|
||||
def get_user(request):
|
||||
if is_login(request):
|
||||
user = request.user
|
||||
else:
|
||||
user = cache.get(request.session.session_key)
|
||||
return user
|
||||
|
||||
|
||||
def is_login(request):
|
||||
return isinstance(request.user, User)
|
||||
|
||||
|
||||
def redirect_user_first_login_or_index(request, redirect_field_name):
|
||||
if request.user.is_first_login:
|
||||
return reverse('users:user-first-login')
|
||||
return request.POST.get(
|
||||
redirect_field_name,
|
||||
request.GET.get(redirect_field_name, reverse('index')))
|
||||
|
||||
|
||||
def generate_otp_uri(user, issuer="Jumpserver"):
|
||||
otp_secret_key = base64.b32encode(os.urandom(10)).decode('utf-8')
|
||||
cache.set('otp_secret_key', otp_secret_key, 300)
|
||||
totp = pyotp.TOTP(otp_secret_key)
|
||||
return totp.provisioning_uri(name=user.username, issuer_name=issuer)
|
||||
|
||||
|
||||
def check_otp_code(otp_secret_key, otp_code):
|
||||
totp = pyotp.TOTP(otp_secret_key)
|
||||
return totp.verify(otp_code)
|
||||
|
Reference in New Issue
Block a user