From 608e0c9f26b81e734a7a7b57433fc4b0dbdb82ae Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 25 Nov 2022 18:06:22 +0800 Subject: [PATCH] feat: support ed25519 key --- apps/assets/models/base.py | 24 +++---- apps/assets/serializers/utils.py | 11 +--- apps/common/utils/encode.py | 107 ++++++++++++++++++++++--------- 3 files changed, 89 insertions(+), 53 deletions(-) diff --git a/apps/assets/models/base.py b/apps/assets/models/base.py index cc6ea17fe..70c6b54e6 100644 --- a/apps/assets/models/base.py +++ b/apps/assets/models/base.py @@ -1,23 +1,22 @@ # -*- coding: utf-8 -*- # -import io import os -import sshpubkeys from hashlib import md5 -from django.db import models +import sshpubkeys from django.conf import settings -from django.utils import timezone +from django.db import models from django.db.models import QuerySet +from django.utils import timezone from django.utils.translation import ugettext_lazy as _ +from assets.const import Connectivity, SecretType +from common.db import fields from common.utils import ( ssh_key_string_to_obj, ssh_key_gen, get_logger, - random_string, ssh_pubkey_gen, lazyproperty + random_string, lazyproperty, parse_ssh_public_key_str ) -from common.db import fields from orgs.mixins.models import JMSOrgBaseModel -from assets.const import Connectivity, SecretType logger = get_logger(__file__) @@ -88,7 +87,7 @@ class BaseAccount(JMSOrgBaseModel): @lazyproperty def public_key(self): if self.secret_type == SecretType.SSH_KEY: - return ssh_pubkey_gen(private_key=self.private_key) + return parse_ssh_public_key_str(self.private_key) return None @property @@ -97,7 +96,7 @@ class BaseAccount(JMSOrgBaseModel): public_key = self.public_key elif self.private_key: try: - public_key = ssh_pubkey_gen(private_key=self.private_key) + public_key = parse_ssh_public_key_str(self.private_key) except IOError as e: return str(e) else: @@ -129,12 +128,9 @@ class BaseAccount(JMSOrgBaseModel): return key_path def get_private_key(self): - if not self.private_key_obj: + if not self.private_key: return None - string_io = io.StringIO() - self.private_key_obj.write_private_key(string_io) - private_key = string_io.getvalue() - return private_key + return self.private_key @property def public_key_obj(self): diff --git a/apps/assets/serializers/utils.py b/apps/assets/serializers/utils.py index 0734bc9f1..770710843 100644 --- a/apps/assets/serializers/utils.py +++ b/apps/assets/serializers/utils.py @@ -1,9 +1,7 @@ -from io import StringIO - from django.utils.translation import ugettext_lazy as _ from rest_framework import serializers -from common.utils import ssh_private_key_gen, validate_ssh_private_key +from common.utils import validate_ssh_private_key, parse_ssh_private_key_str def validate_password_for_ansible(password): @@ -24,9 +22,4 @@ def validate_ssh_key(ssh_key, passphrase=None): valid = validate_ssh_private_key(ssh_key, password=passphrase) if not valid: raise serializers.ValidationError(_("private key invalid or passphrase error")) - - ssh_key = ssh_private_key_gen(ssh_key, password=passphrase) - string_io = StringIO() - ssh_key.write_private_key(string_io) - ssh_key = string_io.getvalue() - return ssh_key + return parse_ssh_private_key_str(ssh_key, passphrase) diff --git a/apps/common/utils/encode.py b/apps/common/utils/encode.py index 2bf02ac4c..db5aee795 100644 --- a/apps/common/utils/encode.py +++ b/apps/common/utils/encode.py @@ -1,24 +1,23 @@ # -*- coding: utf-8 -*- # -import re -import json -from six import string_types import base64 -import os -import time import hashlib +import json +import os +import re +import time from io import StringIO -from itertools import chain import paramiko import sshpubkeys +from cryptography.hazmat.primitives import serialization +from django.conf import settings +from django.core.serializers.json import DjangoJSONEncoder from itsdangerous import ( TimedJSONWebSignatureSerializer, JSONWebSignatureSerializer, BadSignature, SignatureExpired ) -from django.conf import settings -from django.core.serializers.json import DjangoJSONEncoder -from django.db.models.fields.files import FileField +from six import string_types from .http import http_date @@ -69,22 +68,19 @@ class Signer(metaclass=Singleton): return None +_supported_paramiko_ssh_key_types = (paramiko.RSAKey, paramiko.DSSKey, paramiko.Ed25519Key) + + def ssh_key_string_to_obj(text, password=None): key = None - try: - key = paramiko.RSAKey.from_private_key(StringIO(text), password=password) - except paramiko.SSHException: - pass - else: - return key - - try: - key = paramiko.DSSKey.from_private_key(StringIO(text), password=password) - except paramiko.SSHException: - pass - else: - return key - + for ssh_key_type in _supported_paramiko_ssh_key_types: + if not isinstance(ssh_key_type, paramiko.PKey): + continue + try: + key = ssh_key_type.from_private_key(StringIO(text), password=password) + return key + except paramiko.SSHException: + pass return key @@ -137,17 +133,68 @@ def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', h def validate_ssh_private_key(text, password=None): - if isinstance(text, bytes): + if isinstance(text, str): try: - text = text.decode("utf-8") + text = text.encode("utf-8") + except UnicodeDecodeError: + return False + if isinstance(password, str): + try: + password = password.encode("utf-8") except UnicodeDecodeError: return False - key = ssh_key_string_to_obj(text, password=password) - if key is None: - return False - else: - return True + key = parse_ssh_private_key_str(text, password=password) + return bool(key) + + +def parse_ssh_private_key_str(text: bytes, password=None) -> str: + private_key = _parse_ssh_private_key(text, password=password) + if private_key is None: + return "" + private_key_bytes = private_key.private_bytes(serialization.Encoding.PEM, + serialization.PrivateFormat.OpenSSH, + serialization.NoEncryption()) + return private_key_bytes.decode('utf-8') + + +def parse_ssh_public_key_str(text: bytes = "", password=None) -> str: + private_key = _parse_ssh_private_key(text, password=password) + if private_key is None: + return "" + public_key_bytes = private_key.public_key().public_bytes(serialization.Encoding.OpenSSH, + serialization.PublicFormat.OpenSSH) + return public_key_bytes.decode('utf-8') + + +def _parse_ssh_private_key(text, password=None): + """ + text: bytes + password: str + return:private key types: + ec.EllipticCurvePrivateKey, + rsa.RSAPrivateKey, + dsa.DSAPrivateKey, + ed25519.Ed25519PrivateKey, + """ + if isinstance(text, str): + try: + text = text.encode("utf-8") + except UnicodeDecodeError: + return None + if password is not None: + if isinstance(password, str): + try: + password = password.encode("utf-8") + except UnicodeDecodeError: + return None + + try: + private_key = serialization.load_ssh_private_key(text, password=password) + return private_key + except (ValueError, TypeError): + pass + return None def validate_ssh_public_key(text):