Merge pull request #111126 from aramase/kms-v2alpha1-impl

Implement KMS v2alpha1
This commit is contained in:
Kubernetes Prow Robot 2022-08-03 16:41:43 -07:00 committed by GitHub
commit 0a2ae7ab3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 2446 additions and 103 deletions

View File

@ -21,7 +21,9 @@ set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
KUBE_KMS_V1BETA1="${KUBE_ROOT}/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1/"
KUBE_KMS_V2ALPHA1="${KUBE_ROOT}/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1/"
KUBE_KMS_V2="${KUBE_ROOT}/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1/"
source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::protoc::generate_proto "${KUBE_KMS_V1BETA1}"
kube::protoc::generate_proto "${KUBE_KMS_V2ALPHA1}"
kube::protoc::generate_proto "${KUBE_KMS_V2}"

View File

@ -26,6 +26,7 @@ KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
ERROR="KMS gRPC is out of date. Please run hack/update-generated-kms.sh"
KUBE_KMS_V1BETA1="${KUBE_ROOT}/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1/"
KUBE_KMS_V2ALPHA1="${KUBE_ROOT}/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1/"
KUBE_KMS_V2="${KUBE_ROOT}/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1/"
source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::golang::setup_env
@ -33,6 +34,7 @@ kube::golang::setup_env
function cleanup {
rm -rf "${KUBE_KMS_V1BETA1}/_tmp/"
rm -rf "${KUBE_KMS_V2ALPHA1}/_tmp/"
rm -rf "${KUBE_KMS_V2}/_tmp/"
}
trap cleanup EXIT
@ -41,9 +43,13 @@ mkdir -p "${KUBE_KMS_V1BETA1}/_tmp"
cp "${KUBE_KMS_V1BETA1}/api.pb.go" "${KUBE_KMS_V1BETA1}/_tmp/"
mkdir -p "${KUBE_KMS_V2ALPHA1}/_tmp"
cp "${KUBE_KMS_V2ALPHA1}/api.pb.go" "${KUBE_KMS_V2ALPHA1}/_tmp/"
mkdir -p "${KUBE_KMS_V2}/_tmp"
cp "${KUBE_KMS_V2}/api.pb.go" "${KUBE_KMS_V2}/_tmp/"
KUBE_VERBOSE=3 "${KUBE_ROOT}/hack/update-generated-kms.sh"
kube::protoc::diff "${KUBE_KMS_V1BETA1}/api.pb.go" "${KUBE_KMS_V1BETA1}/_tmp/api.pb.go" "${ERROR}"
echo "Generated kms v1beta1 api is up to date."
kube::protoc::diff "${KUBE_KMS_V2ALPHA1}/api.pb.go" "${KUBE_KMS_V2ALPHA1}/_tmp/api.pb.go" "${ERROR}"
echo "Generated kms v2alpha1 api is up to date."
kube::protoc::diff "${KUBE_KMS_V2}/api.pb.go" "${KUBE_KMS_V2}/_tmp/api.pb.go" "${ERROR}"
echo "Generated kms v2 api is up to date."

View File

@ -86,6 +86,9 @@ type IdentityConfiguration struct{}
// KMSConfiguration contains the name, cache size and path to configuration file for a KMS based envelope transformer.
type KMSConfiguration struct {
// apiVersion of KeyManagementService
// +optional
APIVersion string
// name is the name of the KMS plugin to be used.
Name string
// cachesize is the maximum number of secrets which are cached in memory. The default value is 1000.

View File

@ -24,8 +24,9 @@ import (
)
var (
defaultTimeout = &metav1.Duration{Duration: 3 * time.Second}
defaultCacheSize int32 = 1000
defaultTimeout = &metav1.Duration{Duration: 3 * time.Second}
defaultCacheSize int32 = 1000
defaultAPIVersion = "v1"
)
func addDefaultingFuncs(scheme *runtime.Scheme) error {
@ -41,4 +42,8 @@ func SetDefaults_KMSConfiguration(obj *KMSConfiguration) {
if obj.CacheSize == nil {
obj.CacheSize = &defaultCacheSize
}
if obj.APIVersion == "" {
obj.APIVersion = defaultAPIVersion
}
}

View File

@ -34,12 +34,12 @@ func TestKMSProviderTimeoutDefaults(t *testing.T) {
{
desc: "timeout not supplied",
in: &KMSConfiguration{},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &defaultCacheSize},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &defaultCacheSize, APIVersion: defaultAPIVersion},
},
{
desc: "timeout supplied",
in: &KMSConfiguration{Timeout: &v1.Duration{Duration: 1 * time.Minute}},
want: &KMSConfiguration{Timeout: &v1.Duration{Duration: 1 * time.Minute}, CacheSize: &defaultCacheSize},
want: &KMSConfiguration{Timeout: &v1.Duration{Duration: 1 * time.Minute}, CacheSize: &defaultCacheSize, APIVersion: defaultAPIVersion},
},
}
@ -67,17 +67,45 @@ func TestKMSProviderCacheDefaults(t *testing.T) {
{
desc: "cache size not supplied",
in: &KMSConfiguration{},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &defaultCacheSize},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &defaultCacheSize, APIVersion: defaultAPIVersion},
},
{
desc: "cache of zero size supplied",
in: &KMSConfiguration{CacheSize: &zero},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &zero},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &zero, APIVersion: defaultAPIVersion},
},
{
desc: "positive cache size supplied",
in: &KMSConfiguration{CacheSize: &ten},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &ten},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &ten, APIVersion: defaultAPIVersion},
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
SetDefaults_KMSConfiguration(tt.in)
if d := cmp.Diff(tt.want, tt.in); d != "" {
t.Fatalf("KMS Provider mismatch (-want +got):\n%s", d)
}
})
}
}
func TestKMSProviderAPIVersionDefaults(t *testing.T) {
testCases := []struct {
desc string
in *KMSConfiguration
want *KMSConfiguration
}{
{
desc: "apiVersion not supplied",
in: &KMSConfiguration{},
want: &KMSConfiguration{Timeout: defaultTimeout, CacheSize: &defaultCacheSize, APIVersion: defaultAPIVersion},
},
{
desc: "apiVersion supplied",
in: &KMSConfiguration{Timeout: &v1.Duration{Duration: 1 * time.Minute}, APIVersion: "v2"},
want: &KMSConfiguration{Timeout: &v1.Duration{Duration: 1 * time.Minute}, CacheSize: &defaultCacheSize, APIVersion: "v2"},
},
}

View File

@ -86,6 +86,9 @@ type IdentityConfiguration struct{}
// KMSConfiguration contains the name, cache size and path to configuration file for a KMS based envelope transformer.
type KMSConfiguration struct {
// apiVersion of KeyManagementService
// +optional
APIVersion string `json:"apiVersion"`
// name is the name of the KMS plugin to be used.
Name string `json:"name"`
// cachesize is the maximum number of secrets which are cached in memory. The default value is 1000.

View File

@ -179,6 +179,7 @@ func Convert_config_IdentityConfiguration_To_v1_IdentityConfiguration(in *config
}
func autoConvert_v1_KMSConfiguration_To_config_KMSConfiguration(in *KMSConfiguration, out *config.KMSConfiguration, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Name = in.Name
out.CacheSize = (*int32)(unsafe.Pointer(in.CacheSize))
out.Endpoint = in.Endpoint
@ -192,6 +193,7 @@ func Convert_v1_KMSConfiguration_To_config_KMSConfiguration(in *KMSConfiguration
}
func autoConvert_config_KMSConfiguration_To_v1_KMSConfiguration(in *config.KMSConfiguration, out *KMSConfiguration, s conversion.Scope) error {
out.APIVersion = in.APIVersion
out.Name = in.Name
out.CacheSize = (*int32)(unsafe.Pointer(in.CacheSize))
out.Endpoint = in.Endpoint

View File

@ -21,22 +21,25 @@ import (
"encoding/base64"
"fmt"
"net/url"
"strings"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/apis/config"
)
const (
moreThanOneElementErr = "more than one provider specified in a single element, should split into different list elements"
keyLenErrFmt = "secret is not of the expected length, got %d, expected one of %v"
unsupportedSchemeErrFmt = "unsupported scheme %q for KMS provider, only unix is supported"
atLeastOneRequiredErrFmt = "at least one %s is required"
invalidURLErrFmt = "invalid endpoint for kms provider, error: parse %s: net/url: invalid control character in URL"
mandatoryFieldErrFmt = "%s is a mandatory field for a %s"
base64EncodingErr = "secrets must be base64 encoded"
zeroOrNegativeErrFmt = "%s should be a positive value"
nonZeroErrFmt = "%s should be a positive value, or negative to disable"
encryptionConfigNilErr = "EncryptionConfiguration can't be nil"
moreThanOneElementErr = "more than one provider specified in a single element, should split into different list elements"
keyLenErrFmt = "secret is not of the expected length, got %d, expected one of %v"
unsupportedSchemeErrFmt = "unsupported scheme %q for KMS provider, only unix is supported"
unsupportedKMSAPIVersionErrFmt = "unsupported apiVersion %s for KMS provider, only v1 and v2 are supported"
atLeastOneRequiredErrFmt = "at least one %s is required"
invalidURLErrFmt = "invalid endpoint for kms provider, error: parse %s: net/url: invalid control character in URL"
mandatoryFieldErrFmt = "%s is a mandatory field for a %s"
base64EncodingErr = "secrets must be base64 encoded"
zeroOrNegativeErrFmt = "%s should be a positive value"
nonZeroErrFmt = "%s should be a positive value, or negative to disable"
encryptionConfigNilErr = "EncryptionConfiguration can't be nil"
invalidKMSConfigNameErrFmt = "invalid KMS provider name %s, must not contain ':'"
)
var (
@ -174,12 +177,12 @@ func validateKey(key config.Key, fieldPath *field.Path, expectedLen []int) field
func validateKMSConfiguration(c *config.KMSConfiguration, fieldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if c.Name == "" {
allErrs = append(allErrs, field.Required(fieldPath.Child("name"), fmt.Sprintf(mandatoryFieldErrFmt, "name", "provider")))
}
allErrs = append(allErrs, validateKMSConfigName(c, fieldPath.Child("name"))...)
allErrs = append(allErrs, validateKMSTimeout(c, fieldPath.Child("timeout"))...)
allErrs = append(allErrs, validateKMSEndpoint(c, fieldPath.Child("endpoint"))...)
allErrs = append(allErrs, validateKMSCacheSize(c, fieldPath.Child("cachesize"))...)
allErrs = append(allErrs, validateKMSAPIVersion(c, fieldPath.Child("apiVersion"))...)
return allErrs
}
@ -218,3 +221,25 @@ func validateKMSEndpoint(c *config.KMSConfiguration, fieldPath *field.Path) fiel
return allErrs
}
func validateKMSAPIVersion(c *config.KMSConfiguration, fieldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if c.APIVersion != "v1" && c.APIVersion != "v2" {
allErrs = append(allErrs, field.Invalid(fieldPath, c.APIVersion, fmt.Sprintf(unsupportedKMSAPIVersionErrFmt, "apiVersion")))
}
return allErrs
}
func validateKMSConfigName(c *config.KMSConfiguration, fieldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if c.Name == "" {
allErrs = append(allErrs, field.Required(fieldPath, fmt.Sprintf(mandatoryFieldErrFmt, "name", "provider")))
}
if c.APIVersion != "v1" && strings.Contains(c.Name, ":") {
allErrs = append(allErrs, field.Invalid(fieldPath, c.Name, fmt.Sprintf(invalidKMSConfigNameErrFmt, c.Name)))
}
return allErrs
}

View File

@ -350,3 +350,79 @@ func TestKMSProviderCacheSize(t *testing.T) {
})
}
}
func TestKMSProviderAPIVersion(t *testing.T) {
apiVersionField := field.NewPath("Resource").Index(0).Child("Provider").Index(0).Child("KMS").Child("APIVersion")
testCases := []struct {
desc string
in *config.KMSConfiguration
want field.ErrorList
}{
{
desc: "valid v1 api version",
in: &config.KMSConfiguration{APIVersion: "v1"},
want: field.ErrorList{},
},
{
desc: "valid v2 api version",
in: &config.KMSConfiguration{APIVersion: "v2"},
want: field.ErrorList{},
},
{
desc: "invalid api version",
in: &config.KMSConfiguration{APIVersion: "v3"},
want: field.ErrorList{
field.Invalid(apiVersionField, "v3", fmt.Sprintf(unsupportedKMSAPIVersionErrFmt, "apiVersion")),
},
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
got := validateKMSAPIVersion(tt.in, apiVersionField)
if d := cmp.Diff(tt.want, got); d != "" {
t.Fatalf("KMS Provider validation mismatch (-want +got):\n%s", d)
}
})
}
}
func TestKMSProviderName(t *testing.T) {
nameField := field.NewPath("Resource").Index(0).Child("Provider").Index(0).Child("KMS").Child("name")
testCases := []struct {
desc string
in *config.KMSConfiguration
want field.ErrorList
}{
{
desc: "valid name",
in: &config.KMSConfiguration{Name: "foo"},
want: field.ErrorList{},
},
{
desc: "empty name",
in: &config.KMSConfiguration{},
want: field.ErrorList{
field.Required(nameField, fmt.Sprintf(mandatoryFieldErrFmt, "name", "provider")),
},
},
{
desc: "invalid name with :",
in: &config.KMSConfiguration{Name: "foo:bar"},
want: field.ErrorList{
field.Invalid(nameField, "foo:bar", fmt.Sprintf(invalidKMSConfigNameErrFmt, "foo:bar")),
},
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
got := validateKMSConfigName(tt.in, nameField)
if d := cmp.Diff(tt.want, got); d != "" {
t.Fatalf("KMS Provider validation mismatch (-want +got):\n%s", d)
}
})
}
}

View File

@ -107,6 +107,13 @@ const (
// Allows for updating watchcache resource version with progress notify events.
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
// owner: @aramase
// kep: http://kep.k8s.io/3299
// alpha: v1.25
//
// Enables KMS v2 API for encryption at rest.
KMSv2 featuregate.Feature = "KMSv2"
// owner: @jiahuif
// kep: http://kep.k8s.io/2887
// alpha: v1.23
@ -205,6 +212,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
EfficientWatchResumption: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
KMSv2: {Default: false, PreRelease: featuregate.Alpha},
OpenAPIEnums: {Default: true, PreRelease: featuregate.Beta},
OpenAPIV3: {Default: true, PreRelease: featuregate.Beta},

View File

@ -37,12 +37,15 @@ import (
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
"k8s.io/apiserver/pkg/apis/config/validation"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/storage/value"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope"
envelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
"k8s.io/apiserver/pkg/storage/value/encrypt/secretbox"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
const (
@ -50,8 +53,11 @@ const (
aesGCMTransformerPrefixV1 = "k8s:enc:aesgcm:v1:"
secretboxTransformerPrefixV1 = "k8s:enc:secretbox:v1:"
kmsTransformerPrefixV1 = "k8s:enc:kms:v1:"
kmsTransformerPrefixV2 = "k8s:enc:kms:v2:"
kmsPluginHealthzNegativeTTL = 3 * time.Second
kmsPluginHealthzPositiveTTL = 20 * time.Second
kmsAPIVersionV1 = "v1"
kmsAPIVersionV2 = "v2"
)
type kmsPluginHealthzResponse struct {
@ -67,12 +73,26 @@ type kmsPluginProbe struct {
l *sync.Mutex
}
type kmsv2PluginProbe struct {
name string
ttl time.Duration
envelopekmsv2.Service
lastResponse *kmsPluginHealthzResponse
l *sync.Mutex
}
func (h *kmsPluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
return healthz.NamedCheck(fmt.Sprintf("kms-provider-%d", idx), func(r *http.Request) error {
return h.Check()
})
}
func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
return healthz.NamedCheck(fmt.Sprintf("kms-provider-%d", idx), func(r *http.Request) error {
return p.Check()
})
}
// GetKMSPluginHealthzCheckers extracts KMSPluginProbes from the EncryptionConfig.
func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, error) {
f, err := os.Open(filepath)
@ -80,47 +100,79 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err)
}
defer f.Close()
var result []healthz.HealthChecker
probes, err := getKMSPluginProbes(f)
if err != nil {
return nil, err
}
for i, p := range probes {
probe := p
result = append(result, probe.toHealthzCheck(i))
switch t := probe.(type) {
case *kmsPluginProbe:
result = append(result, t.toHealthzCheck(i))
case *kmsv2PluginProbe:
result = append(result, t.toHealthzCheck(i))
default:
return nil, fmt.Errorf("unsupported KMS plugin type: %T", t)
}
}
return result, nil
}
func getKMSPluginProbes(reader io.Reader) ([]*kmsPluginProbe, error) {
var result []*kmsPluginProbe
func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) {
var result []interface{}
configFileContents, err := ioutil.ReadAll(reader)
if err != nil {
return result, fmt.Errorf("could not read content of encryption provider configuration: %v", err)
return nil, fmt.Errorf("could not read content of encryption provider configuration: %v", err)
}
config, err := loadConfig(configFileContents)
if err != nil {
return result, fmt.Errorf("error while parsing encryption provider configuration: %v", err)
return nil, fmt.Errorf("error while parsing encryption provider configuration: %v", err)
}
for _, r := range config.Resources {
for _, p := range r.Providers {
if p.KMS != nil {
s, err := envelope.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration)
if err != nil {
return nil, fmt.Errorf("could not configure KMS-Plugin's probe %q, error: %v", p.KMS.Name, err)
}
switch p.KMS.APIVersion {
case kmsAPIVersionV1:
s, err := envelope.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration)
if err != nil {
return nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %v", p.KMS.Name, err)
}
result = append(result, &kmsPluginProbe{
name: p.KMS.Name,
ttl: kmsPluginHealthzNegativeTTL,
Service: s,
l: &sync.Mutex{},
lastResponse: &kmsPluginHealthzResponse{},
})
result = append(result, &kmsPluginProbe{
name: p.KMS.Name,
ttl: kmsPluginHealthzNegativeTTL,
Service: s,
l: &sync.Mutex{},
lastResponse: &kmsPluginHealthzResponse{},
})
case kmsAPIVersionV2:
if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) {
return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, KMSv2 feature is not enabled", p.KMS.Name)
}
s, err := envelopekmsv2.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration)
if err != nil {
return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", p.KMS.Name, err)
}
result = append(result, &kmsv2PluginProbe{
name: p.KMS.Name,
ttl: kmsPluginHealthzNegativeTTL,
Service: s,
l: &sync.Mutex{},
lastResponse: &kmsPluginHealthzResponse{},
})
default:
return nil, fmt.Errorf("could not configure KMS Plugin's probe %q, unsupported KMS API version %q", p.KMS.Name, p.KMS.APIVersion)
}
}
}
}
@ -155,6 +207,53 @@ func (h *kmsPluginProbe) Check() error {
return nil
}
// Check gets the healthz status of the KMSv2-Plugin using the Status() method.
func (h *kmsv2PluginProbe) Check() error {
h.l.Lock()
defer h.l.Unlock()
if (time.Since(h.lastResponse.received)) < h.ttl {
return h.lastResponse.err
}
ctx := context.Background()
p, err := h.Service.Status(ctx)
if err != nil {
h.lastResponse = &kmsPluginHealthzResponse{err: err, received: time.Now()}
h.ttl = kmsPluginHealthzNegativeTTL
return fmt.Errorf("failed to perform status section of the healthz check for KMS Provider %s, error: %v", h.name, err)
}
if err := isKMSv2ProviderHealthy(h.name, p); err != nil {
h.lastResponse = &kmsPluginHealthzResponse{err: err, received: time.Now()}
h.ttl = kmsPluginHealthzNegativeTTL
return err
}
h.lastResponse = &kmsPluginHealthzResponse{err: nil, received: time.Now()}
h.ttl = kmsPluginHealthzPositiveTTL
return nil
}
// isKMSv2ProviderHealthy checks if the KMSv2-Plugin is healthy.
func isKMSv2ProviderHealthy(name string, response *envelopekmsv2.StatusResponse) error {
var errs []error
if response.Healthz != "ok" {
errs = append(errs, fmt.Errorf("got unexpected healthz status: %s", response.Healthz))
}
if response.Version != envelopekmsv2.KMSAPIVersion {
errs = append(errs, fmt.Errorf("expected KMSv2 API version %s, got %s", envelopekmsv2.KMSAPIVersion, response.Version))
}
if len(response.KeyID) == 0 {
errs = append(errs, fmt.Errorf("expected KMSv2 KeyID to be set, got %s", response.KeyID))
}
if err := utilerrors.Reduce(utilerrors.NewAggregate(errs)); err != nil {
return fmt.Errorf("kmsv2 Provider %s is not healthy, error: %v", name, err)
}
return nil
}
// GetTransformerOverrides returns the transformer overrides by reading and parsing the encryption provider configuration file
func GetTransformerOverrides(filepath string) (map[schema.GroupResource]value.Transformer, error) {
f, err := os.Open(filepath)
@ -224,8 +323,13 @@ func loadConfig(data []byte) (*apiserverconfig.EncryptionConfiguration, error) {
return config, validation.ValidateEncryptionConfiguration(config).ToAggregate()
}
// The factory to create kms service. This is to make writing test easier.
var envelopeServiceFactory = envelope.NewGRPCService
var (
// The factory to create kms service. This is to make writing test easier.
envelopeServiceFactory = envelope.NewGRPCService
// The factory to create kmsv2 service.
envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
)
func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, error) {
var result []value.PrefixTransformer
@ -243,13 +347,26 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.
case provider.Secretbox != nil:
transformer, err = secretboxPrefixTransformer(provider.Secretbox)
case provider.KMS != nil:
var envelopeService envelope.Service
envelopeService, err = envelopeServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration)
if err != nil {
return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err)
}
switch provider.KMS.APIVersion {
case kmsAPIVersionV1:
var envelopeService envelope.Service
if envelopeService, err = envelopeServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err)
}
transformer, err = envelopePrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV1)
case kmsAPIVersionV2:
if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) {
return nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", provider.KMS.Name)
}
transformer, err = envelopePrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV1)
var envelopeService envelopekmsv2.Service
if envelopeService, err = envelopeKMSv2ServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
return nil, fmt.Errorf("could not configure KMSv2 plugin %q, error: %v", provider.KMS.Name, err)
}
transformer, err = envelopekmsv2PrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV2)
default:
return nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", provider.KMS.Name, provider.KMS.APIVersion)
}
case provider.Identity != nil:
transformer = value.PrefixTransformer{
Transformer: identity.NewEncryptCheckTransformer(),
@ -385,6 +502,18 @@ func envelopePrefixTransformer(config *apiserverconfig.KMSConfiguration, envelop
}, nil
}
func envelopekmsv2PrefixTransformer(config *apiserverconfig.KMSConfiguration, envelopeService envelopekmsv2.Service, prefix string) (value.PrefixTransformer, error) {
// using AES-GCM by default for encrypting data with KMSv2
envelopeTransformer, err := envelopekmsv2.NewEnvelopeTransformer(envelopeService, int(*config.CacheSize), aestransformer.NewGCMTransformer)
if err != nil {
return value.PrefixTransformer{}, err
}
return value.PrefixTransformer{
Transformer: envelopeTransformer,
Prefix: []byte(prefix + config.Name + ":"),
}, nil
}
type unionTransformers []value.Transformer
func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) {

View File

@ -32,8 +32,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope"
envelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
const (
@ -81,6 +85,36 @@ func (t *testEnvelopeService) Encrypt(data []byte) ([]byte, error) {
return []byte(base64.StdEncoding.EncodeToString(data)), nil
}
// testKMSv2EnvelopeService is a mock kmsv2 envelope service which can be used to simulate remote Envelope v2 services
// for testing of the envelope transformer with other transformers.
type testKMSv2EnvelopeService struct {
err error
}
func (t *testKMSv2EnvelopeService) Decrypt(ctx context.Context, uid string, req *envelopekmsv2.DecryptRequest) ([]byte, error) {
if t.err != nil {
return nil, t.err
}
return base64.StdEncoding.DecodeString(string(req.Ciphertext))
}
func (t *testKMSv2EnvelopeService) Encrypt(ctx context.Context, uid string, data []byte) (*envelopekmsv2.EncryptResponse, error) {
if t.err != nil {
return nil, t.err
}
return &envelopekmsv2.EncryptResponse{
Ciphertext: []byte(base64.StdEncoding.EncodeToString(data)),
KeyID: "1",
}, nil
}
func (t *testKMSv2EnvelopeService) Status(ctx context.Context) (*envelopekmsv2.StatusResponse, error) {
if t.err != nil {
return nil, t.err
}
return &envelopekmsv2.StatusResponse{Healthz: "ok", KeyID: "1", Version: "v2alpha1"}, nil
}
// The factory method to create mock envelope service.
func newMockEnvelopeService(endpoint string, timeout time.Duration) (envelope.Service, error) {
return &testEnvelopeService{nil}, nil
@ -91,6 +125,16 @@ func newMockErrorEnvelopeService(endpoint string, timeout time.Duration) (envelo
return &testEnvelopeService{errors.New("test")}, nil
}
// The factory method to create mock envelope kmsv2 service.
func newMockEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) {
return &testKMSv2EnvelopeService{nil}, nil
}
// The factory method to create mock envelope kmsv2 service which always returns error.
func newMockErrorEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) {
return &testKMSv2EnvelopeService{errors.New("test")}, nil
}
func TestLegacyConfig(t *testing.T) {
legacyV1Config := "testdata/valid-configs/legacy.yaml"
legacyConfigObject, err := loadConfig(mustReadConfig(t, legacyV1Config))
@ -112,10 +156,11 @@ func TestLegacyConfig(t *testing.T) {
},
}},
{KMS: &apiserverconfig.KMSConfiguration{
Name: "testprovider",
Endpoint: "unix:///tmp/testprovider.sock",
CacheSize: &cacheSize,
Timeout: &metav1.Duration{Duration: 3 * time.Second},
APIVersion: "v1",
Name: "testprovider",
Endpoint: "unix:///tmp/testprovider.sock",
CacheSize: &cacheSize,
Timeout: &metav1.Duration{Duration: 3 * time.Second},
}},
{AESCBC: &apiserverconfig.AESConfiguration{
Keys: []apiserverconfig.Key{
@ -138,11 +183,15 @@ func TestLegacyConfig(t *testing.T) {
}
func TestEncryptionProviderConfigCorrect(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
// Set factory for mock envelope service
factory := envelopeServiceFactory
factoryKMSv2 := envelopeKMSv2ServiceFactory
envelopeServiceFactory = newMockEnvelopeService
envelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service
defer func() {
envelopeServiceFactory = factory
envelopeKMSv2ServiceFactory = factoryKMSv2
}()
// Creates compound/prefix transformers with different ordering of available transformers.
@ -178,12 +227,19 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
}
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First))
if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
}
// Pick the transformer for any of the returned resources.
identityFirstTransformer := identityFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
aesGcmFirstTransformer := aesGcmFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
aesCbcFirstTransformer := aesCbcFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
secretboxFirstTransformer := secretboxFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")]
ctx := context.Background()
dataCtx := value.DefaultContext([]byte(sampleContextText))
@ -198,6 +254,7 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
{secretboxFirstTransformer, "secretboxFirst"},
{identityFirstTransformer, "identityFirst"},
{kmsFirstTransformer, "kmsFirst"},
{kmsv2FirstTransformer, "kmvs2First"},
}
for _, testCase := range transformers {
@ -222,22 +279,28 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
}
func TestKMSPluginHealthz(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
service, err := envelope.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second)
if err != nil {
t.Fatalf("Could not initialize envelopeService, error: %v", err)
}
serviceKMSv2, err := envelopekmsv2.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second)
if err != nil {
t.Fatalf("Could not initialize kmsv2 envelopeService, error: %v", err)
}
testCases := []struct {
desc string
config string
want []*kmsPluginProbe
want []interface{}
wantErr bool
}{
{
desc: "Install Healthz",
config: "testdata/valid-configs/kms/default-timeout.yaml",
want: []*kmsPluginProbe{
{
want: []interface{}{
&kmsPluginProbe{
name: "foo",
Service: service,
},
@ -246,12 +309,12 @@ func TestKMSPluginHealthz(t *testing.T) {
{
desc: "Install multiple healthz",
config: "testdata/valid-configs/kms/multiple-providers.yaml",
want: []*kmsPluginProbe{
{
want: []interface{}{
&kmsPluginProbe{
name: "foo",
Service: service,
},
{
&kmsPluginProbe{
name: "bar",
Service: service,
},
@ -261,6 +324,26 @@ func TestKMSPluginHealthz(t *testing.T) {
desc: "No KMS Providers",
config: "testdata/valid-configs/aes/aes-gcm.yaml",
},
{
desc: "Install multiple healthz with v1 and v2",
config: "testdata/valid-configs/kms/multiple-providers-kmsv2.yaml",
want: []interface{}{
&kmsv2PluginProbe{
name: "foo",
Service: serviceKMSv2,
},
&kmsPluginProbe{
name: "bar",
Service: service,
},
},
},
{
desc: "Invalid API version",
config: "testdata/invalid-configs/kms/invalid-apiversion.yaml",
want: nil,
wantErr: true,
},
}
for _, tt := range testCases {
@ -270,7 +353,7 @@ func TestKMSPluginHealthz(t *testing.T) {
t.Fatalf("got %v, want nil for error", err)
}
if d := cmp.Diff(tt.want, got, cmp.Comparer(serviceComparer)); d != "" {
if d := cmp.Diff(tt.want, got, cmp.Comparer(serviceComparer), cmp.Comparer(serviceKMSv2Comparer)); d != "" {
t.Fatalf("HealthzConfig mismatch (-want +got):\n%s", d)
}
})
@ -320,12 +403,59 @@ func TestKMSPluginHealthzTTL(t *testing.T) {
}
}
func TestKMSv2PluginHealthzTTL(t *testing.T) {
service, _ := newMockEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second)
errService, _ := newMockErrorEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second)
testCases := []struct {
desc string
probe *kmsv2PluginProbe
wantTTL time.Duration
}{
{
desc: "kmsv2 provider in good state",
probe: &kmsv2PluginProbe{
name: "test",
ttl: kmsPluginHealthzNegativeTTL,
Service: service,
l: &sync.Mutex{},
lastResponse: &kmsPluginHealthzResponse{},
},
wantTTL: kmsPluginHealthzPositiveTTL,
},
{
desc: "kmsv2 provider in bad state",
probe: &kmsv2PluginProbe{
name: "test",
ttl: kmsPluginHealthzPositiveTTL,
Service: errService,
l: &sync.Mutex{},
lastResponse: &kmsPluginHealthzResponse{},
},
wantTTL: kmsPluginHealthzNegativeTTL,
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
tt.probe.Check()
if tt.probe.ttl != tt.wantTTL {
t.Fatalf("want ttl %v, got ttl %v", tt.wantTTL, tt.probe.ttl)
}
})
}
}
// As long as got and want contain envelope.Service we will return true.
// If got has an envelope.Service and want does note (or vice versa) this will return false.
func serviceComparer(_, _ envelope.Service) bool {
return true
}
func serviceKMSv2Comparer(_, _ envelopekmsv2.Service) bool {
return true
}
func TestCBCKeyRotationWithOverlappingProviders(t *testing.T) {
testCBCKeyRotationWithProviders(
t,
@ -413,3 +543,38 @@ func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath strin
}
panic("unreachable")
}
func TestIsKMSv2ProviderHealthyError(t *testing.T) {
testCases := []struct {
desc string
statusResponse *envelopekmsv2.StatusResponse
}{
{
desc: "healthz status is not ok",
statusResponse: &envelopekmsv2.StatusResponse{
Healthz: "unhealthy",
},
},
{
desc: "version is not v2alpha1",
statusResponse: &envelopekmsv2.StatusResponse{
Version: "v1beta1",
},
},
{
desc: "missing keyID",
statusResponse: &envelopekmsv2.StatusResponse{
Healthz: "ok",
Version: "v2alpha1",
},
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
if err := isKMSv2ProviderHealthy("testplugin", tt.statusResponse); err == nil {
t.Fatalf("isKMSv2ProviderHealthy() should have returned an error")
}
})
}
}

View File

@ -0,0 +1,15 @@
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
apiVersion: v3
name: foo
endpoint: unix:///tmp/testprovider.sock
timeout: 15s
- kms:
name: bar
endpoint: unix:///tmp/testprovider.sock
timeout: 15s

View File

@ -14,6 +14,11 @@ resources:
name: testprovider
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- kms:
apiVersion: v2
name: testproviderv2
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- identity: {}
- secretbox:
keys:

View File

@ -18,6 +18,11 @@ resources:
name: testprovider
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- kms:
apiVersion: v2
name: testproviderv2
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- aescbc:
keys:
- name: key1

View File

@ -16,6 +16,11 @@ resources:
name: testprovider
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- kms:
apiVersion: v2
name: testproviderv2
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- aescbc:
keys:
- name: key1

View File

@ -8,6 +8,11 @@ resources:
name: testprovider
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- kms:
apiVersion: v2
name: testproviderv2
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- secretbox:
keys:
- name: key1

View File

@ -0,0 +1,15 @@
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
apiVersion: v2
name: foo
endpoint: unix:///tmp/testprovider.sock
timeout: 15s
- kms:
name: bar
endpoint: unix:///tmp/testprovider.sock
timeout: 15s

View File

@ -0,0 +1,32 @@
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
apiVersion: v2
name: testproviderv2
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- kms:
name: testprovider
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- secretbox:
keys:
- name: key1
secret: YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=
- aescbc:
keys:
- name: key1
secret: c2VjcmV0IGlzIHNlY3VyZQ==
- name: key2
secret: dGhpcyBpcyBwYXNzd29yZA==
- identity: {}
- aesgcm:
keys:
- name: key1
secret: c2VjcmV0IGlzIHNlY3VyZQ==
- name: key2
secret: dGhpcyBpcyBwYXNzd29yZA==

View File

@ -18,6 +18,11 @@ resources:
name: testprovider
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- kms:
apiVersion: v2
name: testproviderv2
endpoint: unix:///tmp/testprovider.sock
cachesize: 10
- identity: {}
- aesgcm:
keys:

View File

@ -27,6 +27,7 @@ import (
"time"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/utils/lru"
"golang.org/x/crypto/cryptobyte"
@ -34,7 +35,7 @@ import (
func init() {
value.RegisterMetrics()
registerMetrics()
metrics.RegisterMetrics()
}
// Service allows encrypting and decrypting data using an external Key Management Service.
@ -81,7 +82,7 @@ func NewEnvelopeTransformer(envelopeService Service, cacheSize int, baseTransfor
// TransformFromStorage decrypts data encrypted by this transformer using envelope encryption.
func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
recordArrival(fromStorageLabel, time.Now())
metrics.RecordArrival(metrics.FromStorageLabel, time.Now())
// Read the 16 bit length-of-DEK encoded at the start of the encrypted DEK. 16 bits can
// represent a maximum key length of 65536 bytes. We are using a 256 bit key, whose
@ -119,7 +120,7 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// TransformToStorage encrypts data to be written to disk using envelope encryption.
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
recordArrival(toStorageLabel, time.Now())
metrics.RecordArrival(metrics.ToStorageLabel, time.Now())
newKey, err := generateKey(32)
if err != nil {
return nil, err
@ -165,7 +166,7 @@ func (t *envelopeTransformer) addTransformer(encKey []byte, key []byte) (value.T
// cannot hash []uint8.
if t.cacheEnabled {
t.transformers.Add(base64.StdEncoding.EncodeToString(encKey), transformer)
dekCacheFillPercent.Set(float64(t.transformers.Len()) / float64(t.cacheSize))
metrics.RecordDekCacheFillPercent(float64(t.transformers.Len()) / float64(t.cacheSize))
}
return transformer, nil
}

View File

@ -21,8 +21,6 @@ import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
@ -31,13 +29,13 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
)
const (
// Now only supported unix domain socket.
// unixProtocol is the only supported protocol for remote KMS provider.
unixProtocol = "unix"
// Current version for the protocol interface definition.
kmsapiVersion = "v1beta1"
@ -57,7 +55,7 @@ type gRPCService struct {
func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) {
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
addr, err := parseEndpoint(endpoint)
addr, err := util.ParseEndpoint(endpoint)
if err != nil {
return nil, err
}
@ -89,32 +87,6 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error)
return s, nil
}
// Parse the endpoint to extract schema, host or path.
func parseEndpoint(endpoint string) (string, error) {
if len(endpoint) == 0 {
return "", fmt.Errorf("remote KMS provider can't use empty string as endpoint")
}
u, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf("invalid endpoint %q for remote KMS provider, error: %v", endpoint, err)
}
if u.Scheme != unixProtocol {
return "", fmt.Errorf("unsupported scheme %q for remote KMS provider", u.Scheme)
}
// Linux abstract namespace socket - no physical file required
// Warning: Linux Abstract sockets have not concept of ACL (unlike traditional file based sockets).
// However, Linux Abstract sockets are subject to Linux networking namespace, so will only be accessible to
// containers within the same pod (unless host networking is used).
if strings.HasPrefix(u.Path, "/@") {
return strings.TrimPrefix(u.Path, "/"), nil
}
return u.Path, nil
}
func (g *gRPCService) checkAPIVersion(ctx context.Context) error {
g.mux.Lock()
defer g.mux.Unlock()

View File

@ -27,7 +27,7 @@ import (
"testing"
"time"
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing"
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1"
"k8s.io/apimachinery/pkg/util/uuid"
)

View File

@ -0,0 +1,246 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kmsv2 transforms values for storage at rest using a Envelope v2 provider
package kmsv2
import (
"context"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"fmt"
"time"
"github.com/gogo/protobuf/proto"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/storage/value"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/utils/lru"
)
const (
// KMSAPIVersion is the version of the KMS API.
KMSAPIVersion = "v2alpha1"
)
// Service allows encrypting and decrypting data using an external Key Management Service.
type Service interface {
// Decrypt a given bytearray to obtain the original data as bytes.
Decrypt(ctx context.Context, uid string, req *DecryptRequest) ([]byte, error)
// Encrypt bytes to a ciphertext.
Encrypt(ctx context.Context, uid string, data []byte) (*EncryptResponse, error)
// Status returns the status of the KMS.
Status(ctx context.Context) (*StatusResponse, error)
}
type envelopeTransformer struct {
envelopeService Service
// transformers is a thread-safe LRU cache which caches decrypted DEKs indexed by their encrypted form.
transformers *lru.Cache
// baseTransformerFunc creates a new transformer for encrypting the data with the DEK.
baseTransformerFunc func(cipher.Block) value.Transformer
cacheSize int
cacheEnabled bool
pluginName string
}
// EncryptResponse is the response from the Envelope service when encrypting data.
type EncryptResponse struct {
Ciphertext []byte
KeyID string
Annotations map[string][]byte
}
// DecryptRequest is the request to the Envelope service when decrypting data.
type DecryptRequest struct {
Ciphertext []byte
KeyID string
Annotations map[string][]byte
}
// StatusResponse is the response from the Envelope service when getting the status of the service.
type StatusResponse struct {
Version string
Healthz string
KeyID string
}
// NewEnvelopeTransformer returns a transformer which implements a KEK-DEK based envelope encryption scheme.
// It uses envelopeService to encrypt and decrypt DEKs. Respective DEKs (in encrypted form) are prepended to
// the data items they encrypt. A cache (of size cacheSize) is maintained to store the most recently
// used decrypted DEKs in memory.
func NewEnvelopeTransformer(envelopeService Service, cacheSize int, baseTransformerFunc func(cipher.Block) value.Transformer) (value.Transformer, error) {
var cache *lru.Cache
if cacheSize > 0 {
// TODO(aramase): Switch to using expiring cache: kubernetes/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go.
// It handles scans a lot better, doesn't have to be right sized, and don't have a global lock on reads.
cache = lru.New(cacheSize)
}
return &envelopeTransformer{
envelopeService: envelopeService,
transformers: cache,
baseTransformerFunc: baseTransformerFunc,
cacheEnabled: cacheSize > 0,
cacheSize: cacheSize,
}, nil
}
// TransformFromStorage decrypts data encrypted by this transformer using envelope encryption.
func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
metrics.RecordArrival(metrics.FromStorageLabel, time.Now())
// Deserialize the EncryptedObject from the data.
encryptedObject, err := t.doDecode(data)
if err != nil {
return nil, false, err
}
// Look up the decrypted DEK from cache or Envelope.
transformer := t.getTransformer(encryptedObject.EncryptedDEK)
if transformer == nil {
if t.cacheEnabled {
value.RecordCacheMiss()
}
uid := string(uuid.NewUUID())
key, err := t.envelopeService.Decrypt(ctx, uid, &DecryptRequest{
Ciphertext: encryptedObject.EncryptedDEK,
KeyID: encryptedObject.KeyID,
Annotations: encryptedObject.Annotations,
})
if err != nil {
return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err)
}
transformer, err = t.addTransformer(encryptedObject.EncryptedDEK, key)
if err != nil {
return nil, false, err
}
}
return transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
}
// TransformToStorage encrypts data to be written to disk using envelope encryption.
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
metrics.RecordArrival(metrics.ToStorageLabel, time.Now())
newKey, err := generateKey(32)
if err != nil {
return nil, err
}
uid := string(uuid.NewUUID())
resp, err := t.envelopeService.Encrypt(ctx, uid, newKey)
if err != nil {
return nil, fmt.Errorf("failed to encrypt DEK, error: %w", err)
}
transformer, err := t.addTransformer(resp.Ciphertext, newKey)
if err != nil {
return nil, err
}
result, err := transformer.TransformToStorage(ctx, data, dataCtx)
if err != nil {
return nil, err
}
encObject := &kmstypes.EncryptedObject{
KeyID: resp.KeyID,
EncryptedDEK: resp.Ciphertext,
EncryptedData: result,
Annotations: resp.Annotations,
}
// Serialize the EncryptedObject to a byte array.
return t.doEncode(encObject)
}
// addTransformer inserts a new transformer to the Envelope cache of DEKs for future reads.
func (t *envelopeTransformer) addTransformer(encKey []byte, key []byte) (value.Transformer, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
transformer := t.baseTransformerFunc(block)
// Use base64 of encKey as the key into the cache because hashicorp/golang-lru
// cannot hash []uint8.
if t.cacheEnabled {
t.transformers.Add(base64.StdEncoding.EncodeToString(encKey), transformer)
metrics.RecordDekCacheFillPercent(float64(t.transformers.Len()) / float64(t.cacheSize))
}
return transformer, nil
}
// getTransformer fetches the transformer corresponding to encKey from cache, if it exists.
func (t *envelopeTransformer) getTransformer(encKey []byte) value.Transformer {
if !t.cacheEnabled {
return nil
}
_transformer, found := t.transformers.Get(base64.StdEncoding.EncodeToString(encKey))
if found {
return _transformer.(value.Transformer)
}
return nil
}
// doEncode encodes the EncryptedObject to a byte array.
func (t *envelopeTransformer) doEncode(request *kmstypes.EncryptedObject) ([]byte, error) {
return proto.Marshal(request)
}
// doDecode decodes the byte array to an EncryptedObject.
func (t *envelopeTransformer) doDecode(originalData []byte) (*kmstypes.EncryptedObject, error) {
o := &kmstypes.EncryptedObject{}
if err := proto.Unmarshal(originalData, o); err != nil {
return nil, err
}
// validate the EncryptedObject
if o.EncryptedData == nil {
return nil, fmt.Errorf("encrypted data is nil after unmarshal")
}
if o.KeyID == "" {
return nil, fmt.Errorf("keyID is empty after unmarshal")
}
if o.EncryptedDEK == nil {
return nil, fmt.Errorf("encrypted dek is nil after unmarshal")
}
return o, nil
}
// generateKey generates a random key using system randomness.
func generateKey(length int) (key []byte, err error) {
defer func(start time.Time) {
value.RecordDataKeyGeneration(start, err)
}(time.Now())
key = make([]byte, length)
if _, err = rand.Read(key); err != nil {
return nil, err
}
return key, nil
}

View File

@ -0,0 +1,262 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kmsv2 transforms values for storage at rest using a Envelope v2 provider
package kmsv2
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"reflect"
"strconv"
"testing"
"k8s.io/apiserver/pkg/storage/value"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1"
)
const (
testText = "abcdefghijklmnopqrstuvwxyz"
testContextText = "0123456789"
testEnvelopeCacheSize = 10
)
// testEnvelopeService is a mock Envelope service which can be used to simulate remote Envelope services
// for testing of Envelope based encryption providers.
type testEnvelopeService struct {
disabled bool
keyVersion string
}
func (t *testEnvelopeService) Decrypt(ctx context.Context, uid string, req *DecryptRequest) ([]byte, error) {
if t.disabled {
return nil, fmt.Errorf("Envelope service was disabled")
}
if len(uid) == 0 {
return nil, fmt.Errorf("uid is required")
}
if len(req.KeyID) == 0 {
return nil, fmt.Errorf("keyID is required")
}
return base64.StdEncoding.DecodeString(string(req.Ciphertext))
}
func (t *testEnvelopeService) Encrypt(ctx context.Context, uid string, data []byte) (*EncryptResponse, error) {
if t.disabled {
return nil, fmt.Errorf("Envelope service was disabled")
}
if len(uid) == 0 {
return nil, fmt.Errorf("uid is required")
}
return &EncryptResponse{Ciphertext: []byte(base64.StdEncoding.EncodeToString(data)), KeyID: t.keyVersion, Annotations: map[string][]byte{"kms.kubernetes.io/local-kek": []byte("encrypted-local-kek")}}, nil
}
func (t *testEnvelopeService) Status(ctx context.Context) (*StatusResponse, error) {
if t.disabled {
return nil, fmt.Errorf("Envelope service was disabled")
}
return &StatusResponse{KeyID: t.keyVersion}, nil
}
func (t *testEnvelopeService) SetDisabledStatus(status bool) {
t.disabled = status
}
func (t *testEnvelopeService) Rotate() {
i, _ := strconv.Atoi(t.keyVersion)
t.keyVersion = strconv.FormatInt(int64(i+1), 10)
}
func newTestEnvelopeService() *testEnvelopeService {
return &testEnvelopeService{
keyVersion: "1",
}
}
// Throw error if Envelope transformer tries to contact Envelope without hitting cache.
func TestEnvelopeCaching(t *testing.T) {
testCases := []struct {
desc string
cacheSize int
simulateKMSPluginFailure bool
}{
{
desc: "positive cache size should withstand plugin failure",
cacheSize: 1000,
simulateKMSPluginFailure: true,
},
{
desc: "cache disabled size should not withstand plugin failure",
cacheSize: 0,
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
envelopeService := newTestEnvelopeService()
envelopeTransformer, err := NewEnvelopeTransformer(envelopeService, tt.cacheSize, aestransformer.NewGCMTransformer)
if err != nil {
t.Fatalf("failed to initialize envelope transformer: %v", err)
}
ctx := context.Background()
dataCtx := value.DefaultContext([]byte(testContextText))
originalText := []byte(testText)
transformedData, err := envelopeTransformer.TransformToStorage(ctx, originalText, dataCtx)
if err != nil {
t.Fatalf("envelopeTransformer: error while transforming data to storage: %s", err)
}
untransformedData, _, err := envelopeTransformer.TransformFromStorage(ctx, transformedData, dataCtx)
if err != nil {
t.Fatalf("could not decrypt Envelope transformer's encrypted data even once: %v", err)
}
if !bytes.Equal(untransformedData, originalText) {
t.Fatalf("envelopeTransformer transformed data incorrectly. Expected: %v, got %v", originalText, untransformedData)
}
envelopeService.SetDisabledStatus(tt.simulateKMSPluginFailure)
// Subsequent read for the same data should work fine due to caching.
untransformedData, _, err = envelopeTransformer.TransformFromStorage(ctx, transformedData, dataCtx)
if err != nil {
t.Fatalf("could not decrypt Envelope transformer's encrypted data using just cache: %v", err)
}
if !bytes.Equal(untransformedData, originalText) {
t.Fatalf("envelopeTransformer transformed data incorrectly using cache. Got: %v, want %v", untransformedData, originalText)
}
})
}
}
// Makes Envelope transformer hit cache limit, throws error if it misbehaves.
func TestEnvelopeCacheLimit(t *testing.T) {
envelopeTransformer, err := NewEnvelopeTransformer(newTestEnvelopeService(), testEnvelopeCacheSize, aestransformer.NewGCMTransformer)
if err != nil {
t.Fatalf("failed to initialize envelope transformer: %v", err)
}
ctx := context.Background()
dataCtx := value.DefaultContext([]byte(testContextText))
transformedOutputs := map[int][]byte{}
// Overwrite lots of entries in the map
for i := 0; i < 2*testEnvelopeCacheSize; i++ {
numberText := []byte(strconv.Itoa(i))
res, err := envelopeTransformer.TransformToStorage(ctx, numberText, dataCtx)
transformedOutputs[i] = res
if err != nil {
t.Fatalf("envelopeTransformer: error while transforming data (%v) to storage: %s", numberText, err)
}
}
// Try reading all the data now, ensuring cache misses don't cause a concern.
for i := 0; i < 2*testEnvelopeCacheSize; i++ {
numberText := []byte(strconv.Itoa(i))
output, _, err := envelopeTransformer.TransformFromStorage(ctx, transformedOutputs[i], dataCtx)
if err != nil {
t.Fatalf("envelopeTransformer: error while transforming data (%v) from storage: %s", transformedOutputs[i], err)
}
if !bytes.Equal(numberText, output) {
t.Fatalf("envelopeTransformer transformed data incorrectly using cache. Expected: %v, got %v", numberText, output)
}
}
}
func TestEncodeDecode(t *testing.T) {
envelopeTransformer := &envelopeTransformer{
pluginName: "testplugin",
}
obj := &kmstypes.EncryptedObject{
EncryptedData: []byte{0x01, 0x02, 0x03},
KeyID: "1",
EncryptedDEK: []byte{0x04, 0x05, 0x06},
}
data, err := envelopeTransformer.doEncode(obj)
if err != nil {
t.Fatalf("envelopeTransformer: error while encoding data: %s", err)
}
got, err := envelopeTransformer.doDecode(data)
if err != nil {
t.Fatalf("envelopeTransformer: error while decoding data: %s", err)
}
// reset internal field modified by marshaling obj
obj.XXX_sizecache = 0
if !reflect.DeepEqual(got, obj) {
t.Fatalf("envelopeTransformer: decoded data does not match original data. Got: %v, want %v", got, obj)
}
}
func TestDecodeError(t *testing.T) {
et := &envelopeTransformer{
pluginName: "testplugin",
}
testCases := []struct {
desc string
originalData func() []byte
expectedError error
}{
{
desc: "encrypted data is nil",
originalData: func() []byte {
data, _ := et.doEncode(&kmstypes.EncryptedObject{})
return data
},
expectedError: fmt.Errorf("encrypted data is nil after unmarshal"),
},
{
desc: "keyID is nil",
originalData: func() []byte {
data, _ := et.doEncode(&kmstypes.EncryptedObject{
EncryptedData: []byte{0x01, 0x02, 0x03},
})
return data
},
expectedError: fmt.Errorf("keyID is empty after unmarshal"),
},
{
desc: "encrypted dek is nil",
originalData: func() []byte {
data, _ := et.doEncode(&kmstypes.EncryptedObject{
EncryptedData: []byte{0x01, 0x02, 0x03},
KeyID: "1",
})
return data
},
expectedError: fmt.Errorf("encrypted dek is nil after unmarshal"),
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
_, err := et.doDecode(tt.originalData())
if err == nil {
t.Fatalf("envelopeTransformer: expected error while decoding data, got nil")
}
if err.Error() != tt.expectedError.Error() {
t.Fatalf("doDecode() error: expected %v, got %v", tt.expectedError, err)
}
})
}
}

View File

@ -0,0 +1,130 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kmsv2 transforms values for storage at rest using a Envelope provider
package kmsv2
import (
"context"
"fmt"
"net"
"time"
"k8s.io/klog/v2"
"google.golang.org/grpc"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1"
)
const (
// unixProtocol is the only supported protocol for remote KMS provider.
unixProtocol = "unix"
)
// The gRPC implementation for envelope.Service.
type gRPCService struct {
kmsClient kmsapi.KeyManagementServiceClient
connection *grpc.ClientConn
callTimeout time.Duration
}
// NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider.
func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) {
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
addr, err := util.ParseEndpoint(endpoint)
if err != nil {
return nil, err
}
s := &gRPCService{callTimeout: callTimeout}
s.connection, err = grpc.Dial(
addr,
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
grpc.WithContextDialer(
func(context.Context, string) (net.Conn, error) {
// Ignoring addr and timeout arguments:
// addr - comes from the closure
c, err := net.DialUnix(unixProtocol, nil, &net.UnixAddr{Name: addr})
if err != nil {
klog.Errorf("failed to create connection to unix socket: %s, error: %v", addr, err)
} else {
klog.V(4).Infof("Successfully dialed Unix socket %v", addr)
}
return c, err
}))
if err != nil {
return nil, fmt.Errorf("failed to create connection to %s, error: %v", endpoint, err)
}
s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection)
return s, nil
}
// Decrypt a given data string to obtain the original byte data.
func (g *gRPCService) Decrypt(ctx context.Context, uid string, req *DecryptRequest) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, g.callTimeout)
defer cancel()
request := &kmsapi.DecryptRequest{
Ciphertext: req.Ciphertext,
Uid: uid,
KeyId: req.KeyID,
Annotations: req.Annotations,
}
response, err := g.kmsClient.Decrypt(ctx, request)
if err != nil {
return nil, err
}
return response.Plaintext, nil
}
// Encrypt bytes to a string ciphertext.
func (g *gRPCService) Encrypt(ctx context.Context, uid string, plaintext []byte) (*EncryptResponse, error) {
ctx, cancel := context.WithTimeout(ctx, g.callTimeout)
defer cancel()
request := &kmsapi.EncryptRequest{
Plaintext: plaintext,
Uid: uid,
}
response, err := g.kmsClient.Encrypt(ctx, request)
if err != nil {
return nil, err
}
return &EncryptResponse{
Ciphertext: response.Ciphertext,
KeyID: response.KeyId,
Annotations: response.Annotations,
}, nil
}
// Status returns the status of the KMSv2 provider.
func (g *gRPCService) Status(ctx context.Context) (*StatusResponse, error) {
ctx, cancel := context.WithTimeout(ctx, g.callTimeout)
defer cancel()
request := &kmsapi.StatusRequest{}
response, err := g.kmsClient.Status(ctx, request)
if err != nil {
return nil, err
}
return &StatusResponse{Version: response.Version, Healthz: response.Healthz, KeyID: response.KeyId}, nil
}

View File

@ -0,0 +1,388 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kmsv2 transforms values for storage at rest using a Envelope v2 provider
package kmsv2
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2alpha1"
"k8s.io/apimachinery/pkg/util/uuid"
)
type testSocket struct {
path string
endpoint string
}
// newEndpoint constructs a unique name for a Linux Abstract Socket to be used in a test.
// This package uses Linux Domain Sockets to remove the need for clean-up of socket files.
func newEndpoint() *testSocket {
p := fmt.Sprintf("@%s.sock", uuid.NewUUID())
return &testSocket{
path: p,
endpoint: fmt.Sprintf("unix:///%s", p),
}
}
// TestKMSPluginLateStart tests the scenario where kms-plugin pod/container starts after kube-apiserver pod/container.
// Since the Dial to kms-plugin is non-blocking we expect the construction of gRPC service to succeed even when
// kms-plugin is not yet up - dialing happens in the background.
func TestKMSPluginLateStart(t *testing.T) {
t.Parallel()
callTimeout := 3 * time.Second
s := newEndpoint()
service, err := NewGRPCService(s.endpoint, callTimeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
time.Sleep(callTimeout / 2)
f, err := mock.NewBase64Plugin(s.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
data := []byte("test data")
uid := string(uuid.NewUUID())
_, err = service.Encrypt(context.Background(), uid, data)
if err != nil {
t.Fatalf("failed when execute encrypt, error: %v", err)
}
}
func TestTimeouts(t *testing.T) {
t.Parallel()
var testCases = []struct {
desc string
callTimeout time.Duration
pluginDelay time.Duration
kubeAPIServerDelay time.Duration
wantErr string
}{
{
desc: "timeout zero - expect failure when call from kube-apiserver arrives before plugin starts",
callTimeout: 0 * time.Second,
pluginDelay: 3 * time.Second,
wantErr: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
},
{
desc: "timeout zero but kms-plugin already up - still failure - zero timeout is an invalid value",
callTimeout: 0 * time.Second,
pluginDelay: 0 * time.Second,
kubeAPIServerDelay: 2 * time.Second,
wantErr: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
},
{
desc: "timeout greater than kms-plugin delay - expect success",
callTimeout: 6 * time.Second,
pluginDelay: 3 * time.Second,
},
{
desc: "timeout less than kms-plugin delay - expect failure",
callTimeout: 3 * time.Second,
pluginDelay: 6 * time.Second,
wantErr: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
},
}
for _, tt := range testCases {
tt := tt
t.Run(tt.desc, func(t *testing.T) {
t.Parallel()
var (
service Service
err error
data = []byte("test data")
uid = string(uuid.NewUUID())
kubeAPIServerWG sync.WaitGroup
kmsPluginWG sync.WaitGroup
testCompletedWG sync.WaitGroup
socketName = newEndpoint()
)
testCompletedWG.Add(1)
defer testCompletedWG.Done()
kubeAPIServerWG.Add(1)
go func() {
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
time.Sleep(tt.kubeAPIServerDelay)
service, err = NewGRPCService(socketName.endpoint, tt.callTimeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
kubeAPIServerWG.Done()
// Keeping kube-apiserver up to process requests.
testCompletedWG.Wait()
}()
kmsPluginWG.Add(1)
go func() {
// Simulating delayed start of kms-plugin, kube-apiserver is up before the plugin, if requested by the testcase.
time.Sleep(tt.pluginDelay)
f, err := mock.NewBase64Plugin(socketName.path)
if err != nil {
t.Fatalf("failed to construct test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start test KMS provider server, error: %v", err)
}
defer f.CleanUp()
kmsPluginWG.Done()
// Keeping plugin up to process requests.
testCompletedWG.Wait()
}()
kubeAPIServerWG.Wait()
_, err = service.Encrypt(context.Background(), uid, data)
if err == nil && tt.wantErr != "" {
t.Fatalf("got nil, want %s", tt.wantErr)
}
if err != nil && tt.wantErr == "" {
t.Fatalf("got %q, want nil", err.Error())
}
// Collecting kms-plugin - allowing plugin to clean-up.
kmsPluginWG.Wait()
})
}
}
// TestIntermittentConnectionLoss tests the scenario where the connection with kms-plugin is intermittently lost.
func TestIntermittentConnectionLoss(t *testing.T) {
t.Parallel()
var (
wg1 sync.WaitGroup
wg2 sync.WaitGroup
timeout = 30 * time.Second
blackOut = 1 * time.Second
data = []byte("test data")
uid = string(uuid.NewUUID())
endpoint = newEndpoint()
encryptErr error
)
// Start KMS Plugin
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
// connect to kms plugin
service, err := NewGRPCService(endpoint.endpoint, timeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
ctx := context.Background()
_, err = service.Encrypt(ctx, uid, data)
if err != nil {
t.Fatalf("failed when execute encrypt, error: %v", err)
}
t.Log("Connected to KMSPlugin")
// Stop KMS Plugin - simulating connection loss
t.Log("KMS Plugin is stopping")
f.CleanUp()
time.Sleep(2 * time.Second)
wg1.Add(1)
wg2.Add(1)
go func() {
defer wg2.Done()
// Call service to encrypt data.
t.Log("Sending encrypt request")
wg1.Done()
_, err := service.Encrypt(ctx, uid, data)
if err != nil {
encryptErr = fmt.Errorf("failed when executing encrypt, error: %v", err)
}
}()
wg1.Wait()
time.Sleep(blackOut)
// Start KMS Plugin
f, err = mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
t.Log("Restarted KMS Plugin")
wg2.Wait()
if encryptErr != nil {
t.Error(encryptErr)
}
}
// Normal encryption and decryption operation.
func TestGRPCService(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
endpoint := newEndpoint()
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to construct test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
// Create the gRPC client service.
service, err := NewGRPCService(endpoint.endpoint, 1*time.Second)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
ctx := context.Background()
// Call service to encrypt data.
data := []byte("test data")
uid := string(uuid.NewUUID())
resp, err := service.Encrypt(ctx, uid, data)
if err != nil {
t.Fatalf("failed when execute encrypt, error: %v", err)
}
keyID := "1"
// Call service to decrypt data.
result, err := service.Decrypt(ctx, uid, &DecryptRequest{Ciphertext: resp.Ciphertext, KeyID: keyID})
if err != nil {
t.Fatalf("failed when execute decrypt, error: %v", err)
}
if !reflect.DeepEqual(data, result) {
t.Errorf("expect: %v, but: %v", data, result)
}
}
// Normal encryption and decryption operation by multiple go-routines.
func TestGRPCServiceConcurrentAccess(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
endpoint := newEndpoint()
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
// Create the gRPC client service.
service, err := NewGRPCService(endpoint.endpoint, 15*time.Second)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
ctx := context.Background()
var wg sync.WaitGroup
n := 100
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
// Call service to encrypt data.
data := []byte("test data")
uid := string(uuid.NewUUID())
resp, err := service.Encrypt(ctx, uid, data)
if err != nil {
t.Errorf("failed when execute encrypt, error: %v", err)
}
keyID := "1"
// Call service to decrypt data.
result, err := service.Decrypt(ctx, uid, &DecryptRequest{Ciphertext: resp.Ciphertext, KeyID: keyID})
if err != nil {
t.Errorf("failed when execute decrypt, error: %v", err)
}
if !reflect.DeepEqual(data, result) {
t.Errorf("expect: %v, but: %v", data, result)
}
}()
}
wg.Wait()
}
func destroyService(service Service) {
if service != nil {
s := service.(*gRPCService)
s.connection.Close()
}
}
// Test all those invalid configuration for KMS provider.
func TestInvalidConfiguration(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
f, err := mock.NewBase64Plugin(newEndpoint().path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
invalidConfigs := []struct {
name string
endpoint string
}{
{"emptyConfiguration", ""},
{"invalidScheme", "tcp://localhost:6060"},
}
for _, testCase := range invalidConfigs {
t.Run(testCase.name, func(t *testing.T) {
_, err := NewGRPCService(testCase.endpoint, 1*time.Second)
if err == nil {
t.Fatalf("should fail to create envelope service for %s.", testCase.name)
}
})
}
}

View File

@ -0,0 +1,9 @@
# See the OWNERS docs at https://go.k8s.io/owners
# Disable inheritance as this is an api owners file
options:
no_parent_owners: true
approvers:
- api-approvers
reviewers:
- sig-auth-api-reviewers

View File

@ -0,0 +1,128 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: api.proto
package v2alpha1
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// EncryptedObject is the representation of data stored in etcd after envelope encryption.
type EncryptedObject struct {
// EncryptedData is the encrypted data.
EncryptedData []byte `protobuf:"bytes,1,opt,name=encryptedData,proto3" json:"encryptedData,omitempty"`
// KeyID is the KMS key ID used for encryption operations.
KeyID string `protobuf:"bytes,2,opt,name=keyID,proto3" json:"keyID,omitempty"`
// EncryptedDEK is the encrypted DEK.
EncryptedDEK []byte `protobuf:"bytes,3,opt,name=encryptedDEK,proto3" json:"encryptedDEK,omitempty"`
// Annotations is additional metadata that was provided by the KMS plugin.
Annotations map[string][]byte `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *EncryptedObject) Reset() { *m = EncryptedObject{} }
func (m *EncryptedObject) String() string { return proto.CompactTextString(m) }
func (*EncryptedObject) ProtoMessage() {}
func (*EncryptedObject) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{0}
}
func (m *EncryptedObject) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_EncryptedObject.Unmarshal(m, b)
}
func (m *EncryptedObject) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_EncryptedObject.Marshal(b, m, deterministic)
}
func (m *EncryptedObject) XXX_Merge(src proto.Message) {
xxx_messageInfo_EncryptedObject.Merge(m, src)
}
func (m *EncryptedObject) XXX_Size() int {
return xxx_messageInfo_EncryptedObject.Size(m)
}
func (m *EncryptedObject) XXX_DiscardUnknown() {
xxx_messageInfo_EncryptedObject.DiscardUnknown(m)
}
var xxx_messageInfo_EncryptedObject proto.InternalMessageInfo
func (m *EncryptedObject) GetEncryptedData() []byte {
if m != nil {
return m.EncryptedData
}
return nil
}
func (m *EncryptedObject) GetKeyID() string {
if m != nil {
return m.KeyID
}
return ""
}
func (m *EncryptedObject) GetEncryptedDEK() []byte {
if m != nil {
return m.EncryptedDEK
}
return nil
}
func (m *EncryptedObject) GetAnnotations() map[string][]byte {
if m != nil {
return m.Annotations
}
return nil
}
func init() {
proto.RegisterType((*EncryptedObject)(nil), "v2alpha1.EncryptedObject")
proto.RegisterMapType((map[string][]byte)(nil), "v2alpha1.EncryptedObject.AnnotationsEntry")
}
func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) }
var fileDescriptor_00212fb1f9d3bf1c = []byte{
// 200 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4c, 0x2c, 0xc8, 0xd4,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x28, 0x33, 0x4a, 0xcc, 0x29, 0xc8, 0x48, 0x34, 0x54,
0xfa, 0xcf, 0xc8, 0xc5, 0xef, 0x9a, 0x97, 0x5c, 0x54, 0x59, 0x50, 0x92, 0x9a, 0xe2, 0x9f, 0x94,
0x95, 0x9a, 0x5c, 0x22, 0xa4, 0xc2, 0xc5, 0x9b, 0x0a, 0x13, 0x72, 0x49, 0x2c, 0x49, 0x94, 0x60,
0x54, 0x60, 0xd4, 0xe0, 0x09, 0x42, 0x15, 0x14, 0x12, 0xe1, 0x62, 0xcd, 0x4e, 0xad, 0xf4, 0x74,
0x91, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x70, 0x84, 0x94, 0xb8, 0x78, 0x10, 0xca, 0x5c,
0xbd, 0x25, 0x98, 0xc1, 0x5a, 0x51, 0xc4, 0x84, 0x7c, 0xb8, 0xb8, 0x13, 0xf3, 0xf2, 0xf2, 0x4b,
0x12, 0x4b, 0x32, 0xf3, 0xf3, 0x8a, 0x25, 0x58, 0x14, 0x98, 0x35, 0xb8, 0x8d, 0xb4, 0xf4, 0x60,
0x6e, 0xd2, 0x43, 0x73, 0x8f, 0x9e, 0x23, 0x42, 0xb1, 0x6b, 0x5e, 0x49, 0x51, 0x65, 0x10, 0xb2,
0x76, 0x29, 0x3b, 0x2e, 0x01, 0x74, 0x05, 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x60, 0x77,
0x73, 0x06, 0x81, 0x98, 0x20, 0xd7, 0x96, 0x25, 0xe6, 0x94, 0xa6, 0x82, 0x5d, 0xcb, 0x13, 0x04,
0xe1, 0x58, 0x31, 0x59, 0x30, 0x26, 0xb1, 0x81, 0x83, 0xc4, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff,
0x88, 0x8c, 0xbb, 0x4e, 0x1f, 0x01, 0x00, 0x00,
}

View File

@ -0,0 +1,35 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// To regenerate api.pb.go run hack/update-generated-kms.sh
syntax = "proto3";
package v2alpha1;
// EncryptedObject is the representation of data stored in etcd after envelope encryption.
message EncryptedObject {
// EncryptedData is the encrypted data.
bytes encryptedData = 1;
// KeyID is the KMS key ID used for encryption operations.
string keyID = 2;
// EncryptedDEK is the encrypted DEK.
bytes encryptedDEK = 3;
// Annotations is additional metadata that was provided by the KMS plugin.
map<string, bytes> annotations = 4;
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package v2alpha1 contains definition of kms-plugin's serialized types.
package v2alpha1

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package envelope
package metrics
import (
"sync"
@ -27,8 +27,8 @@ import (
const (
namespace = "apiserver"
subsystem = "envelope_encryption"
fromStorageLabel = "from_storage"
toStorageLabel = "to_storage"
FromStorageLabel = "from_storage"
ToStorageLabel = "to_storage"
)
/*
@ -71,16 +71,16 @@ var (
var registerMetricsFunc sync.Once
func registerMetrics() {
func RegisterMetrics() {
registerMetricsFunc.Do(func() {
legacyregistry.MustRegister(dekCacheFillPercent)
legacyregistry.MustRegister(dekCacheInterArrivals)
})
}
func recordArrival(transformationType string, start time.Time) {
func RecordArrival(transformationType string, start time.Time) {
switch transformationType {
case fromStorageLabel:
case FromStorageLabel:
lockLastFromStorage.Lock()
defer lockLastFromStorage.Unlock()
@ -89,7 +89,7 @@ func recordArrival(transformationType string, start time.Time) {
}
dekCacheInterArrivals.WithLabelValues(transformationType).Observe(start.Sub(lastFromStorage).Seconds())
lastFromStorage = start
case toStorageLabel:
case ToStorageLabel:
lockLastToStorage.Lock()
defer lockLastToStorage.Unlock()
@ -100,3 +100,7 @@ func recordArrival(transformationType string, start time.Time) {
lastToStorage = start
}
}
func RecordDekCacheFillPercent(percent float64) {
dekCacheFillPercent.Set(percent)
}

View File

@ -17,7 +17,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
package v1beta1
import (
"context"

View File

@ -0,0 +1,191 @@
//go:build !windows
// +build !windows
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2alpha1
import (
"context"
"encoding/base64"
"fmt"
"net"
"os"
"runtime"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1"
"k8s.io/klog/v2"
)
const (
// Now only supported unix domain socket.
unixProtocol = "unix"
// Current version for the protocol interface definition.
kmsapiVersion = "v2alpha1"
)
// Base64Plugin gRPC sever for a mock KMS provider.
// Uses base64 to simulate encrypt and decrypt.
type Base64Plugin struct {
grpcServer *grpc.Server
listener net.Listener
mu *sync.Mutex
lastEncryptRequest *kmsapi.EncryptRequest
inFailedState bool
ver string
socketPath string
}
// NewBase64Plugin is a constructor for Base64Plugin.
func NewBase64Plugin(socketPath string) (*Base64Plugin, error) {
server := grpc.NewServer()
result := &Base64Plugin{
grpcServer: server,
mu: &sync.Mutex{},
ver: kmsapiVersion,
socketPath: socketPath,
}
kmsapi.RegisterKeyManagementServiceServer(server, result)
return result, nil
}
// WaitForBase64PluginToBeUp waits until the plugin is ready to serve requests.
func WaitForBase64PluginToBeUp(plugin *Base64Plugin) error {
var gRPCErr error
var resp *kmsapi.StatusResponse
pollErr := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
resp, gRPCErr = plugin.Status(context.Background(), &kmsapi.StatusRequest{})
return gRPCErr == nil && resp.Healthz == "ok", nil
})
if pollErr == wait.ErrWaitTimeout {
return fmt.Errorf("failed to start kms-plugin, error: %v", gRPCErr)
}
return nil
}
// LastEncryptRequest returns the last EncryptRequest.Plain sent to the plugin.
func (s *Base64Plugin) LastEncryptRequest() []byte {
return s.lastEncryptRequest.Plaintext
}
// SetVersion sets the version of kms-plugin.
func (s *Base64Plugin) SetVersion(ver string) {
s.ver = ver
}
// Start starts plugin's gRPC service.
func (s *Base64Plugin) Start() error {
var err error
s.listener, err = net.Listen(unixProtocol, s.socketPath)
if err != nil {
return fmt.Errorf("failed to listen on the unix socket, error: %v", err)
}
klog.Infof("Listening on %s", s.socketPath)
go s.grpcServer.Serve(s.listener)
return nil
}
// CleanUp stops gRPC server and the underlying listener.
func (s *Base64Plugin) CleanUp() {
s.grpcServer.Stop()
s.listener.Close()
if !strings.HasPrefix(s.socketPath, "@") || runtime.GOOS != "linux" {
os.Remove(s.socketPath)
}
}
// EnterFailedState places the plugin into failed state.
func (s *Base64Plugin) EnterFailedState() {
s.mu.Lock()
defer s.mu.Unlock()
s.inFailedState = true
}
// ExitFailedState removes the plugin from the failed state.
func (s *Base64Plugin) ExitFailedState() {
s.mu.Lock()
defer s.mu.Unlock()
s.inFailedState = false
}
// Status returns the status of the kms-plugin.
func (s *Base64Plugin) Status(ctx context.Context, request *kmsapi.StatusRequest) (*kmsapi.StatusResponse, error) {
klog.Infof("Received request for Status: %v", request)
s.mu.Lock()
defer s.mu.Unlock()
if s.inFailedState {
return nil, status.Error(codes.FailedPrecondition, "failed precondition - key disabled")
}
return &kmsapi.StatusResponse{Version: s.ver, Healthz: "ok", KeyId: "1"}, nil
}
// Decrypt performs base64 decoding of the payload of kms.DecryptRequest.
func (s *Base64Plugin) Decrypt(ctx context.Context, request *kmsapi.DecryptRequest) (*kmsapi.DecryptResponse, error) {
klog.V(3).Infof("Received Decrypt Request for DEK: %s", string(request.Ciphertext))
s.mu.Lock()
defer s.mu.Unlock()
if s.inFailedState {
return nil, status.Error(codes.FailedPrecondition, "failed precondition - key disabled")
}
if len(request.Uid) == 0 {
return nil, status.Error(codes.InvalidArgument, "uid is required")
}
buf := make([]byte, base64.StdEncoding.DecodedLen(len(request.Ciphertext)))
n, err := base64.StdEncoding.Decode(buf, request.Ciphertext)
if err != nil {
return nil, err
}
return &kmsapi.DecryptResponse{Plaintext: buf[:n]}, nil
}
// Encrypt performs base64 encoding of the payload of kms.EncryptRequest.
func (s *Base64Plugin) Encrypt(ctx context.Context, request *kmsapi.EncryptRequest) (*kmsapi.EncryptResponse, error) {
klog.V(3).Infof("Received Encrypt Request for DEK: %x", request.Plaintext)
s.mu.Lock()
defer s.mu.Unlock()
s.lastEncryptRequest = request
if s.inFailedState {
return nil, status.Error(codes.FailedPrecondition, "failed precondition - key disabled")
}
if len(request.Uid) == 0 {
return nil, status.Error(codes.InvalidArgument, "uid is required")
}
buf := make([]byte, base64.StdEncoding.EncodedLen(len(request.Plaintext)))
base64.StdEncoding.Encode(buf, request.Plaintext)
return &kmsapi.EncryptResponse{Ciphertext: buf, KeyId: "1", Annotations: map[string][]byte{"kms.kubernetes.io/local-kek": []byte("encrypted-local-kek")}}, nil
}

View File

@ -0,0 +1,54 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net/url"
"strings"
)
const (
// unixProtocol is the only supported protocol for remote KMS provider.
unixProtocol = "unix"
)
// Parse the endpoint to extract schema, host or path.
func ParseEndpoint(endpoint string) (string, error) {
if len(endpoint) == 0 {
return "", fmt.Errorf("remote KMS provider can't use empty string as endpoint")
}
u, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf("invalid endpoint %q for remote KMS provider, error: %v", endpoint, err)
}
if u.Scheme != unixProtocol {
return "", fmt.Errorf("unsupported scheme %q for remote KMS provider", u.Scheme)
}
// Linux abstract namespace socket - no physical file required
// Warning: Linux Abstract sockets have not concept of ACL (unlike traditional file based sockets).
// However, Linux Abstract sockets are subject to Linux networking namespace, so will only be accessible to
// containers within the same pod (unless host networking is used).
if strings.HasPrefix(u.Path, "/@") {
return strings.TrimPrefix(u.Path, "/"), nil
}
return u.Path, nil
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"strings"
"testing"
)
func TestParseEndpoint(t *testing.T) {
testCases := []struct {
desc string
endpoint string
want string
}{
{
desc: "path with prefix",
endpoint: "unix:///@path",
want: "@path",
},
{
desc: "path without prefix",
endpoint: "unix:///path",
want: "/path",
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
got, err := ParseEndpoint(tt.endpoint)
if err != nil {
t.Errorf("ParseEndpoint(%q) error: %v", tt.endpoint, err)
}
if got != tt.want {
t.Errorf("ParseEndpoint(%q) = %q, want %q", tt.endpoint, got, tt.want)
}
})
}
}
func TestParseEndpointError(t *testing.T) {
testCases := []struct {
desc string
endpoint string
wantErr string
}{
{
desc: "empty endpoint",
endpoint: "",
wantErr: "remote KMS provider can't use empty string as endpoint",
},
{
desc: "invalid scheme",
endpoint: "http:///path",
wantErr: "unsupported scheme \"http\" for remote KMS provider",
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
_, err := ParseEndpoint(tt.endpoint)
if err == nil {
t.Errorf("ParseEndpoint(%q) error: %v", tt.endpoint, err)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("ParseEndpoint(%q) = %q, want %q", tt.endpoint, err, tt.wantErr)
}
})
}
}

View File

@ -36,7 +36,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/value"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing"
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

View File

@ -0,0 +1,275 @@
//go:build !windows
// +build !windows
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package transformation
import (
"bytes"
"context"
"crypto/aes"
"fmt"
"strings"
"testing"
"time"
"github.com/gogo/protobuf/proto"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/value"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1"
kmsv2mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2alpha1"
kmsv2api "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
type envelopekmsv2 struct {
providerName string
rawEnvelope []byte
plainTextDEK []byte
}
func (r envelopekmsv2) prefix() string {
return fmt.Sprintf("k8s:enc:kms:v2:%s:", r.providerName)
}
func (r envelopekmsv2) prefixLen() int {
return len(r.prefix())
}
func (r envelopekmsv2) cipherTextDEK() ([]byte, error) {
o := &kmstypes.EncryptedObject{}
if err := proto.Unmarshal(r.rawEnvelope[r.startOfPayload(r.providerName):], o); err != nil {
return nil, err
}
return o.EncryptedDEK, nil
}
func (r envelopekmsv2) startOfPayload(_ string) int {
return r.prefixLen()
}
func (r envelopekmsv2) cipherTextPayload() ([]byte, error) {
o := &kmstypes.EncryptedObject{}
if err := proto.Unmarshal(r.rawEnvelope[r.startOfPayload(r.providerName):], o); err != nil {
return nil, err
}
return o.EncryptedData, nil
}
func (r envelopekmsv2) plainTextPayload(secretETCDPath string) ([]byte, error) {
block, err := aes.NewCipher(r.plainTextDEK)
if err != nil {
return nil, fmt.Errorf("failed to initialize AES Cipher: %v", err)
}
ctx := context.Background()
dataCtx := value.DefaultContext([]byte(secretETCDPath))
aesgcmTransformer := aestransformer.NewGCMTransformer(block)
data, err := r.cipherTextPayload()
if err != nil {
return nil, fmt.Errorf("failed to get cipher text payload: %v", err)
}
plainSecret, _, err := aesgcmTransformer.TransformFromStorage(ctx, data, dataCtx)
if err != nil {
return nil, fmt.Errorf("failed to transform from storage via AESGCM, err: %w", err)
}
return plainSecret, nil
}
// TestKMSv2Provider is an integration test between KubeAPI, ETCD and KMSv2 Plugin
// Concretely, this test verifies the following integration contracts:
// 1. Raw records in ETCD that were processed by KMSv2 Provider should be prefixed with []byte{'e', 'k', '8', 's', 0}
// 2. Data Encryption Key (DEK) should be generated by envelopeTransformer and passed to KMS gRPC Plugin
// 3. KMS gRPC Plugin should encrypt the DEK with a Key Encryption Key (KEK) and pass it back to envelopeTransformer
// 4. The cipherTextPayload (ex. Secret) should be encrypted via AES GCM transform
// 5. kmstypes.EncryptedObject structure should be serialized and deposited in ETCD
func TestKMSv2Provider(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
encryptionConfig := `
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
apiVersion: v2
name: kms-provider
cachesize: 1000
endpoint: unix:///@kms-provider.sock
`
providerName := "kms-provider"
pluginMock, err := kmsv2mock.NewBase64Plugin("@kms-provider.sock")
if err != nil {
t.Fatalf("failed to create mock of KMSv2 Plugin: %v", err)
}
go pluginMock.Start()
if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock); err != nil {
t.Fatalf("Failed start plugin, err: %v", err)
}
defer pluginMock.CleanUp()
test, err := newTransformTest(t, encryptionConfig)
if err != nil {
t.Fatalf("failed to start KUBE API Server with encryptionConfig\n %s, error: %v", encryptionConfig, err)
}
defer test.cleanUp()
test.secret, err = test.createSecret(testSecret, testNamespace)
if err != nil {
t.Fatalf("Failed to create test secret, error: %v", err)
}
// Since Data Encryption Key (DEK) is randomly generated (per encryption operation), we need to ask KMS Mock for it.
plainTextDEK := pluginMock.LastEncryptRequest()
secretETCDPath := test.getETCDPath()
rawEnvelope, err := test.getRawSecretFromETCD()
if err != nil {
t.Fatalf("failed to read %s from etcd: %v", secretETCDPath, err)
}
envelopeData := envelopekmsv2{
providerName: providerName,
rawEnvelope: rawEnvelope,
plainTextDEK: plainTextDEK,
}
wantPrefix := string(envelopeData.prefix())
if !bytes.HasPrefix(rawEnvelope, []byte(wantPrefix)) {
t.Fatalf("expected secret to be prefixed with %s, but got %s", wantPrefix, rawEnvelope)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ciphertext, err := envelopeData.cipherTextDEK()
if err != nil {
t.Fatalf("failed to get ciphertext DEK from KMSv2 Plugin: %v", err)
}
decryptResponse, err := pluginMock.Decrypt(ctx, &kmsv2api.DecryptRequest{Uid: string(types.UID(uuid.NewUUID())), Ciphertext: ciphertext})
if err != nil {
t.Fatalf("failed to decrypt DEK, %v", err)
}
dekPlainAsWouldBeSeenByETCD := decryptResponse.Plaintext
if !bytes.Equal(plainTextDEK, dekPlainAsWouldBeSeenByETCD) {
t.Fatalf("expected plainTextDEK %v to be passed to KMS Plugin, but got %s",
plainTextDEK, dekPlainAsWouldBeSeenByETCD)
}
plainSecret, err := envelopeData.plainTextPayload(secretETCDPath)
if err != nil {
t.Fatalf("failed to transform from storage via AESGCM, err: %v", err)
}
if !strings.Contains(string(plainSecret), secretVal) {
t.Fatalf("expected %q after decryption, but got %q", secretVal, string(plainSecret))
}
secretClient := test.restClient.CoreV1().Secrets(testNamespace)
// Secrets should be un-enveloped on direct reads from Kube API Server.
s, err := secretClient.Get(ctx, testSecret, metav1.GetOptions{})
if err != nil {
t.Fatalf("failed to get Secret from %s, err: %v", testNamespace, err)
}
if secretVal != string(s.Data[secretKey]) {
t.Fatalf("expected %s from KubeAPI, but got %s", secretVal, string(s.Data[secretKey]))
}
}
func TestKMSv2Healthz(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
encryptionConfig := `
kind: EncryptionConfiguration
apiVersion: apiserver.config.k8s.io/v1
resources:
- resources:
- secrets
providers:
- kms:
apiVersion: v2
name: provider-1
endpoint: unix:///@kms-provider-1.sock
- kms:
apiVersion: v2
name: provider-2
endpoint: unix:///@kms-provider-2.sock
`
pluginMock1, err := kmsv2mock.NewBase64Plugin("@kms-provider-1.sock")
if err != nil {
t.Fatalf("failed to create mock of KMS Plugin #1: %v", err)
}
if err := pluginMock1.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer pluginMock1.CleanUp()
if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock1); err != nil {
t.Fatalf("Failed to start plugin #1, err: %v", err)
}
pluginMock2, err := kmsv2mock.NewBase64Plugin("@kms-provider-2.sock")
if err != nil {
t.Fatalf("Failed to create mock of KMS Plugin #2: err: %v", err)
}
if err := pluginMock2.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer pluginMock2.CleanUp()
if err := kmsv2mock.WaitForBase64PluginToBeUp(pluginMock2); err != nil {
t.Fatalf("Failed to start KMS Plugin #2: err: %v", err)
}
test, err := newTransformTest(t, encryptionConfig)
if err != nil {
t.Fatalf("Failed to start kube-apiserver, error: %v", err)
}
defer test.cleanUp()
// Name of the healthz check is calculated based on a constant "kms-provider-" + position of the
// provider in the config.
// Stage 1 - Since all kms-plugins are guaranteed to be up, healthz checks for:
// healthz/kms-provider-0 and /healthz/kms-provider-1 should be OK.
mustBeHealthy(t, "kms-provider-0", test.kubeAPIServer.ClientConfig)
mustBeHealthy(t, "kms-provider-1", test.kubeAPIServer.ClientConfig)
// Stage 2 - kms-plugin for provider-1 is down. Therefore, expect the health check for provider-1
// to fail, but provider-2 should still be OK
pluginMock1.EnterFailedState()
mustBeUnHealthy(t, "kms-provider-0", test.kubeAPIServer.ClientConfig)
mustBeHealthy(t, "kms-provider-1", test.kubeAPIServer.ClientConfig)
pluginMock1.ExitFailedState()
// Stage 3 - kms-plugin for provider-1 is now up. Therefore, expect the health check for provider-1
// to succeed now, but provider-2 is now down.
// Need to sleep since health check chases responses for 3 seconds.
pluginMock2.EnterFailedState()
mustBeHealthy(t, "kms-provider-0", test.kubeAPIServer.ClientConfig)
mustBeUnHealthy(t, "kms-provider-1", test.kubeAPIServer.ClientConfig)
}

View File

@ -198,7 +198,7 @@ func EtcdMain(tests func() int) {
// like k8s.io/klog/v2.(*loggingT).flushDaemon()
// TODO(#108483): Reduce this number once we address the
// couple remaining issues.
if dg := runtime.NumGoroutine() - before; dg <= 10 {
if dg := runtime.NumGoroutine() - before; dg <= 15 {
return true, nil
}
// Allow goroutines to schedule and die off.

8
vendor/modules.txt vendored
View File

@ -1648,8 +1648,14 @@ k8s.io/apiserver/pkg/storage/testing
k8s.io/apiserver/pkg/storage/value
k8s.io/apiserver/pkg/storage/value/encrypt/aes
k8s.io/apiserver/pkg/storage/value/encrypt/envelope
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v1beta1
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing/v2alpha1
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1
k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1
k8s.io/apiserver/pkg/storage/value/encrypt/identity
k8s.io/apiserver/pkg/storage/value/encrypt/secretbox
k8s.io/apiserver/pkg/storageversion