From 8b84a793b39fed2a62af0876b2eda461a68008c9 Mon Sep 17 00:00:00 2001
From: Ravi Gudimetla
Date: Mon, 7 Mar 2022 09:20:45 -0500
Subject: [PATCH] API Server Changes

This commit includes all the changes needed for the APIServer. Instead of
modifying the existing signatures of the methods that generate or return a
stop channel, we derive a context from the channel and pass that context to
the controllers started by the APIServer. This ensures we don't have to
touch APIServer dependencies. The stopCh-to-context bridge used throughout
is sketched after the diff.
---
 cmd/kubelet/app/auth.go                        | 13 ++++++++++-
 ...cluster_authentication_trust_controller.go  | 12 +++++-----
 pkg/controlplane/instance.go                   | 21 +++++++++++++----
 .../headerrequest/requestheader_controller.go  | 14 +++++------
 .../requestheader_controller_test.go           |  4 +++-
 .../configmap_cafile_content.go                | 15 ++++++------
 .../dynamic_cafile_content.go                  | 17 +++++++-------
 .../dynamic_serving_content.go                 | 15 ++++++------
 .../dynamiccertificates/union_content.go       |  9 ++++----
 .../pkg/server/options/authentication.go       |  6 ++++-
 .../authentication_dynamic_request_header.go   | 15 ++++++------
 .../apiserver/pkg/server/secure_serving.go     | 23 +++++++++++++------
 .../pkg/apiserver/apiserver.go                 | 19 ++++++++++++---
 .../pkg/apiserver/handler_proxy_test.go        |  5 ++--
 14 files changed, 122 insertions(+), 66 deletions(-)

diff --git a/cmd/kubelet/app/auth.go b/cmd/kubelet/app/auth.go
index 8b8b2e9047f..fcb8afe1946 100644
--- a/cmd/kubelet/app/auth.go
+++ b/cmd/kubelet/app/auth.go
@@ -17,6 +17,7 @@ limitations under the License.
 package app
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"reflect"
@@ -95,8 +96,18 @@ func BuildAuthn(client authenticationclient.AuthenticationV1Interface, authn kub
 	}
 
 	return authenticator, func(stopCh <-chan struct{}) {
+		// generate a context from stopCh. This avoids modifying files that rely on this method.
+		// TODO: See if we can pass ctx to the current method
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			select {
+			case <-stopCh:
+				cancel() // stopCh closed, so cancel our context
+			case <-ctx.Done():
+			}
+		}()
 		if dynamicCAContentFromFile != nil {
-			go dynamicCAContentFromFile.Run(1, stopCh)
+			go dynamicCAContentFromFile.Run(ctx, 1)
 		}
 	}, err
 }
diff --git a/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go b/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go
index 630474d6bd6..dcfb67d3887 100644
--- a/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go
+++ b/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go
@@ -432,7 +432,7 @@ func (c *Controller) Enqueue() {
 }
 
 // Run the controller until stopped.
-func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
+func (c *Controller) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	// make sure the work queue is shutdown which will trigger workers to end
 	defer c.queue.ShutDown()
@@ -441,25 +441,25 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.Infof("Shutting down cluster_authentication_trust_controller controller")
 
 	// we have a personal informer that is narrowly scoped, start it.
-	go c.kubeSystemConfigMapInformer.Run(stopCh)
+	go c.kubeSystemConfigMapInformer.Run(ctx.Done())
 
 	// wait for your secondary caches to fill before starting your work
-	if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", stopCh, c.preRunCaches...) {
+	if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) {
 		return
 	}
 
 	// only run one worker
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// checks are cheap. run once a minute just to be sure we stay in sync in case fsnotify fails again
 	// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
 	_ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) {
 		c.queue.Add(keyFn())
 		return false, nil
-	}, stopCh)
+	}, ctx.Done())
 
 	// wait until we're told to stop
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *Controller) runWorker() {
diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go
index fddbf1bb604..6fcc4023747 100644
--- a/pkg/controlplane/instance.go
+++ b/pkg/controlplane/instance.go
@@ -452,29 +452,40 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 		}
 
 		controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
+		// generate a context from stopCh. This avoids modifying files that rely on the apiserver.
+		// TODO: See if we can pass ctx to the current method
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			select {
+			case <-hookContext.StopCh:
+				cancel() // stopCh closed, so cancel our context
+			case <-ctx.Done():
+			}
+		}()
+
 		// prime values and start listeners
 		if m.ClusterAuthenticationInfo.ClientCA != nil {
 			m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
 			if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
 				// runonce to be sure that we have a value.
-				if err := controller.RunOnce(); err != nil {
+				if err := controller.RunOnce(ctx); err != nil {
 					runtime.HandleError(err)
 				}
-				go controller.Run(1, hookContext.StopCh)
+				go controller.Run(ctx, 1)
 			}
 		}
 		if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
 			m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
 			if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
 				// runonce to be sure that we have a value.
-				if err := controller.RunOnce(); err != nil {
+				if err := controller.RunOnce(ctx); err != nil {
 					runtime.HandleError(err)
 				}
-				go controller.Run(1, hookContext.StopCh)
+				go controller.Run(ctx, 1)
 			}
 		}
 
-		go controller.Run(1, hookContext.StopCh)
+		go controller.Run(ctx, 1)
 
 		return nil
 	})
diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go
index 561b6fba9ba..d8c4090b12a 100644
--- a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go
+++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go
@@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string {
 }
 
 // Run starts RequestHeaderAuthRequestController controller and blocks until stopCh is closed.
-func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) {
+func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 
 	klog.Infof("Starting %s", c.name)
 	defer klog.Infof("Shutting down %s", c.name)
 
-	go c.configmapInformer.Run(stopCh)
+	go c.configmapInformer.Run(ctx.Done())
 
 	// wait for caches to fill before starting your work
-	if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) {
+	if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) {
 		return
 	}
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 //
 // RunOnce runs a single sync loop
-func (c *RequestHeaderAuthRequestController) RunOnce() error {
-	configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{})
+func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error {
+	configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{})
 	switch {
 	case errors.IsNotFound(err):
 		// ignore, authConfigMap is nil now
diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go
index 2577d2635b2..36dfbf1ec29 100644
--- a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package headerrequest
 
 import (
+	"context"
 	"encoding/json"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"testing"
@@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) {
 			target.client = fakeKubeClient
 
 			// act
-			err := target.RunOnce()
+			ctx := context.TODO()
+			err := target.RunOnce(ctx)
 
 			if err != nil && !scenario.expectErr {
 				t.Errorf("got unexpected error %v", err)
diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go
index b09474bc4fe..428fd66bae7 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go
@@ -18,6 +18,7 @@ package dynamiccertificates
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"fmt"
 	"sync/atomic"
@@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool {
 }
 
 // RunOnce runs a single sync loop
-func (c *ConfigMapCAController) RunOnce() error {
+func (c *ConfigMapCAController) RunOnce(ctx context.Context) error {
 	// Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for
 	// a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures.
 	_ = c.loadCABundle()
@@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error {
 }
 
 // Run starts the kube-apiserver and blocks until stopCh is closed.
-func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
+func (c *ConfigMapCAController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 
@@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.InfoS("Shutting down controller", "name", c.name)
 
 	// we have a personal informer that is narrowly scoped, start it.
-	go c.configMapInformer.Run(stopCh)
+	go c.configMapInformer.Run(ctx.Done())
 
 	// wait for your secondary caches to fill before starting your work
-	if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) {
+	if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) {
 		return
 	}
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
 	go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
 		c.queue.Add(workItemKey)
 		return false, nil
-	}, stopCh)
+	}, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *ConfigMapCAController) runWorker() {
diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go
index fb1515c182a..58761acd925 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go
@@ -18,6 +18,7 @@ package dynamiccertificates
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"fmt"
 	"io/ioutil"
@@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute
 // ControllerRunner is a generic interface for starting a controller
 type ControllerRunner interface {
 	// RunOnce runs the sync loop a single time. This useful for synchronous priming
-	RunOnce() error
+	RunOnce(ctx context.Context) error
 
 	// Run should be called a go .Run
-	Run(workers int, stopCh <-chan struct{})
+	Run(ctx context.Context, workers int)
 }
 
 // DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content
@@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool {
 }
 
 // RunOnce runs a single sync loop
-func (c *DynamicFileCAContent) RunOnce() error {
+func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error {
 	return c.loadCABundle()
 }
 
 // Run starts the controller and blocks until stopCh is closed.
-func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
+func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 
@@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.InfoS("Shutting down controller", "name", c.name)
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// start the loop that watches the CA file until stopCh is closed.
 	go wait.Until(func() {
-		if err := c.watchCAFile(stopCh); err != nil {
+		if err := c.watchCAFile(ctx.Done()); err != nil {
 			klog.ErrorS(err, "Failed to watch CA file, will retry later")
 		}
-	}, time.Minute, stopCh)
+	}, time.Minute, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error {
diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go
index 00117176b0a..9ff1abb6494 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go
@@ -17,6 +17,7 @@ limitations under the License.
 package dynamiccertificates
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"io/ioutil"
@@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
 }
 
 // RunOnce runs a single sync loop
-func (c *DynamicCertKeyPairContent) RunOnce() error {
+func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error {
 	return c.loadCertKeyPair()
 }
 
-// Run starts the controller and blocks until stopCh is closed.
-func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
+// Run starts the controller and blocks until the context is canceled.
+func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 
@@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.InfoS("Shutting down controller", "name", c.name)
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// start the loop that watches the cert and key files until stopCh is closed.
 	go wait.Until(func() {
-		if err := c.watchCertKeyFile(stopCh); err != nil {
+		if err := c.watchCertKeyFile(ctx.Done()); err != nil {
 			klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
 		}
-	}, time.Minute, stopCh)
+	}, time.Minute, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {
diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go
index e10b112bc07..57622bd34ec 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go
@@ -18,6 +18,7 @@ package dynamiccertificates
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"strings"
 
@@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) {
 }
 
 // AddListener adds a listener to be notified when the CA content changes.
-func (c unionCAContent) RunOnce() error {
+func (c unionCAContent) RunOnce(ctx context.Context) error {
 	errors := []error{}
 	for _, curr := range c {
 		if controller, ok := curr.(ControllerRunner); ok {
-			if err := controller.RunOnce(); err != nil {
+			if err := controller.RunOnce(ctx); err != nil {
 				errors = append(errors, err)
 			}
 		}
@@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error {
 }
 
 // Run runs the controller
-func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) {
+func (c unionCAContent) Run(ctx context.Context, workers int) {
 	for _, curr := range c {
 		if controller, ok := curr.(ControllerRunner); ok {
-			go controller.Run(workers, stopCh)
+			go controller.Run(ctx, workers)
 		}
 	}
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go b/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go
index a82b4a7391d..8ff771af080 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go
@@ -17,6 +17,7 @@ limitations under the License.
 package options
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"time"
@@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber
 	}
 
 	// look up authentication configuration in the cluster and in case of an err defer to authentication-tolerate-lookup-failure flag
-	if err := dynamicRequestHeaderProvider.RunOnce(); err != nil {
+	// We are passing a context to dynamicRequestHeaderProvider.RunOnce as its signature now requires
+	// one; however, the context is not used at all, so passing an empty context shouldn't be a problem.
+	ctx := context.TODO()
+	if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil {
 		return nil, err
 	}
 
diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go b/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go
index e2beb5c2382..0dac3402187 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go
@@ -17,6 +17,7 @@ limitations under the License.
 package options
 
 import (
+	"context"
 	"fmt"
 
 	"k8s.io/apimachinery/pkg/util/errors"
@@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq
 	}, nil
 }
 
-func (c *DynamicRequestHeaderController) RunOnce() error {
+func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error {
 	errs := []error{}
-	errs = append(errs, c.ConfigMapCAController.RunOnce())
-	errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce())
+	errs = append(errs, c.ConfigMapCAController.RunOnce(ctx))
+	errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx))
 	return errors.NewAggregate(errs)
 }
 
-func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) {
-	go c.ConfigMapCAController.Run(workers, stopCh)
-	go c.RequestHeaderAuthRequestController.Run(workers, stopCh)
-	<-stopCh
+func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) {
+	go c.ConfigMapCAController.Run(ctx, workers)
+	go c.RequestHeaderAuthRequestController.Run(ctx, workers)
+	<-ctx.Done()
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go
index d4caa08d36a..64bcc87ebf1 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go
@@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
 		if s.Cert != nil {
 			s.Cert.AddListener(dynamicCertificateController)
 		}
-
+		// generate a context from stopCh. This avoids modifying files that rely on the apiserver.
+		// TODO: See if we can pass ctx to the current method
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			select {
+			case <-stopCh:
+				cancel() // stopCh closed, so cancel our context
+			case <-ctx.Done():
+			}
+		}()
 		// start controllers if possible
 		if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok {
 			// runonce to try to prime data. If this fails, it's ok because we fail closed.
 			// Files are required to be populated already, so this is for convenience.
-			if err := controller.RunOnce(); err != nil {
+			if err := controller.RunOnce(ctx); err != nil {
 				klog.Warningf("Initial population of client CA failed: %v", err)
 			}
-			go controller.Run(1, stopCh)
+			go controller.Run(ctx, 1)
 		}
 		if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok {
 			// runonce to try to prime data. If this fails, it's ok because we fail closed.
 			// Files are required to be populated already, so this is for convenience.
-			if err := controller.RunOnce(); err != nil {
+			if err := controller.RunOnce(ctx); err != nil {
 				klog.Warningf("Initial population of default serving certificate failed: %v", err)
 			}
-			go controller.Run(1, stopCh)
+			go controller.Run(ctx, 1)
 		}
 		for _, sniCert := range s.SNICerts {
 			sniCert.AddListener(dynamicCertificateController)
 			if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok {
 				// runonce to try to prime data. If this fails, it's ok because we fail closed.
 				// Files are required to be populated already, so this is for convenience.
-				if err := controller.RunOnce(); err != nil {
+				if err := controller.RunOnce(ctx); err != nil {
 					klog.Warningf("Initial population of SNI serving certificate failed: %v", err)
 				}
-				go controller.Run(1, stopCh)
+				go controller.Run(ctx, 1)
 			}
 		}
diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
index 45223e53cd8..dbbae01637c 100644
--- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
+++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
@@ -244,14 +244,27 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
 		if err != nil {
 			return nil, err
 		}
-		if err := aggregatorProxyCerts.RunOnce(); err != nil {
+		// We are passing a context to aggregatorProxyCerts.RunOnce as its signature now requires
+		// one; however, the context is not used at all, so passing an empty context shouldn't be a problem.
+		ctx := context.TODO()
+		if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
 			return nil, err
 		}
 		aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
 		s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent
 
-		s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(context genericapiserver.PostStartHookContext) error {
-			go aggregatorProxyCerts.Run(1, context.StopCh)
+		s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
+			// generate a context from stopCh. This avoids modifying files that rely on the apiserver.
+			// TODO: See if we can pass ctx to the current method
+			ctx, cancel := context.WithCancel(context.Background())
+			go func() {
+				select {
+				case <-postStartHookContext.StopCh:
+					cancel() // stopCh closed, so cancel our context
+				case <-ctx.Done():
+				}
+			}()
+			go aggregatorProxyCerts.Run(ctx, 1)
 			return nil
 		})
 	}
diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go
index 3f607d74c64..fdc30b96c35 100644
--- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go
+++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go
@@ -847,7 +847,8 @@ func TestProxyCertReload(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unable to create dynamic certificates: %v", err)
 	}
-	err = certProvider.RunOnce()
+	ctx := context.TODO()
+	err = certProvider.RunOnce(ctx)
 	if err != nil {
 		t.Fatalf("Unable to load dynamic certificates: %v", err)
 	}
@@ -886,7 +887,7 @@ func TestProxyCertReload(t *testing.T) {
 	// STEP 3: swap the certificate used by the aggregator to auth against the backend server and verify the request passes
 	// note that this step uses the certificate that can be validated by the backend server with clientCaCrt()
 	writeCerts(certFile, keyFile, clientCert(), clientKey(), t)
-	err = certProvider.RunOnce()
+	err = certProvider.RunOnce(ctx)
 	if err != nil {
 		t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err)
 	}
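
The stopCh-to-context bridge that this patch inlines at each call site (cmd/kubelet/app/auth.go, pkg/controlplane/instance.go, secure_serving.go, and the aggregator's apiserver.go) can be read in isolation below. This is a minimal sketch for review purposes only: the helper name contextFromStopCh is hypothetical and is not part of this patch or of any k8s.io library.

package main

import (
	"context"
	"fmt"
	"time"
)

// contextFromStopCh derives a context.Context that is canceled as soon as
// the given stop channel is closed. It is the standalone form of the
// select-based bridge repeated throughout the diff.
func contextFromStopCh(stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stopCh:
			cancel() // stopCh closed, so cancel the derived context
		case <-ctx.Done():
			// context canceled through another path; exit so this goroutine is not leaked
		}
	}()
	return ctx, cancel
}

func main() {
	stopCh := make(chan struct{})
	ctx, cancel := contextFromStopCh(stopCh)
	defer cancel()

	// Stand-in for a converted controller's Run(ctx, workers): like the Run
	// methods in the diff, it blocks on <-ctx.Done() instead of <-stopCh.
	go func() {
		<-ctx.Done()
		fmt.Println("controller stopped:", ctx.Err())
	}()

	close(stopCh) // closing the stop channel cancels the derived context
	time.Sleep(100 * time.Millisecond)
}

Selecting on ctx.Done() as well as stopCh is what keeps the bridging goroutine from leaking: if the derived context is canceled through some other path, the goroutine exits instead of waiting forever on a stop channel that may never close.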