From 4e68e9b5ad70ae074b3fb20f0fb2ba25d0792274 Mon Sep 17 00:00:00 2001 From: Monis Khan Date: Wed, 24 Aug 2022 01:51:19 +0000 Subject: [PATCH] kms: fix go routine leak in gRPC connection Signed-off-by: Monis Khan --- cmd/kube-apiserver/app/server.go | 2 +- .../default_storage_factory_builder.go | 4 +- pkg/registry/registrytest/etcd.go | 6 ++- .../server/options/encryptionconfig/config.go | 36 +++++++++------ .../options/encryptionconfig/config_test.go | 45 ++++++++++++------- .../apiserver/pkg/server/options/etcd.go | 4 +- .../value/encrypt/envelope/grpc_service.go | 14 ++++-- .../envelope/grpc_service_unix_test.go | 37 +++++++++++---- .../encrypt/envelope/kmsv2/grpc_service.go | 14 ++++-- .../envelope/kmsv2/grpc_service_unix_test.go | 37 ++++++++++----- test/integration/framework/etcd.go | 2 +- 11 files changed, 140 insertions(+), 61 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index f05bb831e11..45e8c6eafa0 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -412,7 +412,7 @@ func buildGenericConfig( lastErr = err return } - storageFactory, lastErr = completedStorageFactoryConfig.New() + storageFactory, lastErr = completedStorageFactoryConfig.New(genericConfig.DrainedNotify()) if lastErr != nil { return } diff --git a/pkg/kubeapiserver/default_storage_factory_builder.go b/pkg/kubeapiserver/default_storage_factory_builder.go index 0d1ac4c8caf..1ad90354b8b 100644 --- a/pkg/kubeapiserver/default_storage_factory_builder.go +++ b/pkg/kubeapiserver/default_storage_factory_builder.go @@ -111,7 +111,7 @@ type completedStorageFactoryConfig struct { } // New returns a new storage factory created from the completed storage factory configuration. -func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) { +func (c *completedStorageFactoryConfig) New(stopCh <-chan struct{}) (*serverstorage.DefaultStorageFactory, error) { resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides) storageFactory := serverstorage.NewDefaultStorageFactory( c.StorageConfig, @@ -142,7 +142,7 @@ func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFact storageFactory.SetEtcdLocation(groupResource, servers) } if len(c.EncryptionProviderConfigFilepath) != 0 { - transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath) + transformerOverrides, err := encryptionconfig.GetTransformerOverrides(c.EncryptionProviderConfigFilepath, stopCh) if err != nil { return nil, err } diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index 8da7fc1370f..e635f530a60 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -17,6 +17,7 @@ limitations under the License. package registrytest import ( + "context" "testing" "k8s.io/apimachinery/pkg/runtime/schema" @@ -35,6 +36,9 @@ func NewEtcdStorage(t *testing.T, group string) (*storagebackend.ConfigForResour func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*storagebackend.ConfigForResource, *etcd3testing.EtcdTestServer) { t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + server, config := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) options := options.NewEtcdOptions(config) @@ -43,7 +47,7 @@ func NewEtcdStorageForResource(t *testing.T, resource schema.GroupResource) (*st t.Fatal(err) } completedConfig.APIResourceConfig = serverstorage.NewResourceConfig() - factory, err := completedConfig.New() + factory, err := completedConfig.New(ctx.Done()) if err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go index 4c0048bf0ea..c2b167104f1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" apiserverconfig "k8s.io/apiserver/pkg/apis/config" apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1" "k8s.io/apiserver/pkg/apis/config/validation" @@ -93,7 +95,7 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker { } // GetKMSPluginHealthzCheckers extracts KMSPluginProbes from the EncryptionConfig. -func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, error) { +func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) { f, err := os.Open(filepath) if err != nil { return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err) @@ -101,7 +103,7 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro defer f.Close() var result []healthz.HealthChecker - probes, err := getKMSPluginProbes(f) + probes, err := getKMSPluginProbes(f, stopCh) if err != nil { return nil, err } @@ -120,7 +122,10 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro return result, nil } -func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) { +func getKMSPluginProbes(reader io.Reader, stopCh <-chan struct{}) ([]interface{}, error) { + // we ignore the cancel func because this context should only be canceled when stopCh is closed + ctx, _ := wait.ContextForChannel(stopCh) + var result []interface{} configFileContents, err := io.ReadAll(reader) @@ -138,7 +143,7 @@ func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) { if p.KMS != nil { switch p.KMS.APIVersion { case kmsAPIVersionV1: - s, err := envelope.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration) + s, err := envelope.NewGRPCService(ctx, p.KMS.Endpoint, p.KMS.Timeout.Duration) if err != nil { return nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %v", p.KMS.Name, err) } @@ -156,7 +161,7 @@ func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) { return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, KMSv2 feature is not enabled", p.KMS.Name) } - s, err := envelopekmsv2.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration) + s, err := envelopekmsv2.NewGRPCService(ctx, p.KMS.Endpoint, p.KMS.Timeout.Duration) if err != nil { return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", p.KMS.Name, err) } @@ -254,21 +259,21 @@ func isKMSv2ProviderHealthy(name string, response *envelopekmsv2.StatusResponse) } // GetTransformerOverrides returns the transformer overrides by reading and parsing the encryption provider configuration file -func GetTransformerOverrides(filepath string) (map[schema.GroupResource]value.Transformer, error) { +func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) { f, err := os.Open(filepath) if err != nil { return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err) } defer f.Close() - result, err := parseEncryptionConfiguration(f) + result, err := parseEncryptionConfiguration(f, stopCh) if err != nil { return nil, fmt.Errorf("error while parsing encryption provider configuration file %q: %v", filepath, err) } return result, nil } -func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.Transformer, error) { +func parseEncryptionConfiguration(f io.Reader, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) { configFileContents, err := io.ReadAll(f) if err != nil { return nil, fmt.Errorf("could not read contents: %v", err) @@ -283,7 +288,7 @@ func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.T // For each entry in the configuration for _, resourceConfig := range config.Resources { - transformers, err := prefixTransformers(&resourceConfig) + transformers, err := prefixTransformers(&resourceConfig, stopCh) if err != nil { return nil, err } @@ -307,8 +312,8 @@ func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.T func loadConfig(data []byte) (*apiserverconfig.EncryptionConfiguration, error) { scheme := runtime.NewScheme() codecs := serializer.NewCodecFactory(scheme) - apiserverconfig.AddToScheme(scheme) - apiserverconfigv1.AddToScheme(scheme) + utilruntime.Must(apiserverconfig.AddToScheme(scheme)) + utilruntime.Must(apiserverconfigv1.AddToScheme(scheme)) configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil) if err != nil { @@ -330,7 +335,10 @@ var ( envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService ) -func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, error) { +func prefixTransformers(config *apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, error) { + // we ignore the cancel func because this context should only be canceled when stopCh is closed + ctx, _ := wait.ContextForChannel(stopCh) + var result []value.PrefixTransformer for _, provider := range config.Providers { var ( @@ -349,7 +357,7 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value. switch provider.KMS.APIVersion { case kmsAPIVersionV1: var envelopeService envelope.Service - if envelopeService, err = envelopeServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil { + if envelopeService, err = envelopeServiceFactory(ctx, provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil { return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err) } transformer, err = envelopePrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV1) @@ -359,7 +367,7 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value. } var envelopeService envelopekmsv2.Service - if envelopeService, err = envelopeKMSv2ServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil { + if envelopeService, err = envelopeKMSv2ServiceFactory(ctx, provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil { return nil, fmt.Errorf("could not configure KMSv2 plugin %q, error: %v", provider.KMS.Name, err) } transformer, err = envelopekmsv2PrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV2) diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go index 7136dc36cf7..aba6f65d2d7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/encryptionconfig/config_test.go @@ -115,7 +115,7 @@ func (t *testKMSv2EnvelopeService) Status(ctx context.Context) (*envelopekmsv2.S } // The factory method to create mock envelope service. -func newMockEnvelopeService(endpoint string, timeout time.Duration) (envelope.Service, error) { +func newMockEnvelopeService(ctx context.Context, endpoint string, timeout time.Duration) (envelope.Service, error) { return &testEnvelopeService{nil}, nil } @@ -125,7 +125,7 @@ func newMockErrorEnvelopeService(endpoint string, timeout time.Duration) (envelo } // The factory method to create mock envelope kmsv2 service. -func newMockEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) { +func newMockEnvelopeKMSv2Service(ctx context.Context, endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) { return &testKMSv2EnvelopeService{nil}, nil } @@ -193,41 +193,43 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) { envelopeKMSv2ServiceFactory = factoryKMSv2 }() + ctx := testContext(t) + // Creates compound/prefix transformers with different ordering of available transformers. // Transforms data using one of them, and tries to untransform using the others. // Repeats this for all possible combinations. correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml" - identityFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithIdentityFirst)) + identityFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithIdentityFirst), ctx.Done()) if err != nil { t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst) } correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml" - aesGcmFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesGcmFirst)) + aesGcmFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesGcmFirst), ctx.Done()) if err != nil { t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst) } correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml" - aesCbcFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesCbcFirst)) + aesCbcFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesCbcFirst), ctx.Done()) if err != nil { t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst) } correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml" - secretboxFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithSecretboxFirst)) + secretboxFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithSecretboxFirst), ctx.Done()) if err != nil { t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst) } correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml" - kmsFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSFirst)) + kmsFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSFirst), ctx.Done()) if err != nil { t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst) } correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml" - kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First)) + kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First), ctx.Done()) if err != nil { t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First) } @@ -240,7 +242,6 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) { kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")] kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")] - ctx := context.Background() dataCtx := value.DefaultContext([]byte(sampleContextText)) originalText := []byte(sampleText) @@ -280,11 +281,13 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) { func TestKMSPluginHealthz(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)() - service, err := envelope.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second) + ctx := testContext(t) + + service, err := envelope.NewGRPCService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second) if err != nil { t.Fatalf("Could not initialize envelopeService, error: %v", err) } - serviceKMSv2, err := envelopekmsv2.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second) + serviceKMSv2, err := envelopekmsv2.NewGRPCService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second) if err != nil { t.Fatalf("Could not initialize kmsv2 envelopeService, error: %v", err) } @@ -347,7 +350,7 @@ func TestKMSPluginHealthz(t *testing.T) { for _, tt := range testCases { t.Run(tt.desc, func(t *testing.T) { - got, err := getKMSPluginProbes(mustConfigReader(t, tt.config)) + got, err := getKMSPluginProbes(mustConfigReader(t, tt.config), ctx.Done()) if err != nil && !tt.wantErr { t.Fatalf("got %v, want nil for error", err) } @@ -360,7 +363,9 @@ func TestKMSPluginHealthz(t *testing.T) { } func TestKMSPluginHealthzTTL(t *testing.T) { - service, _ := newMockEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second) + ctx := testContext(t) + + service, _ := newMockEnvelopeService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second) errService, _ := newMockErrorEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second) testCases := []struct { @@ -403,7 +408,9 @@ func TestKMSPluginHealthzTTL(t *testing.T) { } func TestKMSv2PluginHealthzTTL(t *testing.T) { - service, _ := newMockEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second) + ctx := testContext(t) + + service, _ := newMockEnvelopeKMSv2Service(ctx, "unix:///tmp/testprovider.sock", 3*time.Second) errService, _ := newMockErrorEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second) testCases := []struct { @@ -529,8 +536,10 @@ func testCBCKeyRotationWithProviders(t *testing.T, firstEncryptionConfig, firstP } func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath string) value.Transformer { + ctx := testContext(t) + t.Helper() - transformers, err := parseEncryptionConfiguration(mustConfigReader(t, encryptionConfigPath)) + transformers, err := parseEncryptionConfiguration(mustConfigReader(t, encryptionConfigPath), ctx.Done()) if err != nil { t.Fatal(err) } @@ -577,3 +586,9 @@ func TestIsKMSv2ProviderHealthyError(t *testing.T) { }) } } + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 00c43324b6f..380d44d94bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -200,7 +200,7 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error { transformerOverrides := make(map[schema.GroupResource]value.Transformer) if len(s.EncryptionProviderConfigFilepath) > 0 { var err error - transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath) + transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify()) if err != nil { return err } @@ -246,7 +246,7 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { })) if s.EncryptionProviderConfigFilepath != "" { - kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath) + kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify()) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service.go index c5304cd09f2..e1302e93c44 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service.go @@ -24,13 +24,13 @@ import ( "sync" "time" - "k8s.io/klog/v2" - "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util" kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1" + "k8s.io/klog/v2" ) const ( @@ -52,7 +52,7 @@ type gRPCService struct { } // NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider. -func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) { +func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) { klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint) addr, err := util.ParseEndpoint(endpoint) @@ -84,6 +84,14 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) } s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection) + + go func() { + defer utilruntime.HandleCrash() + + <-ctx.Done() + _ = s.connection.Close() + }() + return s, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service_unix_test.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service_unix_test.go index c8e42408f31..1fa2c21f21a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service_unix_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/grpc_service_unix_test.go @@ -21,6 +21,7 @@ limitations under the License. package envelope import ( + "context" "fmt" "reflect" "sync" @@ -56,7 +57,9 @@ func TestKMSPluginLateStart(t *testing.T) { callTimeout := 3 * time.Second s := newEndpoint() - service, err := NewGRPCService(s.endpoint, callTimeout) + ctx := testContext(t) + + service, err := NewGRPCService(ctx, s.endpoint, callTimeout) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -132,12 +135,14 @@ func TestTimeouts(t *testing.T) { testCompletedWG.Add(1) defer testCompletedWG.Done() + ctx := testContext(t) + kubeAPIServerWG.Add(1) go func() { // Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase. time.Sleep(tt.kubeAPIServerDelay) - service, err = NewGRPCService(socketName.endpoint, tt.callTimeout) + service, err = NewGRPCService(ctx, socketName.endpoint, tt.callTimeout) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -203,8 +208,10 @@ func TestIntermittentConnectionLoss(t *testing.T) { t.Fatalf("Failed to start kms-plugin, err: %v", err) } + ctx := testContext(t) + // connect to kms plugin - service, err := NewGRPCService(endpoint.endpoint, timeout) + service, err := NewGRPCService(ctx, endpoint.endpoint, timeout) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -271,7 +278,9 @@ func TestUnsupportedVersion(t *testing.T) { } defer f.CleanUp() - s, err := NewGRPCService(endpoint.endpoint, 1*time.Second) + ctx := testContext(t) + + s, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second) if err != nil { t.Fatal(err) } @@ -285,7 +294,7 @@ func TestUnsupportedVersion(t *testing.T) { destroyService(s) - s, err = NewGRPCService(endpoint.endpoint, 1*time.Second) + s, err = NewGRPCService(ctx, endpoint.endpoint, 1*time.Second) if err != nil { t.Fatal(err) } @@ -312,8 +321,10 @@ func TestGRPCService(t *testing.T) { } defer f.CleanUp() + ctx := testContext(t) + // Create the gRPC client service. - service, err := NewGRPCService(endpoint.endpoint, 1*time.Second) + service, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -351,8 +362,10 @@ func TestGRPCServiceConcurrentAccess(t *testing.T) { } defer f.CleanUp() + ctx := testContext(t) + // Create the gRPC client service. - service, err := NewGRPCService(endpoint.endpoint, 15*time.Second) + service, err := NewGRPCService(ctx, endpoint.endpoint, 15*time.Second) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -406,6 +419,8 @@ func TestInvalidConfiguration(t *testing.T) { } defer f.CleanUp() + ctx := testContext(t) + invalidConfigs := []struct { name string endpoint string @@ -416,10 +431,16 @@ func TestInvalidConfiguration(t *testing.T) { for _, testCase := range invalidConfigs { t.Run(testCase.name, func(t *testing.T) { - _, err := NewGRPCService(testCase.endpoint, 1*time.Second) + _, err := NewGRPCService(ctx, testCase.endpoint, 1*time.Second) if err == nil { t.Fatalf("should fail to create envelope service for %s.", testCase.name) } }) } } + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service.go index b96fce4aa46..db49851f2b6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service.go @@ -23,12 +23,12 @@ import ( "net" "time" - "k8s.io/klog/v2" - "google.golang.org/grpc" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util" kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1" + "k8s.io/klog/v2" ) const ( @@ -44,7 +44,7 @@ type gRPCService struct { } // NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider. -func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) { +func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) { klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint) addr, err := util.ParseEndpoint(endpoint) @@ -75,6 +75,14 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) } s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection) + + go func() { + defer utilruntime.HandleCrash() + + <-ctx.Done() + _ = s.connection.Close() + }() + return s, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service_unix_test.go b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service_unix_test.go index 9482a794f3d..41c3ca9670a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service_unix_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/grpc_service_unix_test.go @@ -54,7 +54,9 @@ func TestKMSPluginLateStart(t *testing.T) { callTimeout := 3 * time.Second s := newEndpoint() - service, err := NewGRPCService(s.endpoint, callTimeout) + ctx := testContext(t) + + service, err := NewGRPCService(ctx, s.endpoint, callTimeout) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -72,7 +74,7 @@ func TestKMSPluginLateStart(t *testing.T) { data := []byte("test data") uid := string(uuid.NewUUID()) - _, err = service.Encrypt(context.Background(), uid, data) + _, err = service.Encrypt(ctx, uid, data) if err != nil { t.Fatalf("failed when execute encrypt, error: %v", err) } @@ -131,12 +133,14 @@ func TestTimeouts(t *testing.T) { testCompletedWG.Add(1) defer testCompletedWG.Done() + ctx := testContext(t) + kubeAPIServerWG.Add(1) go func() { // Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase. time.Sleep(tt.kubeAPIServerDelay) - service, err = NewGRPCService(socketName.endpoint, tt.callTimeout) + service, err = NewGRPCService(ctx, socketName.endpoint, tt.callTimeout) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } @@ -165,7 +169,7 @@ func TestTimeouts(t *testing.T) { }() kubeAPIServerWG.Wait() - _, err = service.Encrypt(context.Background(), uid, data) + _, err = service.Encrypt(ctx, uid, data) if err == nil && tt.wantErr != "" { t.Fatalf("got nil, want %s", tt.wantErr) @@ -203,14 +207,15 @@ func TestIntermittentConnectionLoss(t *testing.T) { t.Fatalf("Failed to start kms-plugin, err: %v", err) } + ctx := testContext(t) + // connect to kms plugin - service, err := NewGRPCService(endpoint.endpoint, timeout) + service, err := NewGRPCService(ctx, endpoint.endpoint, timeout) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } defer destroyService(service) - ctx := context.Background() _, err = service.Encrypt(ctx, uid, data) if err != nil { t.Fatalf("failed when execute encrypt, error: %v", err) @@ -269,14 +274,15 @@ func TestGRPCService(t *testing.T) { } defer f.CleanUp() + ctx := testContext(t) + // Create the gRPC client service. - service, err := NewGRPCService(endpoint.endpoint, 1*time.Second) + service, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } defer destroyService(service) - ctx := context.Background() // Call service to encrypt data. data := []byte("test data") uid := string(uuid.NewUUID()) @@ -311,14 +317,15 @@ func TestGRPCServiceConcurrentAccess(t *testing.T) { } defer f.CleanUp() + ctx := testContext(t) + // Create the gRPC client service. - service, err := NewGRPCService(endpoint.endpoint, 15*time.Second) + service, err := NewGRPCService(ctx, endpoint.endpoint, 15*time.Second) if err != nil { t.Fatalf("failed to create envelope service, error: %v", err) } defer destroyService(service) - ctx := context.Background() var wg sync.WaitGroup n := 100 wg.Add(n) @@ -369,6 +376,8 @@ func TestInvalidConfiguration(t *testing.T) { } defer f.CleanUp() + ctx := testContext(t) + invalidConfigs := []struct { name string endpoint string @@ -379,10 +388,16 @@ func TestInvalidConfiguration(t *testing.T) { for _, testCase := range invalidConfigs { t.Run(testCase.name, func(t *testing.T) { - _, err := NewGRPCService(testCase.endpoint, 1*time.Second) + _, err := NewGRPCService(ctx, testCase.endpoint, 1*time.Second) if err == nil { t.Fatalf("should fail to create envelope service for %s.", testCase.name) } }) } } + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} diff --git a/test/integration/framework/etcd.go b/test/integration/framework/etcd.go index c26c91a56a8..3a9844b843f 100644 --- a/test/integration/framework/etcd.go +++ b/test/integration/framework/etcd.go @@ -198,7 +198,7 @@ func EtcdMain(tests func() int) { // like k8s.io/klog/v2.(*loggingT).flushDaemon() // TODO(#108483): Reduce this number once we address the // couple remaining issues. - if dg := runtime.NumGoroutine() - before; dg <= 15 { + if dg := runtime.NumGoroutine() - before; dg <= 9 { return true, nil } // Allow goroutines to schedule and die off.