Wire context for cert controllers

All the controllers should use context for signalling termination of communication with API server. Once kcm cancels context all the cert controllers which are started via kcm should cancel the APIServer request in flight instead of hanging around.
This commit is contained in:
Ravi Gudimetla 2022-03-07 09:23:52 -05:00
parent 8b84a793b3
commit 72a62f47f7
13 changed files with 73 additions and 72 deletions

View File

@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err)
}
go kubeletServingSigner.Run(5, ctx.Done())
go kubeletServingSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving")
}
@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err)
}
go kubeletClientSigner.Run(5, ctx.Done())
go kubeletClientSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet")
}
@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err)
}
go kubeAPIServerClientSigner.Run(5, ctx.Done())
go kubeAPIServerClientSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client")
}
@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err)
}
go legacyUnknownSigner.Run(5, ctx.Done())
go legacyUnknownSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown")
}
@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll
controllerContext.ClientBuilder.ClientOrDie("certificate-controller"),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go approver.Run(5, ctx.Done())
go approver.Run(ctx, 5)
return nil, true, nil
}
@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller
controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go cleaner.Run(1, ctx.Done())
go cleaner.Run(ctx, 1)
return nil, true, nil
}
@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC
if err != nil {
return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err)
}
go sac.Run(1, ctx.Done())
go sac.Run(ctx, 1)
return nil, true, nil
}

View File

@ -75,7 +75,7 @@ func recognizers() []csrRecognizer {
return recognizers
}
func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error {
func (a *sarApprover) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error {
if len(csr.Status.Certificate) != 0 {
return nil
}
@ -96,13 +96,13 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error {
tried = append(tried, r.permission.Subresource)
approved, err := a.authorize(csr, r.permission)
approved, err := a.authorize(ctx, csr, r.permission)
if err != nil {
return err
}
if approved {
appendApprovalCondition(csr, r.successMessage)
_, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.Background(), csr.Name, csr, metav1.UpdateOptions{})
_, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating approval for csr: %v", err)
}
@ -117,7 +117,7 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error {
return nil
}
func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) {
func (a *sarApprover) authorize(ctx context.Context, csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) {
extra := make(map[string]authorization.ExtraValue)
for k, v := range csr.Spec.Extra {
extra[k] = authorization.ExtraValue(v)
@ -132,7 +132,7 @@ func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs auth
ResourceAttributes: &rattrs,
},
}
sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{})
sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package approver
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/x509"
@ -130,7 +131,8 @@ func TestHandle(t *testing.T) {
},
}
csr := makeTestCsr()
if err := approver.handle(csr); err != nil && !c.err {
ctx := context.TODO()
if err := approver.handle(ctx, csr); err != nil && !c.err {
t.Errorf("unexpected err: %v", err)
}
c.verify(t, client.Actions())

View File

@ -19,6 +19,7 @@ limitations under the License.
package certificates
import (
"context"
"fmt"
"time"
@ -48,7 +49,7 @@ type CertificateController struct {
csrLister certificateslisters.CertificateSigningRequestLister
csrsSynced cache.InformerSynced
handler func(*certificates.CertificateSigningRequest) error
handler func(context.Context, *certificates.CertificateSigningRequest) error
queue workqueue.RateLimitingInterface
}
@ -57,7 +58,7 @@ func NewCertificateController(
name string,
kubeClient clientset.Interface,
csrInformer certificatesinformers.CertificateSigningRequestInformer,
handler func(*certificates.CertificateSigningRequest) error,
handler func(context.Context, *certificates.CertificateSigningRequest) error,
) *CertificateController {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
@ -111,39 +112,39 @@ func NewCertificateController(
}
// Run the main goroutine responsible for watching and syncing jobs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
func (cc *CertificateController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
klog.Infof("Starting certificate controller %q", cc.name)
defer klog.Infof("Shutting down certificate controller %q", cc.name)
if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), stopCh, cc.csrsSynced) {
if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
go wait.UntilWithContext(ctx, cc.worker, time.Second)
}
<-stopCh
<-ctx.Done()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for cc.processNextWorkItem() {
func (cc *CertificateController) worker(ctx context.Context) {
for cc.processNextWorkItem(ctx) {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (cc *CertificateController) processNextWorkItem() bool {
func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool {
cKey, quit := cc.queue.Get()
if quit {
return false
}
defer cc.queue.Done(cKey)
if err := cc.syncFunc(cKey.(string)); err != nil {
if err := cc.syncFunc(ctx, cKey.(string)); err != nil {
cc.queue.AddRateLimited(cKey)
if _, ignorable := err.(ignorableError); !ignorable {
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
@ -167,7 +168,7 @@ func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
cc.queue.Add(key)
}
func (cc *CertificateController) syncFunc(key string) error {
func (cc *CertificateController) syncFunc(ctx context.Context, key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime))
@ -188,7 +189,7 @@ func (cc *CertificateController) syncFunc(key string) error {
// need to operate on a copy so we don't mutate the csr in the shared cache
csr = csr.DeepCopy()
return cc.handler(csr)
return cc.handler(ctx, csr)
}
// IgnorableError returns an error that we shouldn't handle (i.e. log) because

View File

@ -41,8 +41,7 @@ func TestCertificateController(t *testing.T) {
client := fake.NewSimpleClientset(csr)
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(csr), controller.NoResyncPeriodFunc())
handler := func(csr *certificates.CertificateSigningRequest) error {
handler := func(ctx context.Context, csr *certificates.CertificateSigningRequest) error {
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Reason: "test reason",
@ -70,8 +69,8 @@ func TestCertificateController(t *testing.T) {
wait.PollUntil(10*time.Millisecond, func() (bool, error) {
return controller.queue.Len() >= 1, nil
}, stopCh)
controller.processNextWorkItem()
ctx := context.TODO()
controller.processNextWorkItem(ctx)
actions := client.Actions()
if len(actions) != 1 {

View File

@ -76,36 +76,36 @@ func NewCSRCleanerController(
}
// Run the main goroutine responsible for watching and syncing jobs.
func (ccc *CSRCleanerController) Run(workers int, stopCh <-chan struct{}) {
func (ccc *CSRCleanerController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
klog.Infof("Starting CSR cleaner controller")
defer klog.Infof("Shutting down CSR cleaner controller")
for i := 0; i < workers; i++ {
go wait.Until(ccc.worker, pollingInterval, stopCh)
go wait.UntilWithContext(ctx, ccc.worker, pollingInterval)
}
<-stopCh
<-ctx.Done()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (ccc *CSRCleanerController) worker() {
func (ccc *CSRCleanerController) worker(ctx context.Context) {
csrs, err := ccc.csrLister.List(labels.Everything())
if err != nil {
klog.Errorf("Unable to list CSRs: %v", err)
return
}
for _, csr := range csrs {
if err := ccc.handle(csr); err != nil {
if err := ccc.handle(ctx, csr); err != nil {
klog.Errorf("Error while attempting to clean CSR %q: %v", csr.Name, err)
}
}
}
func (ccc *CSRCleanerController) handle(csr *capi.CertificateSigningRequest) error {
func (ccc *CSRCleanerController) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error {
if isIssuedPastDeadline(csr) || isDeniedPastDeadline(csr) || isFailedPastDeadline(csr) || isPendingPastDeadline(csr) || isIssuedExpired(csr) {
if err := ccc.csrClient.Delete(context.TODO(), csr.Name, metav1.DeleteOptions{}); err != nil {
if err := ccc.csrClient.Delete(ctx, csr.Name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("unable to delete CSR %q: %v", csr.Name, err)
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cleaner
import (
"context"
"testing"
"time"
@ -225,8 +226,8 @@ func TestCleanerWithApprovedExpiredCSR(t *testing.T) {
s := &CSRCleanerController{
csrClient: client.CertificatesV1().CertificateSigningRequests(),
}
err := s.handle(csr)
ctx := context.TODO()
err := s.handle(ctx, csr)
if err != nil {
t.Fatalf("failed to clean CSR: %v", err)
}

View File

@ -89,7 +89,7 @@ type Publisher struct {
rootCA []byte
// To allow injection for testing.
syncHandler func(key string) error
syncHandler func(ctx context.Context, key string) error
cmLister corelisters.ConfigMapLister
cmListerSynced cache.InformerSynced
@ -100,22 +100,22 @@ type Publisher struct {
}
// Run starts process
func (c *Publisher) Run(workers int, stopCh <-chan struct{}) {
func (c *Publisher) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting root CA certificate configmap publisher")
defer klog.Infof("Shutting down root CA certificate configmap publisher")
if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) {
if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-stopCh
<-ctx.Done()
}
func (c *Publisher) configMapDeleted(obj interface{}) {
@ -155,21 +155,21 @@ func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) {
c.queue.Add(newNamespace.Name)
}
func (c *Publisher) runWorker() {
for c.processNextWorkItem() {
func (c *Publisher) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when
// it's time to quit.
func (c *Publisher) processNextWorkItem() bool {
func (c *Publisher) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
if err := c.syncHandler(key.(string)); err != nil {
if err := c.syncHandler(ctx, key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
c.queue.AddRateLimited(key)
return true
@ -179,7 +179,7 @@ func (c *Publisher) processNextWorkItem() bool {
return true
}
func (c *Publisher) syncNamespace(ns string) (err error) {
func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) {
startTime := time.Now()
defer func() {
recordMetrics(startTime, err)
@ -189,7 +189,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) {
cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName)
switch {
case apierrors.IsNotFound(err):
_, err = c.client.CoreV1().ConfigMaps(ns).Create(context.TODO(), &v1.ConfigMap{
_, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: RootCACertConfigMapName,
Annotations: map[string]string{DescriptionAnnotation: Description},
@ -224,7 +224,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) {
}
cm.Annotations[DescriptionAnnotation] = Description
_, err = c.client.CoreV1().ConfigMaps(ns).Update(context.TODO(), cm, metav1.UpdateOptions{})
_, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
return err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package rootcacertpublisher
import (
"context"
"reflect"
"testing"
@ -154,9 +155,9 @@ func TestConfigMapCreation(t *testing.T) {
cmStore.Add(tc.UpdatedConfigMap)
controller.configMapUpdated(nil, tc.UpdatedConfigMap)
}
ctx := context.TODO()
for controller.queue.Len() != 0 {
controller.processNextWorkItem()
controller.processNextWorkItem(ctx)
}
actions := client.Actions()
@ -263,8 +264,8 @@ func TestConfigMapUpdateNoHotLoop(t *testing.T) {
cmListerSynced: func() bool { return true },
nsListerSynced: func() bool { return true },
}
err := controller.syncNamespace("default")
ctx := context.TODO()
err := controller.syncNamespace(ctx, "default")
if err != nil {
t.Fatal(err)
}

View File

@ -106,10 +106,10 @@ func NewCSRSigningController(
}
// Run the main goroutine responsible for watching and syncing jobs.
func (c *CSRSigningController) Run(workers int, stopCh <-chan struct{}) {
go c.dynamicCertReloader.Run(workers, stopCh)
func (c *CSRSigningController) Run(ctx context.Context, workers int) {
go c.dynamicCertReloader.Run(ctx, workers)
c.certificateController.Run(workers, stopCh)
c.certificateController.Run(ctx, workers)
}
type isRequestForSignerFunc func(req *x509.CertificateRequest, usages []capi.KeyUsage, signerName string) (bool, error)
@ -144,7 +144,7 @@ func newSigner(signerName, caFile, caKeyFile string, client clientset.Interface,
return ret, nil
}
func (s *signer) handle(csr *capi.CertificateSigningRequest) error {
func (s *signer) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error {
// Ignore unapproved or failed requests
if !certificates.IsCertificateRequestApproved(csr) || certificates.HasTrueCondition(csr, capi.CertificateFailed) {
return nil
@ -167,7 +167,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error {
Message: err.Error(),
LastUpdateTime: metav1.Now(),
})
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{})
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error adding failure condition for csr: %v", err)
}
@ -181,7 +181,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error {
return fmt.Errorf("error auto signing csr: %v", err)
}
csr.Status.Certificate = cert
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{})
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating signature for csr: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package signer
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/x509"
@ -294,7 +295,8 @@ func TestHandle(t *testing.T) {
}
csr := makeTestCSR(csrBuilder{cn: c.commonName, signerName: c.signerName, approved: c.approved, failed: c.failed, usages: c.usages, org: c.org, dnsNames: c.dnsNames})
if err := s.handle(csr); err != nil && !c.err {
ctx := context.TODO()
if err := s.handle(ctx, csr); err != nil && !c.err {
t.Errorf("unexpected err: %v", err)
}
c.verify(t, client.Actions())

View File

@ -98,10 +98,10 @@ func TestController_AutoApproval(t *testing.T) {
// Register the controller
c := approver.NewCSRApprovingController(client, informers.Certificates().V1().CertificateSigningRequests())
// Start the controller & informers
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go c.Run(1, stopCh)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
informers.Start(ctx.Done())
go c.Run(ctx, 1)
// Configure appropriate permissions
if test.grantNodeClient {

View File

@ -115,13 +115,8 @@ func TestCSRDuration(t *testing.T) {
t.Fatal(err)
}
stopCh := make(chan struct{})
t.Cleanup(func() {
close(stopCh)
})
informerFactory.Start(stopCh)
go c.Run(1, stopCh)
informerFactory.Start(ctx.Done())
go c.Run(ctx, 1)
tests := []struct {
name, csrName string