mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-02 16:05:29 +00:00
feat: Vault adds Amazon Secrets Manager (#14515)
* feat: Vault adds Amazon Secrets Manager * perf: optimizing the code --------- Co-authored-by: jiangweidong <1053570670@qq.com>
This commit is contained in:
@@ -1,41 +1,13 @@
|
||||
import sys
|
||||
from abc import ABC
|
||||
|
||||
from common.db.utils import Encryptor
|
||||
from common.utils import lazyproperty
|
||||
|
||||
current_module = sys.modules[__name__]
|
||||
|
||||
__all__ = ['build_entry']
|
||||
from ..base.entries import BaseEntry
|
||||
|
||||
|
||||
class BaseEntry(ABC):
|
||||
|
||||
def __init__(self, instance):
|
||||
self.instance = instance
|
||||
|
||||
@lazyproperty
|
||||
class AzureBaseEntry(BaseEntry):
|
||||
@property
|
||||
def full_path(self):
|
||||
return self.path_spec
|
||||
|
||||
@property
|
||||
def path_spec(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def to_internal_data(self):
|
||||
secret = getattr(self.instance, '_secret', None)
|
||||
if secret is not None:
|
||||
secret = Encryptor(secret).encrypt()
|
||||
return secret
|
||||
|
||||
@staticmethod
|
||||
def to_external_data(secret):
|
||||
if secret is not None:
|
||||
secret = Encryptor(secret).decrypt()
|
||||
return secret
|
||||
|
||||
|
||||
class AccountEntry(BaseEntry):
|
||||
class AccountEntry(AzureBaseEntry):
|
||||
|
||||
@property
|
||||
def path_spec(self):
|
||||
@@ -45,7 +17,7 @@ class AccountEntry(BaseEntry):
|
||||
return path
|
||||
|
||||
|
||||
class AccountTemplateEntry(BaseEntry):
|
||||
class AccountTemplateEntry(AzureBaseEntry):
|
||||
|
||||
@property
|
||||
def path_spec(self):
|
||||
@@ -53,18 +25,9 @@ class AccountTemplateEntry(BaseEntry):
|
||||
return path
|
||||
|
||||
|
||||
class HistoricalAccountEntry(BaseEntry):
|
||||
class HistoricalAccountEntry(AzureBaseEntry):
|
||||
|
||||
@property
|
||||
def path_spec(self):
|
||||
path = f'accounts-{self.instance.instance.id}-histories-{self.instance.history_id}'
|
||||
return path
|
||||
|
||||
|
||||
def build_entry(instance) -> BaseEntry:
|
||||
class_name = instance.__class__.__name__
|
||||
entry_class_name = f'{class_name}Entry'
|
||||
entry_class = getattr(current_module, entry_class_name, None)
|
||||
if not entry_class:
|
||||
raise Exception(f'Entry class {entry_class_name} is not found')
|
||||
return entry_class(instance)
|
||||
|
Reference in New Issue
Block a user