diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index ca3e6c9f1a9..8508f003e92 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) } - go kubeletServingSigner.Run(5, ctx.Done()) + go kubeletServingSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving") } @@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) } - go kubeletClientSigner.Run(5, ctx.Done()) + go kubeletClientSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet") } @@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) } - go kubeAPIServerClientSigner.Run(5, ctx.Done()) + go kubeAPIServerClientSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client") } @@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) } - go legacyUnknownSigner.Run(5, ctx.Done()) + go legacyUnknownSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown") } @@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go approver.Run(5, ctx.Done()) + go approver.Run(ctx, 5) return nil, true, nil } @@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go cleaner.Run(1, ctx.Done()) + go cleaner.Run(ctx, 1) return nil, true, nil } @@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC if err != nil { return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) } - go sac.Run(1, ctx.Done()) + go sac.Run(ctx, 1) return nil, true, nil } diff --git a/cmd/kubelet/app/auth.go b/cmd/kubelet/app/auth.go index 8b8b2e9047f..fcb8afe1946 100644 --- a/cmd/kubelet/app/auth.go +++ b/cmd/kubelet/app/auth.go @@ -17,6 +17,7 @@ limitations under the License. package app import ( + "context" "errors" "fmt" "reflect" @@ -95,8 +96,18 @@ func BuildAuthn(client authenticationclient.AuthenticationV1Interface, authn kub } return authenticator, func(stopCh <-chan struct{}) { + // generate a context from stopCh. This is to avoid modifying files which are relying on this method + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-stopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() if dynamicCAContentFromFile != nil { - go dynamicCAContentFromFile.Run(1, stopCh) + go dynamicCAContentFromFile.Run(ctx, 1) } }, err } diff --git a/pkg/controller/certificates/approver/sarapprove.go b/pkg/controller/certificates/approver/sarapprove.go index 397a75f5689..d739fc783b3 100644 --- a/pkg/controller/certificates/approver/sarapprove.go +++ b/pkg/controller/certificates/approver/sarapprove.go @@ -75,7 +75,7 @@ func recognizers() []csrRecognizer { return recognizers } -func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { +func (a *sarApprover) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { if len(csr.Status.Certificate) != 0 { return nil } @@ -96,13 +96,13 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { tried = append(tried, r.permission.Subresource) - approved, err := a.authorize(csr, r.permission) + approved, err := a.authorize(ctx, csr, r.permission) if err != nil { return err } if approved { appendApprovalCondition(csr, r.successMessage) - _, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.Background(), csr.Name, csr, metav1.UpdateOptions{}) + _, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating approval for csr: %v", err) } @@ -117,7 +117,7 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { return nil } -func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) { +func (a *sarApprover) authorize(ctx context.Context, csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) { extra := make(map[string]authorization.ExtraValue) for k, v := range csr.Spec.Extra { extra[k] = authorization.ExtraValue(v) @@ -132,7 +132,7 @@ func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs auth ResourceAttributes: &rattrs, }, } - sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{}) + sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{}) if err != nil { return false, err } diff --git a/pkg/controller/certificates/approver/sarapprove_test.go b/pkg/controller/certificates/approver/sarapprove_test.go index e3c02bc0923..9bbbcf114b1 100644 --- a/pkg/controller/certificates/approver/sarapprove_test.go +++ b/pkg/controller/certificates/approver/sarapprove_test.go @@ -17,6 +17,7 @@ limitations under the License. package approver import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -130,7 +131,8 @@ func TestHandle(t *testing.T) { }, } csr := makeTestCsr() - if err := approver.handle(csr); err != nil && !c.err { + ctx := context.TODO() + if err := approver.handle(ctx, csr); err != nil && !c.err { t.Errorf("unexpected err: %v", err) } c.verify(t, client.Actions()) diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index e1fbd420142..7bb8b2fdb1a 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -19,6 +19,7 @@ limitations under the License. package certificates import ( + "context" "fmt" "time" @@ -48,7 +49,7 @@ type CertificateController struct { csrLister certificateslisters.CertificateSigningRequestLister csrsSynced cache.InformerSynced - handler func(*certificates.CertificateSigningRequest) error + handler func(context.Context, *certificates.CertificateSigningRequest) error queue workqueue.RateLimitingInterface } @@ -57,7 +58,7 @@ func NewCertificateController( name string, kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, - handler func(*certificates.CertificateSigningRequest) error, + handler func(context.Context, *certificates.CertificateSigningRequest) error, ) *CertificateController { // Send events to the apiserver eventBroadcaster := record.NewBroadcaster() @@ -111,39 +112,39 @@ func NewCertificateController( } // Run the main goroutine responsible for watching and syncing jobs. -func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) { +func (cc *CertificateController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer cc.queue.ShutDown() klog.Infof("Starting certificate controller %q", cc.name) defer klog.Infof("Shutting down certificate controller %q", cc.name) - if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), stopCh, cc.csrsSynced) { + if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(cc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, cc.worker, time.Second) } - <-stopCh + <-ctx.Done() } // worker runs a thread that dequeues CSRs, handles them, and marks them done. -func (cc *CertificateController) worker() { - for cc.processNextWorkItem() { +func (cc *CertificateController) worker(ctx context.Context) { + for cc.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. -func (cc *CertificateController) processNextWorkItem() bool { +func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool { cKey, quit := cc.queue.Get() if quit { return false } defer cc.queue.Done(cKey) - if err := cc.syncFunc(cKey.(string)); err != nil { + if err := cc.syncFunc(ctx, cKey.(string)); err != nil { cc.queue.AddRateLimited(cKey) if _, ignorable := err.(ignorableError); !ignorable { utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err)) @@ -167,7 +168,7 @@ func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) { cc.queue.Add(key) } -func (cc *CertificateController) syncFunc(key string) error { +func (cc *CertificateController) syncFunc(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime)) @@ -188,7 +189,7 @@ func (cc *CertificateController) syncFunc(key string) error { // need to operate on a copy so we don't mutate the csr in the shared cache csr = csr.DeepCopy() - return cc.handler(csr) + return cc.handler(ctx, csr) } // IgnorableError returns an error that we shouldn't handle (i.e. log) because diff --git a/pkg/controller/certificates/certificate_controller_test.go b/pkg/controller/certificates/certificate_controller_test.go index e8be06e334f..c9912e3ef2d 100644 --- a/pkg/controller/certificates/certificate_controller_test.go +++ b/pkg/controller/certificates/certificate_controller_test.go @@ -41,8 +41,7 @@ func TestCertificateController(t *testing.T) { client := fake.NewSimpleClientset(csr) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(csr), controller.NoResyncPeriodFunc()) - - handler := func(csr *certificates.CertificateSigningRequest) error { + handler := func(ctx context.Context, csr *certificates.CertificateSigningRequest) error { csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{ Type: certificates.CertificateApproved, Reason: "test reason", @@ -70,8 +69,8 @@ func TestCertificateController(t *testing.T) { wait.PollUntil(10*time.Millisecond, func() (bool, error) { return controller.queue.Len() >= 1, nil }, stopCh) - - controller.processNextWorkItem() + ctx := context.TODO() + controller.processNextWorkItem(ctx) actions := client.Actions() if len(actions) != 1 { diff --git a/pkg/controller/certificates/cleaner/cleaner.go b/pkg/controller/certificates/cleaner/cleaner.go index 191c7974435..38956f82952 100644 --- a/pkg/controller/certificates/cleaner/cleaner.go +++ b/pkg/controller/certificates/cleaner/cleaner.go @@ -76,36 +76,36 @@ func NewCSRCleanerController( } // Run the main goroutine responsible for watching and syncing jobs. -func (ccc *CSRCleanerController) Run(workers int, stopCh <-chan struct{}) { +func (ccc *CSRCleanerController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() klog.Infof("Starting CSR cleaner controller") defer klog.Infof("Shutting down CSR cleaner controller") for i := 0; i < workers; i++ { - go wait.Until(ccc.worker, pollingInterval, stopCh) + go wait.UntilWithContext(ctx, ccc.worker, pollingInterval) } - <-stopCh + <-ctx.Done() } // worker runs a thread that dequeues CSRs, handles them, and marks them done. -func (ccc *CSRCleanerController) worker() { +func (ccc *CSRCleanerController) worker(ctx context.Context) { csrs, err := ccc.csrLister.List(labels.Everything()) if err != nil { klog.Errorf("Unable to list CSRs: %v", err) return } for _, csr := range csrs { - if err := ccc.handle(csr); err != nil { + if err := ccc.handle(ctx, csr); err != nil { klog.Errorf("Error while attempting to clean CSR %q: %v", csr.Name, err) } } } -func (ccc *CSRCleanerController) handle(csr *capi.CertificateSigningRequest) error { +func (ccc *CSRCleanerController) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { if isIssuedPastDeadline(csr) || isDeniedPastDeadline(csr) || isFailedPastDeadline(csr) || isPendingPastDeadline(csr) || isIssuedExpired(csr) { - if err := ccc.csrClient.Delete(context.TODO(), csr.Name, metav1.DeleteOptions{}); err != nil { + if err := ccc.csrClient.Delete(ctx, csr.Name, metav1.DeleteOptions{}); err != nil { return fmt.Errorf("unable to delete CSR %q: %v", csr.Name, err) } } diff --git a/pkg/controller/certificates/cleaner/cleaner_test.go b/pkg/controller/certificates/cleaner/cleaner_test.go index 202aac94533..6faeeb7bdf0 100644 --- a/pkg/controller/certificates/cleaner/cleaner_test.go +++ b/pkg/controller/certificates/cleaner/cleaner_test.go @@ -17,6 +17,7 @@ limitations under the License. package cleaner import ( + "context" "testing" "time" @@ -225,8 +226,8 @@ func TestCleanerWithApprovedExpiredCSR(t *testing.T) { s := &CSRCleanerController{ csrClient: client.CertificatesV1().CertificateSigningRequests(), } - - err := s.handle(csr) + ctx := context.TODO() + err := s.handle(ctx, csr) if err != nil { t.Fatalf("failed to clean CSR: %v", err) } diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher.go b/pkg/controller/certificates/rootcacertpublisher/publisher.go index a044779bdb7..cfd9f844c53 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher.go @@ -89,7 +89,7 @@ type Publisher struct { rootCA []byte // To allow injection for testing. - syncHandler func(key string) error + syncHandler func(ctx context.Context, key string) error cmLister corelisters.ConfigMapLister cmListerSynced cache.InformerSynced @@ -100,22 +100,22 @@ type Publisher struct { } // Run starts process -func (c *Publisher) Run(workers int, stopCh <-chan struct{}) { +func (c *Publisher) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting root CA certificate configmap publisher") defer klog.Infof("Shutting down root CA certificate configmap publisher") - if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) { + if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - <-stopCh + <-ctx.Done() } func (c *Publisher) configMapDeleted(obj interface{}) { @@ -155,21 +155,21 @@ func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) { c.queue.Add(newNamespace.Name) } -func (c *Publisher) runWorker() { - for c.processNextWorkItem() { +func (c *Publisher) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when // it's time to quit. -func (c *Publisher) processNextWorkItem() bool { +func (c *Publisher) processNextWorkItem(ctx context.Context) bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) - if err := c.syncHandler(key.(string)); err != nil { + if err := c.syncHandler(ctx, key.(string)); err != nil { utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err)) c.queue.AddRateLimited(key) return true @@ -179,7 +179,7 @@ func (c *Publisher) processNextWorkItem() bool { return true } -func (c *Publisher) syncNamespace(ns string) (err error) { +func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) { startTime := time.Now() defer func() { recordMetrics(startTime, err) @@ -189,7 +189,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) { cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName) switch { case apierrors.IsNotFound(err): - _, err = c.client.CoreV1().ConfigMaps(ns).Create(context.TODO(), &v1.ConfigMap{ + _, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: RootCACertConfigMapName, Annotations: map[string]string{DescriptionAnnotation: Description}, @@ -224,7 +224,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) { } cm.Annotations[DescriptionAnnotation] = Description - _, err = c.client.CoreV1().ConfigMaps(ns).Update(context.TODO(), cm, metav1.UpdateOptions{}) + _, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{}) return err } diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go index c416894e595..cb5fb132e26 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go @@ -17,6 +17,7 @@ limitations under the License. package rootcacertpublisher import ( + "context" "reflect" "testing" @@ -154,9 +155,9 @@ func TestConfigMapCreation(t *testing.T) { cmStore.Add(tc.UpdatedConfigMap) controller.configMapUpdated(nil, tc.UpdatedConfigMap) } - + ctx := context.TODO() for controller.queue.Len() != 0 { - controller.processNextWorkItem() + controller.processNextWorkItem(ctx) } actions := client.Actions() @@ -263,8 +264,8 @@ func TestConfigMapUpdateNoHotLoop(t *testing.T) { cmListerSynced: func() bool { return true }, nsListerSynced: func() bool { return true }, } - - err := controller.syncNamespace("default") + ctx := context.TODO() + err := controller.syncNamespace(ctx, "default") if err != nil { t.Fatal(err) } diff --git a/pkg/controller/certificates/signer/signer.go b/pkg/controller/certificates/signer/signer.go index b3d726378ae..588912801c8 100644 --- a/pkg/controller/certificates/signer/signer.go +++ b/pkg/controller/certificates/signer/signer.go @@ -104,10 +104,10 @@ func NewCSRSigningController( } // Run the main goroutine responsible for watching and syncing jobs. -func (c *CSRSigningController) Run(workers int, stopCh <-chan struct{}) { - go c.dynamicCertReloader.Run(workers, stopCh) +func (c *CSRSigningController) Run(ctx context.Context, workers int) { + go c.dynamicCertReloader.Run(ctx, workers) - c.certificateController.Run(workers, stopCh) + c.certificateController.Run(ctx, workers) } type isRequestForSignerFunc func(req *x509.CertificateRequest, usages []capi.KeyUsage, signerName string) (bool, error) @@ -142,7 +142,7 @@ func newSigner(signerName, caFile, caKeyFile string, client clientset.Interface, return ret, nil } -func (s *signer) handle(csr *capi.CertificateSigningRequest) error { +func (s *signer) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { // Ignore unapproved or failed requests if !certificates.IsCertificateRequestApproved(csr) || certificates.HasTrueCondition(csr, capi.CertificateFailed) { return nil @@ -165,7 +165,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error { Message: err.Error(), LastUpdateTime: metav1.Now(), }) - _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) + _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error adding failure condition for csr: %v", err) } @@ -179,7 +179,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error { return fmt.Errorf("error auto signing csr: %v", err) } csr.Status.Certificate = cert - _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) + _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating signature for csr: %v", err) } diff --git a/pkg/controller/certificates/signer/signer_test.go b/pkg/controller/certificates/signer/signer_test.go index 557a44173ed..02bea53ae7a 100644 --- a/pkg/controller/certificates/signer/signer_test.go +++ b/pkg/controller/certificates/signer/signer_test.go @@ -17,6 +17,7 @@ limitations under the License. package signer import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -291,7 +292,8 @@ func TestHandle(t *testing.T) { } csr := makeTestCSR(csrBuilder{cn: c.commonName, signerName: c.signerName, approved: c.approved, failed: c.failed, usages: c.usages, org: c.org, dnsNames: c.dnsNames}) - if err := s.handle(csr); err != nil && !c.err { + ctx := context.TODO() + if err := s.handle(ctx, csr); err != nil && !c.err { t.Errorf("unexpected err: %v", err) } c.verify(t, client.Actions()) diff --git a/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go b/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go index 630474d6bd6..dcfb67d3887 100644 --- a/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go +++ b/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go @@ -432,7 +432,7 @@ func (c *Controller) Enqueue() { } // Run the controller until stopped. -func (c *Controller) Run(workers int, stopCh <-chan struct{}) { +func (c *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() // make sure the work queue is shutdown which will trigger workers to end defer c.queue.ShutDown() @@ -441,25 +441,25 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) { defer klog.Infof("Shutting down cluster_authentication_trust_controller controller") // we have a personal informer that is narrowly scoped, start it. - go c.kubeSystemConfigMapInformer.Run(stopCh) + go c.kubeSystemConfigMapInformer.Run(ctx.Done()) // wait for your secondary caches to fill before starting your work - if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", stopCh, c.preRunCaches...) { + if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) { return } // only run one worker - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // checks are cheap. run once a minute just to be sure we stay in sync in case fsnotify fails again // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. _ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) { c.queue.Add(keyFn()) return false, nil - }, stopCh) + }, ctx.Done()) // wait until we're told to stop - <-stopCh + <-ctx.Done() } func (c *Controller) runWorker() { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 5456849e42a..d82dbd5a278 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -437,29 +437,40 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) } controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient) + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-hookContext.StopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() + // prime values and start listeners if m.ClusterAuthenticationInfo.ClientCA != nil { m.ClusterAuthenticationInfo.ClientCA.AddListener(controller) if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok { // runonce to be sure that we have a value. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { runtime.HandleError(err) } - go controller.Run(1, hookContext.StopCh) + go controller.Run(ctx, 1) } } if m.ClusterAuthenticationInfo.RequestHeaderCA != nil { m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller) if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok { // runonce to be sure that we have a value. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { runtime.HandleError(err) } - go controller.Run(1, hookContext.StopCh) + go controller.Run(ctx, 1) } } - go controller.Run(1, hookContext.StopCh) + go controller.Run(ctx, 1) return nil }) diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go index 561b6fba9ba..d8c4090b12a 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller.go @@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string { } // Run starts RequestHeaderAuthRequestController controller and blocks until stopCh is closed. -func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) { +func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting %s", c.name) defer klog.Infof("Shutting down %s", c.name) - go c.configmapInformer.Run(stopCh) + go c.configmapInformer.Run(ctx.Done()) // wait for caches to fill before starting your work - if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) { + if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) { return } // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) - <-stopCh + <-ctx.Done() } // // RunOnce runs a single sync loop -func (c *RequestHeaderAuthRequestController) RunOnce() error { - configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{}) +func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error { + configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{}) switch { case errors.IsNotFound(err): // ignore, authConfigMap is nil now diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go index 2577d2635b2..36dfbf1ec29 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/headerrequest/requestheader_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package headerrequest import ( + "context" "encoding/json" "k8s.io/apimachinery/pkg/api/equality" "testing" @@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) { target.client = fakeKubeClient // act - err := target.RunOnce() + ctx := context.TODO() + err := target.RunOnce(ctx) if err != nil && !scenario.expectErr { t.Errorf("got unexpected error %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go index b09474bc4fe..428fd66bae7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/configmap_cafile_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "fmt" "sync/atomic" @@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool { } // RunOnce runs a single sync loop -func (c *ConfigMapCAController) RunOnce() error { +func (c *ConfigMapCAController) RunOnce(ctx context.Context) error { // Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for // a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures. _ = c.loadCABundle() @@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error { } // Run starts the kube-apiserver and blocks until stopCh is closed. -func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) { +func (c *ConfigMapCAController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // we have a personal informer that is narrowly scoped, start it. - go c.configMapInformer.Run(stopCh) + go c.configMapInformer.Run(ctx.Done()) // wait for your secondary caches to fill before starting your work - if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) { + if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) { return } // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) { c.queue.Add(workItemKey) return false, nil - }, stopCh) + }, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *ConfigMapCAController) runWorker() { diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go index fb1515c182a..58761acd925 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_cafile_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "fmt" "io/ioutil" @@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute // ControllerRunner is a generic interface for starting a controller type ControllerRunner interface { // RunOnce runs the sync loop a single time. This useful for synchronous priming - RunOnce() error + RunOnce(ctx context.Context) error // Run should be called a go .Run - Run(workers int, stopCh <-chan struct{}) + Run(ctx context.Context, workers int) } // DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content @@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool { } // RunOnce runs a single sync loop -func (c *DynamicFileCAContent) RunOnce() error { +func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error { return c.loadCABundle() } // Run starts the controller and blocks until stopCh is closed. -func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { +func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start the loop that watches the CA file until stopCh is closed. go wait.Until(func() { - if err := c.watchCAFile(stopCh); err != nil { + if err := c.watchCAFile(ctx.Done()); err != nil { klog.ErrorS(err, "Failed to watch CA file, will retry later") } - }, time.Minute, stopCh) + }, time.Minute, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go index 00117176b0a..9ff1abb6494 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamic_serving_content.go @@ -17,6 +17,7 @@ limitations under the License. package dynamiccertificates import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error { } // RunOnce runs a single sync loop -func (c *DynamicCertKeyPairContent) RunOnce() error { +func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error { return c.loadCertKeyPair() } -// Run starts the controller and blocks until stopCh is closed. -func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { +// Run starts the controller and blocks until context is killed. +func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start the loop that watches the cert and key files until stopCh is closed. go wait.Until(func() { - if err := c.watchCertKeyFile(stopCh); err != nil { + if err := c.watchCertKeyFile(ctx.Done()); err != nil { klog.ErrorS(err, "Failed to watch cert and key file, will retry later") } - }, time.Minute, stopCh) + }, time.Minute, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error { diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go index e10b112bc07..57622bd34ec 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/union_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "strings" @@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) { } // AddListener adds a listener to be notified when the CA content changes. -func (c unionCAContent) RunOnce() error { +func (c unionCAContent) RunOnce(ctx context.Context) error { errors := []error{} for _, curr := range c { if controller, ok := curr.(ControllerRunner); ok { - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { errors = append(errors, err) } } @@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error { } // Run runs the controller -func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) { +func (c unionCAContent) Run(ctx context.Context, workers int) { for _, curr := range c { if controller, ok := curr.(ControllerRunner); ok { - go controller.Run(workers, stopCh) + go controller.Run(ctx, workers) } } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go b/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go index a82b4a7391d..8ff771af080 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/authentication.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "strings" "time" @@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber } // look up authentication configuration in the cluster and in case of an err defer to authentication-tolerate-lookup-failure flag - if err := dynamicRequestHeaderProvider.RunOnce(); err != nil { + // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the + // context is not used at all. So passing a empty context shouldn't be a problem + ctx := context.TODO() + if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go b/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go index e2beb5c2382..0dac3402187 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/authentication_dynamic_request_header.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "k8s.io/apimachinery/pkg/util/errors" @@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq }, nil } -func (c *DynamicRequestHeaderController) RunOnce() error { +func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error { errs := []error{} - errs = append(errs, c.ConfigMapCAController.RunOnce()) - errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce()) + errs = append(errs, c.ConfigMapCAController.RunOnce(ctx)) + errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx)) return errors.NewAggregate(errs) } -func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) { - go c.ConfigMapCAController.Run(workers, stopCh) - go c.RequestHeaderAuthRequestController.Run(workers, stopCh) - <-stopCh +func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) { + go c.ConfigMapCAController.Run(ctx, workers) + go c.RequestHeaderAuthRequestController.Run(ctx, workers) + <-ctx.Done() } diff --git a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go index d4caa08d36a..64bcc87ebf1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/secure_serving.go @@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro if s.Cert != nil { s.Cert.AddListener(dynamicCertificateController) } - + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-stopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() // start controllers if possible if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of client CA failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of default serving certificate failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } for _, sniCert := range s.SNICerts { sniCert.AddListener(dynamicCertificateController) if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of SNI serving certificate failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 45223e53cd8..dbbae01637c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -244,14 +244,27 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg if err != nil { return nil, err } - if err := aggregatorProxyCerts.RunOnce(); err != nil { + // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the + // context is not used at all. So passing a empty context shouldn't be a problem + ctx := context.TODO() + if err := aggregatorProxyCerts.RunOnce(ctx); err != nil { return nil, err } aggregatorProxyCerts.AddListener(apiserviceRegistrationController) s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent - s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(context genericapiserver.PostStartHookContext) error { - go aggregatorProxyCerts.Run(1, context.StopCh) + s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error { + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-postStartHookContext.StopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() + go aggregatorProxyCerts.Run(ctx, 1) return nil }) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 3f607d74c64..fdc30b96c35 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -847,7 +847,8 @@ func TestProxyCertReload(t *testing.T) { if err != nil { t.Fatalf("Unable to create dynamic certificates: %v", err) } - err = certProvider.RunOnce() + ctx := context.TODO() + err = certProvider.RunOnce(ctx) if err != nil { t.Fatalf("Unable to load dynamic certificates: %v", err) } @@ -886,7 +887,7 @@ func TestProxyCertReload(t *testing.T) { // STEP 3: swap the certificate used by the aggregator to auth against the backend server and verify the request passes // note that this step uses the certificate that can be validated by the backend server with clientCaCrt() writeCerts(certFile, keyFile, clientCert(), clientKey(), t) - err = certProvider.RunOnce() + err = certProvider.RunOnce(ctx) if err != nil { t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err) } diff --git a/test/integration/certificates/controller_approval_test.go b/test/integration/certificates/controller_approval_test.go index 380adffea4b..7f00df42eeb 100644 --- a/test/integration/certificates/controller_approval_test.go +++ b/test/integration/certificates/controller_approval_test.go @@ -98,10 +98,10 @@ func TestController_AutoApproval(t *testing.T) { // Register the controller c := approver.NewCSRApprovingController(client, informers.Certificates().V1().CertificateSigningRequests()) // Start the controller & informers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go c.Run(1, stopCh) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + informers.Start(ctx.Done()) + go c.Run(ctx, 1) // Configure appropriate permissions if test.grantNodeClient { diff --git a/test/integration/certificates/duration_test.go b/test/integration/certificates/duration_test.go index 033d0d211b2..12543e0dedf 100644 --- a/test/integration/certificates/duration_test.go +++ b/test/integration/certificates/duration_test.go @@ -115,13 +115,8 @@ func TestCSRDuration(t *testing.T) { t.Fatal(err) } - stopCh := make(chan struct{}) - t.Cleanup(func() { - close(stopCh) - }) - - informerFactory.Start(stopCh) - go c.Run(1, stopCh) + informerFactory.Start(ctx.Done()) + go c.Run(ctx, 1) tests := []struct { name, csrName string