diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index ca3e6c9f1a9..8508f003e92 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err) } - go kubeletServingSigner.Run(5, ctx.Done()) + go kubeletServingSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving") } @@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err) } - go kubeletClientSigner.Run(5, ctx.Done()) + go kubeletClientSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet") } @@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err) } - go kubeAPIServerClientSigner.Run(5, ctx.Done()) + go kubeAPIServerClientSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client") } @@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller if err != nil { return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err) } - go legacyUnknownSigner.Run(5, ctx.Done()) + go legacyUnknownSigner.Run(ctx, 5) } else { klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown") } @@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll controllerContext.ClientBuilder.ClientOrDie("certificate-controller"), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go approver.Run(5, ctx.Done()) + go approver.Run(ctx, 5) return nil, true, nil } @@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(), controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(), ) - go cleaner.Run(1, ctx.Done()) + go cleaner.Run(ctx, 1) return nil, true, nil } @@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC if err != nil { return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err) } - go sac.Run(1, ctx.Done()) + go sac.Run(ctx, 1) return nil, true, nil } diff --git a/pkg/controller/certificates/approver/sarapprove.go b/pkg/controller/certificates/approver/sarapprove.go index 397a75f5689..d739fc783b3 100644 --- a/pkg/controller/certificates/approver/sarapprove.go +++ b/pkg/controller/certificates/approver/sarapprove.go @@ -75,7 +75,7 @@ func recognizers() []csrRecognizer { return recognizers } -func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { +func (a *sarApprover) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { if len(csr.Status.Certificate) != 0 { return nil } @@ -96,13 +96,13 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { tried = append(tried, r.permission.Subresource) - approved, err := a.authorize(csr, r.permission) + approved, err := a.authorize(ctx, csr, r.permission) if err != nil { return err } if approved { appendApprovalCondition(csr, r.successMessage) - _, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.Background(), csr.Name, csr, metav1.UpdateOptions{}) + _, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating approval for csr: %v", err) } @@ -117,7 +117,7 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error { return nil } -func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) { +func (a *sarApprover) authorize(ctx context.Context, csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) { extra := make(map[string]authorization.ExtraValue) for k, v := range csr.Spec.Extra { extra[k] = authorization.ExtraValue(v) @@ -132,7 +132,7 @@ func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs auth ResourceAttributes: &rattrs, }, } - sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{}) + sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{}) if err != nil { return false, err } diff --git a/pkg/controller/certificates/approver/sarapprove_test.go b/pkg/controller/certificates/approver/sarapprove_test.go index e3c02bc0923..9bbbcf114b1 100644 --- a/pkg/controller/certificates/approver/sarapprove_test.go +++ b/pkg/controller/certificates/approver/sarapprove_test.go @@ -17,6 +17,7 @@ limitations under the License. package approver import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -130,7 +131,8 @@ func TestHandle(t *testing.T) { }, } csr := makeTestCsr() - if err := approver.handle(csr); err != nil && !c.err { + ctx := context.TODO() + if err := approver.handle(ctx, csr); err != nil && !c.err { t.Errorf("unexpected err: %v", err) } c.verify(t, client.Actions()) diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index e1fbd420142..7bb8b2fdb1a 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -19,6 +19,7 @@ limitations under the License. package certificates import ( + "context" "fmt" "time" @@ -48,7 +49,7 @@ type CertificateController struct { csrLister certificateslisters.CertificateSigningRequestLister csrsSynced cache.InformerSynced - handler func(*certificates.CertificateSigningRequest) error + handler func(context.Context, *certificates.CertificateSigningRequest) error queue workqueue.RateLimitingInterface } @@ -57,7 +58,7 @@ func NewCertificateController( name string, kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, - handler func(*certificates.CertificateSigningRequest) error, + handler func(context.Context, *certificates.CertificateSigningRequest) error, ) *CertificateController { // Send events to the apiserver eventBroadcaster := record.NewBroadcaster() @@ -111,39 +112,39 @@ func NewCertificateController( } // Run the main goroutine responsible for watching and syncing jobs. -func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) { +func (cc *CertificateController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer cc.queue.ShutDown() klog.Infof("Starting certificate controller %q", cc.name) defer klog.Infof("Shutting down certificate controller %q", cc.name) - if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), stopCh, cc.csrsSynced) { + if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(cc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, cc.worker, time.Second) } - <-stopCh + <-ctx.Done() } // worker runs a thread that dequeues CSRs, handles them, and marks them done. -func (cc *CertificateController) worker() { - for cc.processNextWorkItem() { +func (cc *CertificateController) worker(ctx context.Context) { + for cc.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. -func (cc *CertificateController) processNextWorkItem() bool { +func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool { cKey, quit := cc.queue.Get() if quit { return false } defer cc.queue.Done(cKey) - if err := cc.syncFunc(cKey.(string)); err != nil { + if err := cc.syncFunc(ctx, cKey.(string)); err != nil { cc.queue.AddRateLimited(cKey) if _, ignorable := err.(ignorableError); !ignorable { utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err)) @@ -167,7 +168,7 @@ func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) { cc.queue.Add(key) } -func (cc *CertificateController) syncFunc(key string) error { +func (cc *CertificateController) syncFunc(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime)) @@ -188,7 +189,7 @@ func (cc *CertificateController) syncFunc(key string) error { // need to operate on a copy so we don't mutate the csr in the shared cache csr = csr.DeepCopy() - return cc.handler(csr) + return cc.handler(ctx, csr) } // IgnorableError returns an error that we shouldn't handle (i.e. log) because diff --git a/pkg/controller/certificates/certificate_controller_test.go b/pkg/controller/certificates/certificate_controller_test.go index e8be06e334f..c9912e3ef2d 100644 --- a/pkg/controller/certificates/certificate_controller_test.go +++ b/pkg/controller/certificates/certificate_controller_test.go @@ -41,8 +41,7 @@ func TestCertificateController(t *testing.T) { client := fake.NewSimpleClientset(csr) informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(csr), controller.NoResyncPeriodFunc()) - - handler := func(csr *certificates.CertificateSigningRequest) error { + handler := func(ctx context.Context, csr *certificates.CertificateSigningRequest) error { csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{ Type: certificates.CertificateApproved, Reason: "test reason", @@ -70,8 +69,8 @@ func TestCertificateController(t *testing.T) { wait.PollUntil(10*time.Millisecond, func() (bool, error) { return controller.queue.Len() >= 1, nil }, stopCh) - - controller.processNextWorkItem() + ctx := context.TODO() + controller.processNextWorkItem(ctx) actions := client.Actions() if len(actions) != 1 { diff --git a/pkg/controller/certificates/cleaner/cleaner.go b/pkg/controller/certificates/cleaner/cleaner.go index 191c7974435..38956f82952 100644 --- a/pkg/controller/certificates/cleaner/cleaner.go +++ b/pkg/controller/certificates/cleaner/cleaner.go @@ -76,36 +76,36 @@ func NewCSRCleanerController( } // Run the main goroutine responsible for watching and syncing jobs. -func (ccc *CSRCleanerController) Run(workers int, stopCh <-chan struct{}) { +func (ccc *CSRCleanerController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() klog.Infof("Starting CSR cleaner controller") defer klog.Infof("Shutting down CSR cleaner controller") for i := 0; i < workers; i++ { - go wait.Until(ccc.worker, pollingInterval, stopCh) + go wait.UntilWithContext(ctx, ccc.worker, pollingInterval) } - <-stopCh + <-ctx.Done() } // worker runs a thread that dequeues CSRs, handles them, and marks them done. -func (ccc *CSRCleanerController) worker() { +func (ccc *CSRCleanerController) worker(ctx context.Context) { csrs, err := ccc.csrLister.List(labels.Everything()) if err != nil { klog.Errorf("Unable to list CSRs: %v", err) return } for _, csr := range csrs { - if err := ccc.handle(csr); err != nil { + if err := ccc.handle(ctx, csr); err != nil { klog.Errorf("Error while attempting to clean CSR %q: %v", csr.Name, err) } } } -func (ccc *CSRCleanerController) handle(csr *capi.CertificateSigningRequest) error { +func (ccc *CSRCleanerController) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { if isIssuedPastDeadline(csr) || isDeniedPastDeadline(csr) || isFailedPastDeadline(csr) || isPendingPastDeadline(csr) || isIssuedExpired(csr) { - if err := ccc.csrClient.Delete(context.TODO(), csr.Name, metav1.DeleteOptions{}); err != nil { + if err := ccc.csrClient.Delete(ctx, csr.Name, metav1.DeleteOptions{}); err != nil { return fmt.Errorf("unable to delete CSR %q: %v", csr.Name, err) } } diff --git a/pkg/controller/certificates/cleaner/cleaner_test.go b/pkg/controller/certificates/cleaner/cleaner_test.go index 202aac94533..6faeeb7bdf0 100644 --- a/pkg/controller/certificates/cleaner/cleaner_test.go +++ b/pkg/controller/certificates/cleaner/cleaner_test.go @@ -17,6 +17,7 @@ limitations under the License. package cleaner import ( + "context" "testing" "time" @@ -225,8 +226,8 @@ func TestCleanerWithApprovedExpiredCSR(t *testing.T) { s := &CSRCleanerController{ csrClient: client.CertificatesV1().CertificateSigningRequests(), } - - err := s.handle(csr) + ctx := context.TODO() + err := s.handle(ctx, csr) if err != nil { t.Fatalf("failed to clean CSR: %v", err) } diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher.go b/pkg/controller/certificates/rootcacertpublisher/publisher.go index a044779bdb7..cfd9f844c53 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher.go @@ -89,7 +89,7 @@ type Publisher struct { rootCA []byte // To allow injection for testing. - syncHandler func(key string) error + syncHandler func(ctx context.Context, key string) error cmLister corelisters.ConfigMapLister cmListerSynced cache.InformerSynced @@ -100,22 +100,22 @@ type Publisher struct { } // Run starts process -func (c *Publisher) Run(workers int, stopCh <-chan struct{}) { +func (c *Publisher) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting root CA certificate configmap publisher") defer klog.Infof("Shutting down root CA certificate configmap publisher") - if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) { + if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - <-stopCh + <-ctx.Done() } func (c *Publisher) configMapDeleted(obj interface{}) { @@ -155,21 +155,21 @@ func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) { c.queue.Add(newNamespace.Name) } -func (c *Publisher) runWorker() { - for c.processNextWorkItem() { +func (c *Publisher) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when // it's time to quit. -func (c *Publisher) processNextWorkItem() bool { +func (c *Publisher) processNextWorkItem(ctx context.Context) bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) - if err := c.syncHandler(key.(string)); err != nil { + if err := c.syncHandler(ctx, key.(string)); err != nil { utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err)) c.queue.AddRateLimited(key) return true @@ -179,7 +179,7 @@ func (c *Publisher) processNextWorkItem() bool { return true } -func (c *Publisher) syncNamespace(ns string) (err error) { +func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) { startTime := time.Now() defer func() { recordMetrics(startTime, err) @@ -189,7 +189,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) { cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName) switch { case apierrors.IsNotFound(err): - _, err = c.client.CoreV1().ConfigMaps(ns).Create(context.TODO(), &v1.ConfigMap{ + _, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: RootCACertConfigMapName, Annotations: map[string]string{DescriptionAnnotation: Description}, @@ -224,7 +224,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) { } cm.Annotations[DescriptionAnnotation] = Description - _, err = c.client.CoreV1().ConfigMaps(ns).Update(context.TODO(), cm, metav1.UpdateOptions{}) + _, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{}) return err } diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go index c416894e595..cb5fb132e26 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go @@ -17,6 +17,7 @@ limitations under the License. package rootcacertpublisher import ( + "context" "reflect" "testing" @@ -154,9 +155,9 @@ func TestConfigMapCreation(t *testing.T) { cmStore.Add(tc.UpdatedConfigMap) controller.configMapUpdated(nil, tc.UpdatedConfigMap) } - + ctx := context.TODO() for controller.queue.Len() != 0 { - controller.processNextWorkItem() + controller.processNextWorkItem(ctx) } actions := client.Actions() @@ -263,8 +264,8 @@ func TestConfigMapUpdateNoHotLoop(t *testing.T) { cmListerSynced: func() bool { return true }, nsListerSynced: func() bool { return true }, } - - err := controller.syncNamespace("default") + ctx := context.TODO() + err := controller.syncNamespace(ctx, "default") if err != nil { t.Fatal(err) } diff --git a/pkg/controller/certificates/signer/signer.go b/pkg/controller/certificates/signer/signer.go index d4799a0fdd4..013f0b7cbf6 100644 --- a/pkg/controller/certificates/signer/signer.go +++ b/pkg/controller/certificates/signer/signer.go @@ -106,10 +106,10 @@ func NewCSRSigningController( } // Run the main goroutine responsible for watching and syncing jobs. -func (c *CSRSigningController) Run(workers int, stopCh <-chan struct{}) { - go c.dynamicCertReloader.Run(workers, stopCh) +func (c *CSRSigningController) Run(ctx context.Context, workers int) { + go c.dynamicCertReloader.Run(ctx, workers) - c.certificateController.Run(workers, stopCh) + c.certificateController.Run(ctx, workers) } type isRequestForSignerFunc func(req *x509.CertificateRequest, usages []capi.KeyUsage, signerName string) (bool, error) @@ -144,7 +144,7 @@ func newSigner(signerName, caFile, caKeyFile string, client clientset.Interface, return ret, nil } -func (s *signer) handle(csr *capi.CertificateSigningRequest) error { +func (s *signer) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error { // Ignore unapproved or failed requests if !certificates.IsCertificateRequestApproved(csr) || certificates.HasTrueCondition(csr, capi.CertificateFailed) { return nil @@ -167,7 +167,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error { Message: err.Error(), LastUpdateTime: metav1.Now(), }) - _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) + _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error adding failure condition for csr: %v", err) } @@ -181,7 +181,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error { return fmt.Errorf("error auto signing csr: %v", err) } csr.Status.Certificate = cert - _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) + _, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating signature for csr: %v", err) } diff --git a/pkg/controller/certificates/signer/signer_test.go b/pkg/controller/certificates/signer/signer_test.go index af5ec3e546c..d590e060fb0 100644 --- a/pkg/controller/certificates/signer/signer_test.go +++ b/pkg/controller/certificates/signer/signer_test.go @@ -17,6 +17,7 @@ limitations under the License. package signer import ( + "context" "crypto/ecdsa" "crypto/elliptic" "crypto/x509" @@ -294,7 +295,8 @@ func TestHandle(t *testing.T) { } csr := makeTestCSR(csrBuilder{cn: c.commonName, signerName: c.signerName, approved: c.approved, failed: c.failed, usages: c.usages, org: c.org, dnsNames: c.dnsNames}) - if err := s.handle(csr); err != nil && !c.err { + ctx := context.TODO() + if err := s.handle(ctx, csr); err != nil && !c.err { t.Errorf("unexpected err: %v", err) } c.verify(t, client.Actions()) diff --git a/test/integration/certificates/controller_approval_test.go b/test/integration/certificates/controller_approval_test.go index 380adffea4b..7f00df42eeb 100644 --- a/test/integration/certificates/controller_approval_test.go +++ b/test/integration/certificates/controller_approval_test.go @@ -98,10 +98,10 @@ func TestController_AutoApproval(t *testing.T) { // Register the controller c := approver.NewCSRApprovingController(client, informers.Certificates().V1().CertificateSigningRequests()) // Start the controller & informers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go c.Run(1, stopCh) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + informers.Start(ctx.Done()) + go c.Run(ctx, 1) // Configure appropriate permissions if test.grantNodeClient { diff --git a/test/integration/certificates/duration_test.go b/test/integration/certificates/duration_test.go index 033d0d211b2..12543e0dedf 100644 --- a/test/integration/certificates/duration_test.go +++ b/test/integration/certificates/duration_test.go @@ -115,13 +115,8 @@ func TestCSRDuration(t *testing.T) { t.Fatal(err) } - stopCh := make(chan struct{}) - t.Cleanup(func() { - close(stopCh) - }) - - informerFactory.Start(stopCh) - go c.Run(1, stopCh) + informerFactory.Start(ctx.Done()) + go c.Run(ctx, 1) tests := []struct { name, csrName string