jumpserver/apps/accounts/utils.py

142 lines
4.3 KiB
Python

import copy
from django.conf import settings
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from accounts.const import SecretType, DEFAULT_PASSWORD_RULES
from common.utils import ssh_key_gen, random_string
from common.utils import validate_ssh_private_key, parse_ssh_private_key_str
class SecretGenerator:
def __init__(self, secret_strategy, secret_type, password_rules=None):
self.secret_strategy = secret_strategy
self.secret_type = secret_type
self.password_rules = password_rules
@staticmethod
def generate_ssh_key():
private_key, __ = ssh_key_gen()
return private_key
def generate_password(self):
password_rules = self.password_rules
if not password_rules or not isinstance(password_rules, dict):
password_rules = {}
rules = copy.deepcopy(DEFAULT_PASSWORD_RULES)
rules.update(password_rules)
rules = {
"length": rules["length"],
"lower": rules["lowercase"],
"upper": rules["uppercase"],
"digit": rules["digit"],
"special_char": rules["symbol"],
"exclude_chars": rules.get("exclude_symbols", ""),
}
return random_string(**rules)
def get_secret(self):
if self.secret_type == SecretType.SSH_KEY:
secret = self.generate_ssh_key()
elif self.secret_type == SecretType.PASSWORD:
secret = self.generate_password()
else:
raise ValueError("Secret must be set")
return secret
def validate_password_for_ansible(password):
"""校验 Ansible 不支持的特殊字符"""
if password.startswith("{{") and password.endswith("}}"):
raise serializers.ValidationError(
_(
"If the password starts with {{` and ends with }} `, then the password is not allowed."
)
)
def validate_ssh_key(ssh_key, passphrase=None):
valid = validate_ssh_private_key(ssh_key, password=passphrase)
if not valid:
raise serializers.ValidationError(_("private key invalid or passphrase error"))
return parse_ssh_private_key_str(ssh_key, passphrase)
class AccountSecretTaskStatus:
def __init__(
self,
prefix='queue:change_secret:',
debounce_key='debounce:change_secret:task',
debounce_timeout=10,
queue_status_timeout=60,
default_timeout=3600,
delayed_task_countdown=20,
):
self.prefix = prefix
self.debounce_key = debounce_key
self.debounce_timeout = debounce_timeout
self.queue_status_timeout = queue_status_timeout
self.default_timeout = default_timeout
self.delayed_task_countdown = delayed_task_countdown
self.enabled = getattr(settings, 'CHANGE_SECRET_AFTER_SESSION_END', False)
def _key(self, identifier):
return f"{self.prefix}{identifier}"
@property
def account_ids(self):
for key in cache.iter_keys(f"{self.prefix}*"):
yield key.split(':')[-1]
def is_debounced(self):
return cache.add(self.debounce_key, True, self.debounce_timeout)
def get_queue_key(self, identifier):
return self._key(identifier)
def set_status(
self,
identifier,
status,
timeout=None,
metadata=None,
use_add=False
):
if not self.enabled:
return
key = self._key(identifier)
data = {"status": status}
if metadata:
data.update(metadata)
if use_add:
return cache.add(key, data, timeout or self.queue_status_timeout)
cache.set(key, data, timeout or self.default_timeout)
def get(self, identifier):
return cache.get(self._key(identifier), {})
def get_status(self, identifier):
if not self.enabled:
return
record = cache.get(self._key(identifier), {})
return record.get("status")
def get_ttl(self, identifier):
return cache.ttl(self._key(identifier))
def clear(self, identifier):
if not self.enabled:
return
cache.delete(self._key(identifier))
account_secret_task_status = AccountSecretTaskStatus()