mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-10-22 16:31:33 +00:00
perf: password 等使用 rsa 加密传输 (#8188)
* perf: 修改 model fields 路径 * stash it * pref: 统一加密方式,密码字段采用 rsa 加密 * pref: 临时密码使用 rsa * perf: 去掉 debug msg * perf: 去掉 Debug * perf: 去掉 debug * perf: 抽出来 Co-authored-by: ibuler <ibuler@qq.com>
This commit is contained in:
@@ -1,62 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
import base64
|
||||
from Cryptodome.PublicKey import RSA
|
||||
from Cryptodome.Cipher import PKCS1_v1_5
|
||||
from Cryptodome import Random
|
||||
|
||||
from django.conf import settings
|
||||
from .notifications import DifferentCityLoginMessage
|
||||
from audits.models import UserLoginLog
|
||||
from audits.const import DEFAULT_CITY
|
||||
|
||||
from common.utils import validate_ip, get_ip_city, get_request_ip
|
||||
from common.utils import get_logger
|
||||
from audits.models import UserLoginLog
|
||||
from audits.const import DEFAULT_CITY
|
||||
from .notifications import DifferentCityLoginMessage
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
|
||||
def gen_key_pair():
|
||||
""" 生成加密key
|
||||
用于登录页面提交用户名/密码时,对密码进行加密(前端)/解密(后端)
|
||||
"""
|
||||
random_generator = Random.new().read
|
||||
rsa = RSA.generate(1024, random_generator)
|
||||
rsa_private_key = rsa.exportKey().decode()
|
||||
rsa_public_key = rsa.publickey().exportKey().decode()
|
||||
return rsa_private_key, rsa_public_key
|
||||
|
||||
|
||||
def rsa_encrypt(message, rsa_public_key):
|
||||
""" 加密登录密码 """
|
||||
key = RSA.importKey(rsa_public_key)
|
||||
cipher = PKCS1_v1_5.new(key)
|
||||
cipher_text = base64.b64encode(cipher.encrypt(message.encode())).decode()
|
||||
return cipher_text
|
||||
|
||||
|
||||
def rsa_decrypt(cipher_text, rsa_private_key=None):
|
||||
""" 解密登录密码 """
|
||||
if rsa_private_key is None:
|
||||
# rsa_private_key 为 None,可以能是API请求认证,不需要解密
|
||||
return cipher_text
|
||||
|
||||
key = RSA.importKey(rsa_private_key)
|
||||
cipher = PKCS1_v1_5.new(key)
|
||||
cipher_decoded = base64.b64decode(cipher_text.encode())
|
||||
# Todo: 弄明白为何要以下这么写,https://xbuba.com/questions/57035263
|
||||
if len(cipher_decoded) == 127:
|
||||
hex_fixed = '00' + cipher_decoded.hex()
|
||||
cipher_decoded = base64.b16decode(hex_fixed.upper())
|
||||
message = cipher.decrypt(cipher_decoded, b'error').decode()
|
||||
return message
|
||||
|
||||
|
||||
def check_different_city_login_if_need(user, request):
|
||||
if not settings.SECURITY_CHECK_DIFFERENT_CITY_LOGIN:
|
||||
return
|
||||
|
||||
ip = get_request_ip(request) or '0.0.0.0'
|
||||
|
||||
if not (ip and validate_ip(ip)):
|
||||
city = DEFAULT_CITY
|
||||
else:
|
||||
|
Reference in New Issue
Block a user