Merge pull request #111986 from enj/enj/i/transformer_leak

kms: fix go routine leak in gRPC connection
This commit is contained in:
Kubernetes Prow Robot 2022-09-08 09:49:46 -07:00 committed by GitHub
commit cc4b7dc3c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 140 additions and 61 deletions

View File

@ -412,7 +412,7 @@ func buildGenericConfig(
lastErr = err lastErr = err
return return
} }
storageFactory, lastErr = completedStorageFactoryConfig.New() storageFactory, lastErr = completedStorageFactoryConfig.New(genericConfig.DrainedNotify())
if lastErr != nil { if lastErr != nil {
return return
} }

View File

@ -111,7 +111,7 @@ type completedStorageFactoryConfig struct {
} }
// New returns a new storage factory created from the completed storage factory configuration. // New returns a new storage factory created from the completed storage factory configuration.
func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) { func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstorage.DefaultStorageFactory, error) {
resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides) resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides)
storageFactory := serverstorage.NewDefaultStorageFactory( storageFactory := serverstorage.NewDefaultStorageFactory(
c.StorageConfig, c.StorageConfig,
@ -142,7 +142,7 @@ func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFact
storageFactory.SetEtcdLocation(groupResource, servers) storageFactory.SetEtcdLocation(groupResource, servers)
} }
if len(c.EncryptionProviderConfigFilepath) != 0 { if len(c.EncryptionProviderConfigFilepath) != 0 {
transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath) transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath, stopCh)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package registrytest package registrytest
import ( import (
"context"
"testing" "testing"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -35,6 +36,9 @@ func NewEtcdStorage(t *testing.T, group string) (*storagebackend.ConfigForResour
func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*storagebackend.ConfigForResource, *etcd3testing.EtcdTestServer) { func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*storagebackend.ConfigForResource, *etcd3testing.EtcdTestServer) {
t.Helper() t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
server, config := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, config := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
options := options.NewEtcdOptions(config) options := options.NewEtcdOptions(config)
@ -43,7 +47,7 @@ func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*st
t.Fatal(err) t.Fatal(err)
} }
completedConfig.APIResourceConfig = serverstorage.NewResourceConfig() completedConfig.APIResourceConfig = serverstorage.NewResourceConfig()
factory, err := completedConfig.New() factory, err := completedConfig.New(ctx.Done())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -33,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
apiserverconfig "k8s.io/apiserver/pkg/apis/config" apiserverconfig "k8s.io/apiserver/pkg/apis/config"
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
"k8s.io/apiserver/pkg/apis/config/validation" "k8s.io/apiserver/pkg/apis/config/validation"
@ -93,7 +95,7 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
} }
// GetKMSPluginHealthzCheckers extracts KMSPluginProbes from the EncryptionConfig. // GetKMSPluginHealthzCheckers extracts KMSPluginProbes from the EncryptionConfig.
func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, error) { func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) {
f, err := os.Open(filepath) f, err := os.Open(filepath)
if err != nil { if err != nil {
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err) return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err)
@ -101,7 +103,7 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro
defer f.Close() defer f.Close()
var result []healthz.HealthChecker var result []healthz.HealthChecker
probes, err := getKMSPluginProbes(f) probes, err := getKMSPluginProbes(f, stopCh)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -120,7 +122,10 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro
return result, nil return result, nil
} }
func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) { func getKMSPluginProbes(reader io.Reader, stopCh <-chan struct{}) ([]interface{}, error) {
// we ignore the cancel func because this context should only be canceled when stopCh is closed
ctx, _ := wait.ContextForChannel(stopCh)
var result []interface{} var result []interface{}
configFileContents, err := io.ReadAll(reader) configFileContents, err := io.ReadAll(reader)
@ -138,7 +143,7 @@ func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) {
if p.KMS != nil { if p.KMS != nil {
switch p.KMS.APIVersion { switch p.KMS.APIVersion {
case kmsAPIVersionV1: case kmsAPIVersionV1:
s, err := envelope.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration) s, err := envelope.NewGRPCService(ctx, p.KMS.Endpoint, p.KMS.Timeout.Duration)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %v", p.KMS.Name, err) return nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %v", p.KMS.Name, err)
} }
@ -156,7 +161,7 @@ func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) {
return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, KMSv2 feature is not enabled", p.KMS.Name) return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, KMSv2 feature is not enabled", p.KMS.Name)
} }
s, err := envelopekmsv2.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration) s, err := envelopekmsv2.NewGRPCService(ctx, p.KMS.Endpoint, p.KMS.Timeout.Duration)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", p.KMS.Name, err) return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", p.KMS.Name, err)
} }
@ -254,21 +259,21 @@ func isKMSv2ProviderHealthy(name string, response *envelopekmsv2.StatusResponse)
} }
// GetTransformerOverrides returns the transformer overrides by reading and parsing the encryption provider configuration file // GetTransformerOverrides returns the transformer overrides by reading and parsing the encryption provider configuration file
func GetTransformerOverrides(filepath string) (map[schema.GroupResource]value.Transformer, error) { func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) {
f, err := os.Open(filepath) f, err := os.Open(filepath)
if err != nil { if err != nil {
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err) return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err)
} }
defer f.Close() defer f.Close()
result, err := parseEncryptionConfiguration(f) result, err := parseEncryptionConfiguration(f, stopCh)
if err != nil { if err != nil {
return nil, fmt.Errorf("error while parsing encryption provider configuration file %q: %v", filepath, err) return nil, fmt.Errorf("error while parsing encryption provider configuration file %q: %v", filepath, err)
} }
return result, nil return result, nil
} }
func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.Transformer, error) { func parseEncryptionConfiguration(f io.Reader, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) {
configFileContents, err := io.ReadAll(f) configFileContents, err := io.ReadAll(f)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not read contents: %v", err) return nil, fmt.Errorf("could not read contents: %v", err)
@ -283,7 +288,7 @@ func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.T
// For each entry in the configuration // For each entry in the configuration
for _, resourceConfig := range config.Resources { for _, resourceConfig := range config.Resources {
transformers, err := prefixTransformers(&resourceConfig) transformers, err := prefixTransformers(&resourceConfig, stopCh)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -307,8 +312,8 @@ func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.T
func loadConfig(data []byte) (*apiserverconfig.EncryptionConfiguration, error) { func loadConfig(data []byte) (*apiserverconfig.EncryptionConfiguration, error) {
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme) codecs := serializer.NewCodecFactory(scheme)
apiserverconfig.AddToScheme(scheme) utilruntime.Must(apiserverconfig.AddToScheme(scheme))
apiserverconfigv1.AddToScheme(scheme) utilruntime.Must(apiserverconfigv1.AddToScheme(scheme))
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil) configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
if err != nil { if err != nil {
@ -330,7 +335,10 @@ var (
envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
) )
func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, error) { func prefixTransformers(config *apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, error) {
// we ignore the cancel func because this context should only be canceled when stopCh is closed
ctx, _ := wait.ContextForChannel(stopCh)
var result []value.PrefixTransformer var result []value.PrefixTransformer
for _, provider := range config.Providers { for _, provider := range config.Providers {
var ( var (
@ -349,7 +357,7 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.
switch provider.KMS.APIVersion { switch provider.KMS.APIVersion {
case kmsAPIVersionV1: case kmsAPIVersionV1:
var envelopeService envelope.Service var envelopeService envelope.Service
if envelopeService, err = envelopeServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil { if envelopeService, err = envelopeServiceFactory(ctx, provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err) return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err)
} }
transformer, err = envelopePrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV1) transformer, err = envelopePrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV1)
@ -359,7 +367,7 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.
} }
var envelopeService envelopekmsv2.Service var envelopeService envelopekmsv2.Service
if envelopeService, err = envelopeKMSv2ServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil { if envelopeService, err = envelopeKMSv2ServiceFactory(ctx, provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
return nil, fmt.Errorf("could not configure KMSv2 plugin %q, error: %v", provider.KMS.Name, err) return nil, fmt.Errorf("could not configure KMSv2 plugin %q, error: %v", provider.KMS.Name, err)
} }
transformer, err = envelopekmsv2PrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV2) transformer, err = envelopekmsv2PrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV2)

View File

@ -115,7 +115,7 @@ func (t *testKMSv2EnvelopeService) Status(ctx context.Context) (*envelopekmsv2.S
} }
// The factory method to create mock envelope service. // The factory method to create mock envelope service.
func newMockEnvelopeService(endpoint string, timeout time.Duration) (envelope.Service, error) { func newMockEnvelopeService(ctx context.Context, endpoint string, timeout time.Duration) (envelope.Service, error) {
return &testEnvelopeService{nil}, nil return &testEnvelopeService{nil}, nil
} }
@ -125,7 +125,7 @@ func newMockErrorEnvelopeService(endpoint string, timeout time.Duration) (envelo
} }
// The factory method to create mock envelope kmsv2 service. // The factory method to create mock envelope kmsv2 service.
func newMockEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) { func newMockEnvelopeKMSv2Service(ctx context.Context, endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) {
return &testKMSv2EnvelopeService{nil}, nil return &testKMSv2EnvelopeService{nil}, nil
} }
@ -193,41 +193,43 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
envelopeKMSv2ServiceFactory = factoryKMSv2 envelopeKMSv2ServiceFactory = factoryKMSv2
}() }()
ctx := testContext(t)
// Creates compound/prefix transformers with different ordering of available transformers. // Creates compound/prefix transformers with different ordering of available transformers.
// Transforms data using one of them, and tries to untransform using the others. // Transforms data using one of them, and tries to untransform using the others.
// Repeats this for all possible combinations. // Repeats this for all possible combinations.
correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml" correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml"
identityFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithIdentityFirst)) identityFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithIdentityFirst), ctx.Done())
if err != nil { if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst) t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst)
} }
correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml" correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml"
aesGcmFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesGcmFirst)) aesGcmFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesGcmFirst), ctx.Done())
if err != nil { if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst) t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst)
} }
correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml" correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml"
aesCbcFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesCbcFirst)) aesCbcFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesCbcFirst), ctx.Done())
if err != nil { if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst) t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst)
} }
correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml" correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml"
secretboxFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithSecretboxFirst)) secretboxFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithSecretboxFirst), ctx.Done())
if err != nil { if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst) t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst)
} }
correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml" correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml"
kmsFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSFirst)) kmsFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSFirst), ctx.Done())
if err != nil { if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst) t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
} }
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml" correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First)) kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First), ctx.Done())
if err != nil { if err != nil {
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First) t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
} }
@ -240,7 +242,6 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")] kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")] kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")]
ctx := context.Background()
dataCtx := value.DefaultContext([]byte(sampleContextText)) dataCtx := value.DefaultContext([]byte(sampleContextText))
originalText := []byte(sampleText) originalText := []byte(sampleText)
@ -280,11 +281,13 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
func TestKMSPluginHealthz(t *testing.T) { func TestKMSPluginHealthz(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
service, err := envelope.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second) ctx := testContext(t)
service, err := envelope.NewGRPCService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
if err != nil { if err != nil {
t.Fatalf("Could not initialize envelopeService, error: %v", err) t.Fatalf("Could not initialize envelopeService, error: %v", err)
} }
serviceKMSv2, err := envelopekmsv2.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second) serviceKMSv2, err := envelopekmsv2.NewGRPCService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
if err != nil { if err != nil {
t.Fatalf("Could not initialize kmsv2 envelopeService, error: %v", err) t.Fatalf("Could not initialize kmsv2 envelopeService, error: %v", err)
} }
@ -347,7 +350,7 @@ func TestKMSPluginHealthz(t *testing.T) {
for _, tt := range testCases { for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) { t.Run(tt.desc, func(t *testing.T) {
got, err := getKMSPluginProbes(mustConfigReader(t, tt.config)) got, err := getKMSPluginProbes(mustConfigReader(t, tt.config), ctx.Done())
if err != nil && !tt.wantErr { if err != nil && !tt.wantErr {
t.Fatalf("got %v, want nil for error", err) t.Fatalf("got %v, want nil for error", err)
} }
@ -360,7 +363,9 @@ func TestKMSPluginHealthz(t *testing.T) {
} }
func TestKMSPluginHealthzTTL(t *testing.T) { func TestKMSPluginHealthzTTL(t *testing.T) {
service, _ := newMockEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second) ctx := testContext(t)
service, _ := newMockEnvelopeService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
errService, _ := newMockErrorEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second) errService, _ := newMockErrorEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second)
testCases := []struct { testCases := []struct {
@ -403,7 +408,9 @@ func TestKMSPluginHealthzTTL(t *testing.T) {
} }
func TestKMSv2PluginHealthzTTL(t *testing.T) { func TestKMSv2PluginHealthzTTL(t *testing.T) {
service, _ := newMockEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second) ctx := testContext(t)
service, _ := newMockEnvelopeKMSv2Service(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
errService, _ := newMockErrorEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second) errService, _ := newMockErrorEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second)
testCases := []struct { testCases := []struct {
@ -529,8 +536,10 @@ func testCBCKeyRotationWithProviders(t *testing.T, firstEncryptionConfig, firstP
} }
func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath string) value.Transformer { func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath string) value.Transformer {
ctx := testContext(t)
t.Helper() t.Helper()
transformers, err := parseEncryptionConfiguration(mustConfigReader(t, encryptionConfigPath)) transformers, err := parseEncryptionConfiguration(mustConfigReader(t, encryptionConfigPath), ctx.Done())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -577,3 +586,9 @@ func TestIsKMSv2ProviderHealthyError(t *testing.T) {
}) })
} }
} }
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
return ctx
}

View File

@ -200,7 +200,7 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
transformerOverrides := make(map[schema.GroupResource]value.Transformer) transformerOverrides := make(map[schema.GroupResource]value.Transformer)
if len(s.EncryptionProviderConfigFilepath) > 0 { if len(s.EncryptionProviderConfigFilepath) > 0 {
var err error var err error
transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath) transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
if err != nil { if err != nil {
return err return err
} }
@ -246,7 +246,7 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
})) }))
if s.EncryptionProviderConfigFilepath != "" { if s.EncryptionProviderConfigFilepath != "" {
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath) kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
if err != nil { if err != nil {
return err return err
} }

View File

@ -24,13 +24,13 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/klog/v2"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1" kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
"k8s.io/klog/v2"
) )
const ( const (
@ -52,7 +52,7 @@ type gRPCService struct {
} }
// NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider. // NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider.
func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) { func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) {
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint) klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
addr, err := util.ParseEndpoint(endpoint) addr, err := util.ParseEndpoint(endpoint)
@ -84,6 +84,14 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error)
} }
s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection) s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection)
go func() {
defer utilruntime.HandleCrash()
<-ctx.Done()
_ = s.connection.Close()
}()
return s, nil return s, nil
} }

View File

@ -21,6 +21,7 @@ limitations under the License.
package envelope package envelope
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
@ -56,7 +57,9 @@ func TestKMSPluginLateStart(t *testing.T) {
callTimeout := 3 * time.Second callTimeout := 3 * time.Second
s := newEndpoint() s := newEndpoint()
service, err := NewGRPCService(s.endpoint, callTimeout) ctx := testContext(t)
service, err := NewGRPCService(ctx, s.endpoint, callTimeout)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -132,12 +135,14 @@ func TestTimeouts(t *testing.T) {
testCompletedWG.Add(1) testCompletedWG.Add(1)
defer testCompletedWG.Done() defer testCompletedWG.Done()
ctx := testContext(t)
kubeAPIServerWG.Add(1) kubeAPIServerWG.Add(1)
go func() { go func() {
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase. // Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
time.Sleep(tt.kubeAPIServerDelay) time.Sleep(tt.kubeAPIServerDelay)
service, err = NewGRPCService(socketName.endpoint, tt.callTimeout) service, err = NewGRPCService(ctx, socketName.endpoint, tt.callTimeout)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -203,8 +208,10 @@ func TestIntermittentConnectionLoss(t *testing.T) {
t.Fatalf("Failed to start kms-plugin, err: %v", err) t.Fatalf("Failed to start kms-plugin, err: %v", err)
} }
ctx := testContext(t)
// connect to kms plugin // connect to kms plugin
service, err := NewGRPCService(endpoint.endpoint, timeout) service, err := NewGRPCService(ctx, endpoint.endpoint, timeout)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -271,7 +278,9 @@ func TestUnsupportedVersion(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
s, err := NewGRPCService(endpoint.endpoint, 1*time.Second) ctx := testContext(t)
s, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -285,7 +294,7 @@ func TestUnsupportedVersion(t *testing.T) {
destroyService(s) destroyService(s)
s, err = NewGRPCService(endpoint.endpoint, 1*time.Second) s, err = NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -312,8 +321,10 @@ func TestGRPCService(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
ctx := testContext(t)
// Create the gRPC client service. // Create the gRPC client service.
service, err := NewGRPCService(endpoint.endpoint, 1*time.Second) service, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -351,8 +362,10 @@ func TestGRPCServiceConcurrentAccess(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
ctx := testContext(t)
// Create the gRPC client service. // Create the gRPC client service.
service, err := NewGRPCService(endpoint.endpoint, 15*time.Second) service, err := NewGRPCService(ctx, endpoint.endpoint, 15*time.Second)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -406,6 +419,8 @@ func TestInvalidConfiguration(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
ctx := testContext(t)
invalidConfigs := []struct { invalidConfigs := []struct {
name string name string
endpoint string endpoint string
@ -416,10 +431,16 @@ func TestInvalidConfiguration(t *testing.T) {
for _, testCase := range invalidConfigs { for _, testCase := range invalidConfigs {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
_, err := NewGRPCService(testCase.endpoint, 1*time.Second) _, err := NewGRPCService(ctx, testCase.endpoint, 1*time.Second)
if err == nil { if err == nil {
t.Fatalf("should fail to create envelope service for %s.", testCase.name) t.Fatalf("should fail to create envelope service for %s.", testCase.name)
} }
}) })
} }
} }
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
return ctx
}

View File

@ -23,12 +23,12 @@ import (
"net" "net"
"time" "time"
"k8s.io/klog/v2"
"google.golang.org/grpc" "google.golang.org/grpc"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1" kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1"
"k8s.io/klog/v2"
) )
const ( const (
@ -44,7 +44,7 @@ type gRPCService struct {
} }
// NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider. // NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider.
func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) { func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) {
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint) klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
addr, err := util.ParseEndpoint(endpoint) addr, err := util.ParseEndpoint(endpoint)
@ -75,6 +75,14 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error)
} }
s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection) s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection)
go func() {
defer utilruntime.HandleCrash()
<-ctx.Done()
_ = s.connection.Close()
}()
return s, nil return s, nil
} }

View File

@ -54,7 +54,9 @@ func TestKMSPluginLateStart(t *testing.T) {
callTimeout := 3 * time.Second callTimeout := 3 * time.Second
s := newEndpoint() s := newEndpoint()
service, err := NewGRPCService(s.endpoint, callTimeout) ctx := testContext(t)
service, err := NewGRPCService(ctx, s.endpoint, callTimeout)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -72,7 +74,7 @@ func TestKMSPluginLateStart(t *testing.T) {
data := []byte("test data") data := []byte("test data")
uid := string(uuid.NewUUID()) uid := string(uuid.NewUUID())
_, err = service.Encrypt(context.Background(), uid, data) _, err = service.Encrypt(ctx, uid, data)
if err != nil { if err != nil {
t.Fatalf("failed when execute encrypt, error: %v", err) t.Fatalf("failed when execute encrypt, error: %v", err)
} }
@ -131,12 +133,14 @@ func TestTimeouts(t *testing.T) {
testCompletedWG.Add(1) testCompletedWG.Add(1)
defer testCompletedWG.Done() defer testCompletedWG.Done()
ctx := testContext(t)
kubeAPIServerWG.Add(1) kubeAPIServerWG.Add(1)
go func() { go func() {
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase. // Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
time.Sleep(tt.kubeAPIServerDelay) time.Sleep(tt.kubeAPIServerDelay)
service, err = NewGRPCService(socketName.endpoint, tt.callTimeout) service, err = NewGRPCService(ctx, socketName.endpoint, tt.callTimeout)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
@ -165,7 +169,7 @@ func TestTimeouts(t *testing.T) {
}() }()
kubeAPIServerWG.Wait() kubeAPIServerWG.Wait()
_, err = service.Encrypt(context.Background(), uid, data) _, err = service.Encrypt(ctx, uid, data)
if err == nil && tt.wantErr != "" { if err == nil && tt.wantErr != "" {
t.Fatalf("got nil, want %s", tt.wantErr) t.Fatalf("got nil, want %s", tt.wantErr)
@ -203,14 +207,15 @@ func TestIntermittentConnectionLoss(t *testing.T) {
t.Fatalf("Failed to start kms-plugin, err: %v", err) t.Fatalf("Failed to start kms-plugin, err: %v", err)
} }
ctx := testContext(t)
// connect to kms plugin // connect to kms plugin
service, err := NewGRPCService(endpoint.endpoint, timeout) service, err := NewGRPCService(ctx, endpoint.endpoint, timeout)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
defer destroyService(service) defer destroyService(service)
ctx := context.Background()
_, err = service.Encrypt(ctx, uid, data) _, err = service.Encrypt(ctx, uid, data)
if err != nil { if err != nil {
t.Fatalf("failed when execute encrypt, error: %v", err) t.Fatalf("failed when execute encrypt, error: %v", err)
@ -269,14 +274,15 @@ func TestGRPCService(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
ctx := testContext(t)
// Create the gRPC client service. // Create the gRPC client service.
service, err := NewGRPCService(endpoint.endpoint, 1*time.Second) service, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
defer destroyService(service) defer destroyService(service)
ctx := context.Background()
// Call service to encrypt data. // Call service to encrypt data.
data := []byte("test data") data := []byte("test data")
uid := string(uuid.NewUUID()) uid := string(uuid.NewUUID())
@ -311,14 +317,15 @@ func TestGRPCServiceConcurrentAccess(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
ctx := testContext(t)
// Create the gRPC client service. // Create the gRPC client service.
service, err := NewGRPCService(endpoint.endpoint, 15*time.Second) service, err := NewGRPCService(ctx, endpoint.endpoint, 15*time.Second)
if err != nil { if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err) t.Fatalf("failed to create envelope service, error: %v", err)
} }
defer destroyService(service) defer destroyService(service)
ctx := context.Background()
var wg sync.WaitGroup var wg sync.WaitGroup
n := 100 n := 100
wg.Add(n) wg.Add(n)
@ -369,6 +376,8 @@ func TestInvalidConfiguration(t *testing.T) {
} }
defer f.CleanUp() defer f.CleanUp()
ctx := testContext(t)
invalidConfigs := []struct { invalidConfigs := []struct {
name string name string
endpoint string endpoint string
@ -379,10 +388,16 @@ func TestInvalidConfiguration(t *testing.T) {
for _, testCase := range invalidConfigs { for _, testCase := range invalidConfigs {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
_, err := NewGRPCService(testCase.endpoint, 1*time.Second) _, err := NewGRPCService(ctx, testCase.endpoint, 1*time.Second)
if err == nil { if err == nil {
t.Fatalf("should fail to create envelope service for %s.", testCase.name) t.Fatalf("should fail to create envelope service for %s.", testCase.name)
} }
}) })
} }
} }
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
return ctx
}

View File

@ -198,7 +198,7 @@ func EtcdMain(tests func() int) {
// like k8s.io/klog/v2.(*loggingT).flushDaemon() // like k8s.io/klog/v2.(*loggingT).flushDaemon()
// TODO(#108483): Reduce this number once we address the // TODO(#108483): Reduce this number once we address the
// couple remaining issues. // couple remaining issues.
if dg := runtime.NumGoroutine() - before; dg <= 15 { if dg := runtime.NumGoroutine() - before; dg <= 9 {
return true, nil return true, nil
} }
// Allow goroutines to schedule and die off. // Allow goroutines to schedule and die off.