From 8b84a793b39fed2a62af0876b2eda461a68008c9 Mon Sep 17 00:00:00 2001 From: Ravi Gudimetla Date: Mon, 7 Mar 2022 09:20:45 -0500 Subject: [PATCH 1/2] API Server Changes This commit includes all the changes needed for APIServer. Instead of modifying the existing signatures for the methods which either generate or return stopChannel, we generate a context from the channel and use the generated context to be passed to the controllers which are started in APIServer. This ensures we don't have to touch APIServer dependencies. --- cmd/kubelet/app/auth.go | 13 ++++++++++- ...cluster_authentication_trust_controller.go | 12 +++++----- pkg/controlplane/instance.go | 21 +++++++++++++---- .../headerrequest/requestheader_controller.go | 14 +++++------ .../requestheader_controller_test.go | 4 +++- .../configmap_cafile_content.go | 15 ++++++------ .../dynamic_cafile_content.go | 17 +++++++------- .../dynamic_serving_content.go | 15 ++++++------ .../dynamiccertificates/union_content.go | 9 ++++---- .../pkg/server/options/authentication.go | 6 ++++- .../authentication_dynamic_request_header.go | 15 ++++++------ .../apiserver/pkg/server/secure_serving.go | 23 +++++++++++++------ .../pkg/apiserver/apiserver.go | 19 ++++++++++++--- .../pkg/apiserver/handler_proxy_test.go | 5 ++-- 14 files changed, 122 insertions(+), 66 deletions(-) diff --git a/cmd/kubelet/app/auth.go b/cmd/kubelet/app/auth.go index 8b8b2e9047f..fcb8afe1946 100644 --- a/cmd/kubelet/app/auth.go +++ b/cmd/kubelet/app/auth.go @@ -17,6 +17,7 @@ limitations under the License. package app import ( + "context" "errors" "fmt" "reflect" @@ -95,8 +96,18 @@ func BuildAuthn(client authenticationclient.AuthenticationV1Interface, authn kub } return authenticator, func(stopCh <-chan struct{}) { + // generate a context from stopCh. This is to avoid modifying files which are relying on this method + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-stopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() if dynamicCAContentFromFile != nil { - go dynamicCAContentFromFile.Run(1, stopCh) + go dynamicCAContentFromFile.Run(ctx, 1) } }, err } diff --git a/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go b/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go index 630474d6bd6..dcfb67d3887 100644 --- a/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go +++ b/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go @@ -432,7 +432,7 @@ func (c *Controller) Enqueue() { } // Run the controller until stopped. -func (c *Controller) Run(workers int, stopCh <-chan struct{}) { +func (c *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() // make sure the work queue is shutdown which will trigger workers to end defer c.queue.ShutDown() @@ -441,25 +441,25 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer klog.Infof("Shutting down cluster_authentication_trust_controller controller") // we have a personal informer that is narrowly scoped, start it. - go c.kubeSystemConfigMapInformer.Run(stopCh) + go c.kubeSystemConfigMapInformer.Run(ctx.Done()) // wait for your secondary caches to fill before starting your work - if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", stopCh, c.preRunCaches...) { + if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) { return } // only run one worker - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // checks are cheap. run once a minute just to be sure we stay in sync in case fsnotify fails again // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. _ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) { c.queue.Add(keyFn()) return false, nil - }, stopCh) + }, ctx.Done()) // wait until we're told to stop - <-stopCh + <-ctx.Done() } func (c *Controller) runWorker() { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index fddbf1bb604..6fcc4023747 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -452,29 +452,40 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) } controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient) + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-hookContext.StopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() + // prime values and start listeners if m.ClusterAuthenticationInfo.ClientCA != nil { m.ClusterAuthenticationInfo.ClientCA.AddListener(controller) if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok { // runonce to be sure that we have a value. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { runtime.HandleError(err) } - go controller.Run(1, hookContext.StopCh) + go controller.Run(ctx, 1) } } if m.ClusterAuthenticationInfo.RequestHeaderCA != nil { m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller) if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok { // runonce to be sure that we have a value. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { runtime.HandleError(err) } - go controller.Run(1, hookContext.StopCh) + go controller.Run(ctx, 1) } } - go controller.Run(1, hookContext.StopCh) + go controller.Run(ctx, 1) return nil }) diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go index 561b6fba9ba..d8c4090b12a 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go @@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string { } // Run starts RequestHeaderAuthRequestController controller and blocks until stopCh is closed. -func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) { +func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting %s", c.name) defer klog.Infof("Shutting down %s", c.name) - go c.configmapInformer.Run(stopCh) + go c.configmapInformer.Run(ctx.Done()) // wait for caches to fill before starting your work - if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) { + if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) { return } // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) - <-stopCh + <-ctx.Done() } // // RunOnce runs a single sync loop -func (c *RequestHeaderAuthRequestController) RunOnce() error { - configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{}) +func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error { + configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{}) switch { case errors.IsNotFound(err): // ignore, authConfigMap is nil now diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go index 2577d2635b2..36dfbf1ec29 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package headerrequest import ( + "context" "encoding/json" "k8s.io/apimachinery/pkg/api/equality" "testing" @@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) { target.client = fakeKubeClient // act - err := target.RunOnce() + ctx := context.TODO() + err := target.RunOnce(ctx) if err != nil && !scenario.expectErr { t.Errorf("got unexpected error %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go index b09474bc4fe..428fd66bae7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "fmt" "sync/atomic" @@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool { } // RunOnce runs a single sync loop -func (c *ConfigMapCAController) RunOnce() error { +func (c *ConfigMapCAController) RunOnce(ctx context.Context) error { // Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for // a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures. _ = c.loadCABundle() @@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error { } // Run starts the kube-apiserver and blocks until stopCh is closed. -func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) { +func (c *ConfigMapCAController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // we have a personal informer that is narrowly scoped, start it. - go c.configMapInformer.Run(stopCh) + go c.configMapInformer.Run(ctx.Done()) // wait for your secondary caches to fill before starting your work - if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) { + if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) { return } // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) { c.queue.Add(workItemKey) return false, nil - }, stopCh) + }, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *ConfigMapCAController) runWorker() { diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go index fb1515c182a..58761acd925 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "fmt" "io/ioutil" @@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute // ControllerRunner is a generic interface for starting a controller type ControllerRunner interface { // RunOnce runs the sync loop a single time. This useful for synchronous priming - RunOnce() error + RunOnce(ctx context.Context) error // Run should be called a go .Run - Run(workers int, stopCh <-chan struct{}) + Run(ctx context.Context, workers int) } // DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content @@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool { } // RunOnce runs a single sync loop -func (c *DynamicFileCAContent) RunOnce() error { +func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error { return c.loadCABundle() } // Run starts the controller and blocks until stopCh is closed. -func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { +func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start the loop that watches the CA file until stopCh is closed. go wait.Until(func() { - if err := c.watchCAFile(stopCh); err != nil { + if err := c.watchCAFile(ctx.Done()); err != nil { klog.ErrorS(err, "Failed to watch CA file, will retry later") } - }, time.Minute, stopCh) + }, time.Minute, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go index 00117176b0a..9ff1abb6494 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go @@ -17,6 +17,7 @@ limitations under the License. package dynamiccertificates import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error { } // RunOnce runs a single sync loop -func (c *DynamicCertKeyPairContent) RunOnce() error { +func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error { return c.loadCertKeyPair() } -// Run starts the controller and blocks until stopCh is closed. -func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { +// Run starts the controller and blocks until context is killed. +func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start the loop that watches the cert and key files until stopCh is closed. go wait.Until(func() { - if err := c.watchCertKeyFile(stopCh); err != nil { + if err := c.watchCertKeyFile(ctx.Done()); err != nil { klog.ErrorS(err, "Failed to watch cert and key file, will retry later") } - }, time.Minute, stopCh) + }, time.Minute, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go index e10b112bc07..57622bd34ec 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "strings" @@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) { } // AddListener adds a listener to be notified when the CA content changes. -func (c unionCAContent) RunOnce() error { +func (c unionCAContent) RunOnce(ctx context.Context) error { errors := []error{} for _, curr := range c { if controller, ok := curr.(ControllerRunner); ok { - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { errors = append(errors, err) } } @@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error { } // Run runs the controller -func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) { +func (c unionCAContent) Run(ctx context.Context, workers int) { for _, curr := range c { if controller, ok := curr.(ControllerRunner); ok { - go controller.Run(workers, stopCh) + go controller.Run(ctx, workers) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go b/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go index a82b4a7391d..8ff771af080 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "strings" "time" @@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber } // look up authentication configuration in the cluster and in case of an err defer to authentication-tolerate-lookup-failure flag - if err := dynamicRequestHeaderProvider.RunOnce(); err != nil { + // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the + // context is not used at all. So passing a empty context shouldn't be a problem + ctx := context.TODO() + if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go b/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go index e2beb5c2382..0dac3402187 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "k8s.io/apimachinery/pkg/util/errors" @@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq }, nil } -func (c *DynamicRequestHeaderController) RunOnce() error { +func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error { errs := []error{} - errs = append(errs, c.ConfigMapCAController.RunOnce()) - errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce()) + errs = append(errs, c.ConfigMapCAController.RunOnce(ctx)) + errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx)) return errors.NewAggregate(errs) } -func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) { - go c.ConfigMapCAController.Run(workers, stopCh) - go c.RequestHeaderAuthRequestController.Run(workers, stopCh) - <-stopCh +func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) { + go c.ConfigMapCAController.Run(ctx, workers) + go c.RequestHeaderAuthRequestController.Run(ctx, workers) + <-ctx.Done() } diff --git a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index d4caa08d36a..64bcc87ebf1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro if s.Cert != nil { s.Cert.AddListener(dynamicCertificateController) } - + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-stopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() // start controllers if possible if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of client CA failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of default serving certificate failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } for _, sniCert := range s.SNICerts { sniCert.AddListener(dynamicCertificateController) if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of SNI serving certificate failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 45223e53cd8..dbbae01637c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -244,14 +244,27 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg if err != nil { return nil, err } - if err := aggregatorProxyCerts.RunOnce(); err != nil { + // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the + // context is not used at all. So passing a empty context shouldn't be a problem + ctx := context.TODO() + if err := aggregatorProxyCerts.RunOnce(ctx); err != nil { return nil, err } aggregatorProxyCerts.AddListener(apiserviceRegistrationController) s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent - s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(context genericapiserver.PostStartHookContext) error { - go aggregatorProxyCerts.Run(1, context.StopCh) + s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error { + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-postStartHookContext.StopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() + go aggregatorProxyCerts.Run(ctx, 1) return nil }) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 3f607d74c64..fdc30b96c35 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -847,7 +847,8 @@ func TestProxyCertReload(t *testing.T) { if err != nil { t.Fatalf("Unable to create dynamic certificates: %v", err) } - err = certProvider.RunOnce() + ctx := context.TODO() + err = certProvider.RunOnce(ctx) if err != nil { t.Fatalf("Unable to load dynamic certificates: %v", err) } @@ -886,7 +887,7 @@ func TestProxyCertReload(t *testing.T) { // STEP 3: swap the certificate used by the aggregator to auth against the backend server and verify the request passes // note that this step uses the certificate that can be validated by the backend server with clientCaCrt() writeCerts(certFile, keyFile, clientCert(), clientKey(), t) - err = certProvider.RunOnce() + err = certProvider.RunOnce(ctx) if err != nil { t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err) } From 72a62f47f72d765ce4cd15780c77db0693a89a2b Mon Sep 17 00:00:00 2001 From: Ravi Gudimetla Date: Mon, 7 Mar 2022 09:23:52 -0500 Subject: [PATCH 2/2] Wire context for cert controllers All the controllers should use context for signalling termination of communication with API server. Once kcm cancels context all the cert controllers which are started via kcm should cancel the APIServer request in flight instead of hanging around. --- .../app/certificates.go | 14 +++++------ .../certificates/approver/sarapprove.go | 10 ++++---- .../certificates/approver/sarapprove_test.go | 4 ++- .../certificates/certificate_controller.go | 25 ++++++++++--------- .../certificate_controller_test.go | 7 +++--- .../certificates/cleaner/cleaner.go | 14 +++++------ .../certificates/cleaner/cleaner_test.go | 5 ++-- .../rootcacertpublisher/publisher.go | 24 +++++++++--------- .../rootcacertpublisher/publisher_test.go | 9 ++++--- pkg/controller/certificates/signer/signer.go | 12 ++++----- .../certificates/signer/signer_test.go | 4 ++- .../certificates/controller_approval_test.go | 8 +++--- .../integration/certificates/duration_test.go | 9 ++----- 13 files changed, 73 insertions(+), 72 deletions(-) diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index ca3e6c9f1a9..8508f003e92 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) } - go kubeletServingSigner.Run(5, ctx.Done()) + go kubeletServingSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving") } @@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) } - go kubeletClientSigner.Run(5, ctx.Done()) + go kubeletClientSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet") } @@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) } - go kubeAPIServerClientSigner.Run(5, ctx.Done()) + go kubeAPIServerClientSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client") } @@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) } - go legacyUnknownSigner.Run(5, ctx.Done()) + go legacyUnknownSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown") } @@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go approver.Run(5, ctx.Done()) + go approver.Run(ctx, 5) return nil, true, nil } @@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go cleaner.Run(1, ctx.Done()) + go cleaner.Run(ctx, 1) return nil, true, nil } @@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC if err != nil { return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) } - go sac.Run(1, ctx.Done()) + go sac.Run(ctx, 1) return nil, true, nil } diff --git a/pkg/controller/certificates/approver/sarapprove.go b/pkg/controller/certificates/approver/sarapprove.go index 397a75f5689..d739fc783b3 100644 --- a/pkg/controller/certificates/approver/sarapprove.go +++ b/pkg/controller/certificates/approver/sarapprove.go @@ -75,7 +75,7 @@ func recognizers() []csrRecognizer { return recognizers } -func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { +func (a *sarApprover) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { if len(csr.Status.Certificate) != 0 { return nil } @@ -96,13 +96,13 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { tried = append(tried, r.permission.Subresource) - approved, err := a.authorize(csr, r.permission) + approved, err := a.authorize(ctx, csr, r.permission) if err != nil { return err } if approved { appendApprovalCondition(csr, r.successMessage) - _, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.Background(), csr.Name, csr, metav1.UpdateOptions{}) + _, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating approval for csr: %v", err) } @@ -117,7 +117,7 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { return nil } -func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) { +func (a *sarApprover) authorize(ctx context.Context, csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) { extra := make(map[string]authorization.ExtraValue) for k, v := range csr.Spec.Extra { extra[k] = authorization.ExtraValue(v) @@ -132,7 +132,7 @@ func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs auth ResourceAttributes: &rattrs, }, } - sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{}) + sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{}) if err != nil { return false, err } diff --git a/pkg/controller/certificates/approver/sarapprove_test.go b/pkg/controller/certificates/approver/sarapprove_test.go index e3c02bc0923..9bbbcf114b1 100644 --- a/pkg/controller/certificates/approver/sarapprove_test.go +++ b/pkg/controller/certificates/approver/sarapprove_test.go @@ -17,6 +17,7 @@ limitations under the License. package approver import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -130,7 +131,8 @@ func TestHandle(t *testing.T) { }, } csr := makeTestCsr() - if err := approver.handle(csr); err != nil && !c.err { + ctx := context.TODO() + if err := approver.handle(ctx, csr); err != nil && !c.err { t.Errorf("unexpected err: %v", err) } c.verify(t, client.Actions()) diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index e1fbd420142..7bb8b2fdb1a 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -19,6 +19,7 @@ limitations under the License. package certificates import ( + "context" "fmt" "time" @@ -48,7 +49,7 @@ type CertificateController struct { csrLister certificateslisters.CertificateSigningRequestLister csrsSynced cache.InformerSynced - handler func(*certificates.CertificateSigningRequest) error + handler func(context.Context, *certificates.CertificateSigningRequest) error queue workqueue.RateLimitingInterface } @@ -57,7 +58,7 @@ func NewCertificateController( name string, kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, - handler func(*certificates.CertificateSigningRequest) error, + handler func(context.Context, *certificates.CertificateSigningRequest) error, ) *CertificateController { // Send events to the apiserver eventBroadcaster := record.NewBroadcaster() @@ -111,39 +112,39 @@ func NewCertificateController( } // Run the main goroutine responsible for watching and syncing jobs. -func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) { +func (cc *CertificateController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer cc.queue.ShutDown() klog.Infof("Starting certificate controller %q", cc.name) defer klog.Infof("Shutting down certificate controller %q", cc.name) - if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), stopCh, cc.csrsSynced) { + if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(cc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, cc.worker, time.Second) } - <-stopCh + <-ctx.Done() } // worker runs a thread that dequeues CSRs, handles them, and marks them done. -func (cc *CertificateController) worker() { - for cc.processNextWorkItem() { +func (cc *CertificateController) worker(ctx context.Context) { + for cc.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. -func (cc *CertificateController) processNextWorkItem() bool { +func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool { cKey, quit := cc.queue.Get() if quit { return false } defer cc.queue.Done(cKey) - if err := cc.syncFunc(cKey.(string)); err != nil { + if err := cc.syncFunc(ctx, cKey.(string)); err != nil { cc.queue.AddRateLimited(cKey) if _, ignorable := err.(ignorableError); !ignorable { utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err)) @@ -167,7 +168,7 @@ func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) { cc.queue.Add(key) } -func (cc *CertificateController) syncFunc(key string) error { +func (cc *CertificateController) syncFunc(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime)) @@ -188,7 +189,7 @@ func (cc *CertificateController) syncFunc(key string) error { // need to operate on a copy so we don't mutate the csr in the shared cache csr = csr.DeepCopy() - return cc.handler(csr) + return cc.handler(ctx, csr) } // IgnorableError returns an error that we shouldn't handle (i.e. log) because diff --git a/pkg/controller/certificates/certificate_controller_test.go b/pkg/controller/certificates/certificate_controller_test.go index e8be06e334f..c9912e3ef2d 100644 --- a/pkg/controller/certificates/certificate_controller_test.go +++ b/pkg/controller/certificates/certificate_controller_test.go @@ -41,8 +41,7 @@ func TestCertificateController(t *testing.T) { client := fake.NewSimpleClientset(csr) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(csr), controller.NoResyncPeriodFunc()) - - handler := func(csr *certificates.CertificateSigningRequest) error { + handler := func(ctx context.Context, csr *certificates.CertificateSigningRequest) error { csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{ Type: certificates.CertificateApproved, Reason: "test reason", @@ -70,8 +69,8 @@ func TestCertificateController(t *testing.T) { wait.PollUntil(10*time.Millisecond, func() (bool, error) { return controller.queue.Len() >= 1, nil }, stopCh) - - controller.processNextWorkItem() + ctx := context.TODO() + controller.processNextWorkItem(ctx) actions := client.Actions() if len(actions) != 1 { diff --git a/pkg/controller/certificates/cleaner/cleaner.go b/pkg/controller/certificates/cleaner/cleaner.go index 191c7974435..38956f82952 100644 --- a/pkg/controller/certificates/cleaner/cleaner.go +++ b/pkg/controller/certificates/cleaner/cleaner.go @@ -76,36 +76,36 @@ func NewCSRCleanerController( } // Run the main goroutine responsible for watching and syncing jobs. -func (ccc *CSRCleanerController) Run(workers int, stopCh <-chan struct{}) { +func (ccc *CSRCleanerController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() klog.Infof("Starting CSR cleaner controller") defer klog.Infof("Shutting down CSR cleaner controller") for i := 0; i < workers; i++ { - go wait.Until(ccc.worker, pollingInterval, stopCh) + go wait.UntilWithContext(ctx, ccc.worker, pollingInterval) } - <-stopCh + <-ctx.Done() } // worker runs a thread that dequeues CSRs, handles them, and marks them done. -func (ccc *CSRCleanerController) worker() { +func (ccc *CSRCleanerController) worker(ctx context.Context) { csrs, err := ccc.csrLister.List(labels.Everything()) if err != nil { klog.Errorf("Unable to list CSRs: %v", err) return } for _, csr := range csrs { - if err := ccc.handle(csr); err != nil { + if err := ccc.handle(ctx, csr); err != nil { klog.Errorf("Error while attempting to clean CSR %q: %v", csr.Name, err) } } } -func (ccc *CSRCleanerController) handle(csr *capi.CertificateSigningRequest) error { +func (ccc *CSRCleanerController) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { if isIssuedPastDeadline(csr) || isDeniedPastDeadline(csr) || isFailedPastDeadline(csr) || isPendingPastDeadline(csr) || isIssuedExpired(csr) { - if err := ccc.csrClient.Delete(context.TODO(), csr.Name, metav1.DeleteOptions{}); err != nil { + if err := ccc.csrClient.Delete(ctx, csr.Name, metav1.DeleteOptions{}); err != nil { return fmt.Errorf("unable to delete CSR %q: %v", csr.Name, err) } } diff --git a/pkg/controller/certificates/cleaner/cleaner_test.go b/pkg/controller/certificates/cleaner/cleaner_test.go index 202aac94533..6faeeb7bdf0 100644 --- a/pkg/controller/certificates/cleaner/cleaner_test.go +++ b/pkg/controller/certificates/cleaner/cleaner_test.go @@ -17,6 +17,7 @@ limitations under the License. package cleaner import ( + "context" "testing" "time" @@ -225,8 +226,8 @@ func TestCleanerWithApprovedExpiredCSR(t *testing.T) { s := &CSRCleanerController{ csrClient: client.CertificatesV1().CertificateSigningRequests(), } - - err := s.handle(csr) + ctx := context.TODO() + err := s.handle(ctx, csr) if err != nil { t.Fatalf("failed to clean CSR: %v", err) } diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher.go b/pkg/controller/certificates/rootcacertpublisher/publisher.go index a044779bdb7..cfd9f844c53 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher.go @@ -89,7 +89,7 @@ type Publisher struct { rootCA []byte // To allow injection for testing. - syncHandler func(key string) error + syncHandler func(ctx context.Context, key string) error cmLister corelisters.ConfigMapLister cmListerSynced cache.InformerSynced @@ -100,22 +100,22 @@ type Publisher struct { } // Run starts process -func (c *Publisher) Run(workers int, stopCh <-chan struct{}) { +func (c *Publisher) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting root CA certificate configmap publisher") defer klog.Infof("Shutting down root CA certificate configmap publisher") - if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) { + if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - <-stopCh + <-ctx.Done() } func (c *Publisher) configMapDeleted(obj interface{}) { @@ -155,21 +155,21 @@ func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) { c.queue.Add(newNamespace.Name) } -func (c *Publisher) runWorker() { - for c.processNextWorkItem() { +func (c *Publisher) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when // it's time to quit. -func (c *Publisher) processNextWorkItem() bool { +func (c *Publisher) processNextWorkItem(ctx context.Context) bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) - if err := c.syncHandler(key.(string)); err != nil { + if err := c.syncHandler(ctx, key.(string)); err != nil { utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err)) c.queue.AddRateLimited(key) return true @@ -179,7 +179,7 @@ func (c *Publisher) processNextWorkItem() bool { return true } -func (c *Publisher) syncNamespace(ns string) (err error) { +func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) { startTime := time.Now() defer func() { recordMetrics(startTime, err) @@ -189,7 +189,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) { cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName) switch { case apierrors.IsNotFound(err): - _, err = c.client.CoreV1().ConfigMaps(ns).Create(context.TODO(), &v1.ConfigMap{ + _, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: RootCACertConfigMapName, Annotations: map[string]string{DescriptionAnnotation: Description}, @@ -224,7 +224,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) { } cm.Annotations[DescriptionAnnotation] = Description - _, err = c.client.CoreV1().ConfigMaps(ns).Update(context.TODO(), cm, metav1.UpdateOptions{}) + _, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{}) return err } diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go index c416894e595..cb5fb132e26 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go @@ -17,6 +17,7 @@ limitations under the License. package rootcacertpublisher import ( + "context" "reflect" "testing" @@ -154,9 +155,9 @@ func TestConfigMapCreation(t *testing.T) { cmStore.Add(tc.UpdatedConfigMap) controller.configMapUpdated(nil, tc.UpdatedConfigMap) } - + ctx := context.TODO() for controller.queue.Len() != 0 { - controller.processNextWorkItem() + controller.processNextWorkItem(ctx) } actions := client.Actions() @@ -263,8 +264,8 @@ func TestConfigMapUpdateNoHotLoop(t *testing.T) { cmListerSynced: func() bool { return true }, nsListerSynced: func() bool { return true }, } - - err := controller.syncNamespace("default") + ctx := context.TODO() + err := controller.syncNamespace(ctx, "default") if err != nil { t.Fatal(err) } diff --git a/pkg/controller/certificates/signer/signer.go b/pkg/controller/certificates/signer/signer.go index d4799a0fdd4..013f0b7cbf6 100644 --- a/pkg/controller/certificates/signer/signer.go +++ b/pkg/controller/certificates/signer/signer.go @@ -106,10 +106,10 @@ func NewCSRSigningController( } // Run the main goroutine responsible for watching and syncing jobs. -func (c *CSRSigningController) Run(workers int, stopCh <-chan struct{}) { - go c.dynamicCertReloader.Run(workers, stopCh) +func (c *CSRSigningController) Run(ctx context.Context, workers int) { + go c.dynamicCertReloader.Run(ctx, workers) - c.certificateController.Run(workers, stopCh) + c.certificateController.Run(ctx, workers) } type isRequestForSignerFunc func(req *x509.CertificateRequest, usages []capi.KeyUsage, signerName string) (bool, error) @@ -144,7 +144,7 @@ func newSigner(signerName, caFile, caKeyFile string, client clientset.Interface, return ret, nil } -func (s *signer) handle(csr *capi.CertificateSigningRequest) error { +func (s *signer) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { // Ignore unapproved or failed requests if !certificates.IsCertificateRequestApproved(csr) || certificates.HasTrueCondition(csr, capi.CertificateFailed) { return nil @@ -167,7 +167,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error { Message: err.Error(), LastUpdateTime: metav1.Now(), }) - _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) + _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error adding failure condition for csr: %v", err) } @@ -181,7 +181,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error { return fmt.Errorf("error auto signing csr: %v", err) } csr.Status.Certificate = cert - _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) + _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating signature for csr: %v", err) } diff --git a/pkg/controller/certificates/signer/signer_test.go b/pkg/controller/certificates/signer/signer_test.go index af5ec3e546c..d590e060fb0 100644 --- a/pkg/controller/certificates/signer/signer_test.go +++ b/pkg/controller/certificates/signer/signer_test.go @@ -17,6 +17,7 @@ limitations under the License. package signer import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -294,7 +295,8 @@ func TestHandle(t *testing.T) { } csr := makeTestCSR(csrBuilder{cn: c.commonName, signerName: c.signerName, approved: c.approved, failed: c.failed, usages: c.usages, org: c.org, dnsNames: c.dnsNames}) - if err := s.handle(csr); err != nil && !c.err { + ctx := context.TODO() + if err := s.handle(ctx, csr); err != nil && !c.err { t.Errorf("unexpected err: %v", err) } c.verify(t, client.Actions()) diff --git a/test/integration/certificates/controller_approval_test.go b/test/integration/certificates/controller_approval_test.go index 380adffea4b..7f00df42eeb 100644 --- a/test/integration/certificates/controller_approval_test.go +++ b/test/integration/certificates/controller_approval_test.go @@ -98,10 +98,10 @@ func TestController_AutoApproval(t *testing.T) { // Register the controller c := approver.NewCSRApprovingController(client, informers.Certificates().V1().CertificateSigningRequests()) // Start the controller & informers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go c.Run(1, stopCh) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + informers.Start(ctx.Done()) + go c.Run(ctx, 1) // Configure appropriate permissions if test.grantNodeClient { diff --git a/test/integration/certificates/duration_test.go b/test/integration/certificates/duration_test.go index 033d0d211b2..12543e0dedf 100644 --- a/test/integration/certificates/duration_test.go +++ b/test/integration/certificates/duration_test.go @@ -115,13 +115,8 @@ func TestCSRDuration(t *testing.T) { t.Fatal(err) } - stopCh := make(chan struct{}) - t.Cleanup(func() { - close(stopCh) - }) - - informerFactory.Start(stopCh) - go c.Run(1, stopCh) + informerFactory.Start(ctx.Done()) + go c.Run(ctx, 1) tests := []struct { name, csrName string