Merge pull request #108191 from ravisantoshgudimetla/wire-cert-contexts

Wire cert contexts
Authored by Kubernetes Prow Robot on 2022-03-23 11:20:17 -07:00; committed by GitHub
commit 14e8db067e
27 changed files with 195 additions and 138 deletions

View File

@@ -52,7 +52,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kubelet-serving certificate controller: %v", err)
}
go kubeletServingSigner.Run(5, ctx.Done())
go kubeletServingSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kubelet-serving")
}
@@ -62,7 +62,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client-kubelet certificate controller: %v", err)
}
go kubeletClientSigner.Run(5, ctx.Done())
go kubeletClientSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client-kubelet")
}
@@ -72,7 +72,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/kube-apiserver-client certificate controller: %v", err)
}
go kubeAPIServerClientSigner.Run(5, ctx.Done())
go kubeAPIServerClientSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/kube-apiserver-client")
}
@@ -82,7 +82,7 @@ func startCSRSigningController(ctx context.Context, controllerContext Controller
if err != nil {
return nil, false, fmt.Errorf("failed to start kubernetes.io/legacy-unknown certificate controller: %v", err)
}
go legacyUnknownSigner.Run(5, ctx.Done())
go legacyUnknownSigner.Run(ctx, 5)
} else {
klog.V(2).Infof("skipping CSR signer controller %q because specific files were specified for other signers and not this one.", "kubernetes.io/legacy-unknown")
}
@@ -153,7 +153,7 @@ func startCSRApprovingController(ctx context.Context, controllerContext Controll
controllerContext.ClientBuilder.ClientOrDie("certificate-controller"),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go approver.Run(5, ctx.Done())
go approver.Run(ctx, 5)
return nil, true, nil
}
@@ -163,7 +163,7 @@ func startCSRCleanerController(ctx context.Context, controllerContext Controller
controllerContext.ClientBuilder.ClientOrDie("certificate-controller").CertificatesV1().CertificateSigningRequests(),
controllerContext.InformerFactory.Certificates().V1().CertificateSigningRequests(),
)
go cleaner.Run(1, ctx.Done())
go cleaner.Run(ctx, 1)
return nil, true, nil
}
@@ -189,6 +189,6 @@ func startRootCACertPublisher(ctx context.Context, controllerContext ControllerC
if err != nil {
return nil, true, fmt.Errorf("error creating root CA certificate publisher: %v", err)
}
go sac.Run(1, ctx.Done())
go sac.Run(ctx, 1)
return nil, true, nil
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package app
import (
"context"
"errors"
"fmt"
"reflect"
@@ -95,8 +96,18 @@ func BuildAuthn(client authenticationclient.AuthenticationV1Interface, authn kub
}
return authenticator, func(stopCh <-chan struct{}) {
// generate a context from stopCh, so callers that still rely on this method's stopCh signature need not change
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-stopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
if dynamicCAContentFromFile != nil {
go dynamicCAContentFromFile.Run(1, stopCh)
go dynamicCAContentFromFile.Run(ctx, 1)
}
}, err
}

View File

@@ -75,7 +75,7 @@ func recognizers() []csrRecognizer {
return recognizers
}
func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error {
func (a *sarApprover) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error {
if len(csr.Status.Certificate) != 0 {
return nil
}
@@ -96,13 +96,13 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error {
tried = append(tried, r.permission.Subresource)
approved, err := a.authorize(csr, r.permission)
approved, err := a.authorize(ctx, csr, r.permission)
if err != nil {
return err
}
if approved {
appendApprovalCondition(csr, r.successMessage)
_, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.Background(), csr.Name, csr, metav1.UpdateOptions{})
_, err = a.client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating approval for csr: %v", err)
}
@@ -117,7 +117,7 @@ func (a *sarApprover) handle(csr *capi.CertificateSigningRequest) error {
return nil
}
func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) {
func (a *sarApprover) authorize(ctx context.Context, csr *capi.CertificateSigningRequest, rattrs authorization.ResourceAttributes) (bool, error) {
extra := make(map[string]authorization.ExtraValue)
for k, v := range csr.Spec.Extra {
extra[k] = authorization.ExtraValue(v)
@@ -132,7 +132,7 @@ func (a *sarApprover) authorize(csr *capi.CertificateSigningRequest, rattrs auth
ResourceAttributes: &rattrs,
},
}
sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{})
sar, err := a.client.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package approver
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/x509"
@@ -130,7 +131,8 @@ func TestHandle(t *testing.T) {
},
}
csr := makeTestCsr()
if err := approver.handle(csr); err != nil && !c.err {
ctx := context.TODO()
if err := approver.handle(ctx, csr); err != nil && !c.err {
t.Errorf("unexpected err: %v", err)
}
c.verify(t, client.Actions())

View File

@@ -19,6 +19,7 @@ limitations under the License.
package certificates
import (
"context"
"fmt"
"time"
@@ -48,7 +49,7 @@ type CertificateController struct {
csrLister certificateslisters.CertificateSigningRequestLister
csrsSynced cache.InformerSynced
handler func(*certificates.CertificateSigningRequest) error
handler func(context.Context, *certificates.CertificateSigningRequest) error
queue workqueue.RateLimitingInterface
}
@@ -57,7 +58,7 @@ func NewCertificateController(
name string,
kubeClient clientset.Interface,
csrInformer certificatesinformers.CertificateSigningRequestInformer,
handler func(*certificates.CertificateSigningRequest) error,
handler func(context.Context, *certificates.CertificateSigningRequest) error,
) *CertificateController {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
@@ -111,39 +112,39 @@ func NewCertificateController(
}
// Run the main goroutine responsible for watching and syncing CSRs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
func (cc *CertificateController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
klog.Infof("Starting certificate controller %q", cc.name)
defer klog.Infof("Shutting down certificate controller %q", cc.name)
if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), stopCh, cc.csrsSynced) {
if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
go wait.UntilWithContext(ctx, cc.worker, time.Second)
}
<-stopCh
<-ctx.Done()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for cc.processNextWorkItem() {
func (cc *CertificateController) worker(ctx context.Context) {
for cc.processNextWorkItem(ctx) {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (cc *CertificateController) processNextWorkItem() bool {
func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool {
cKey, quit := cc.queue.Get()
if quit {
return false
}
defer cc.queue.Done(cKey)
if err := cc.syncFunc(cKey.(string)); err != nil {
if err := cc.syncFunc(ctx, cKey.(string)); err != nil {
cc.queue.AddRateLimited(cKey)
if _, ignorable := err.(ignorableError); !ignorable {
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
@@ -167,7 +168,7 @@ func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
cc.queue.Add(key)
}
func (cc *CertificateController) syncFunc(key string) error {
func (cc *CertificateController) syncFunc(ctx context.Context, key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Since(startTime))
@@ -188,7 +189,7 @@ func (cc *CertificateController) syncFunc(key string) error {
// need to operate on a copy so we don't mutate the csr in the shared cache
csr = csr.DeepCopy()
return cc.handler(csr)
return cc.handler(ctx, csr)
}
// IgnorableError returns an error that we shouldn't handle (i.e. log) because
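For orientation, a self-contained sketch of the worker-loop shape this hunk migrates to, using the real k8s.io/apimachinery/pkg/util/wait helpers (the worker body here is a placeholder, not the controller's):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Old shape: wait.Until(worker, time.Second, stopCh) with a stop channel.
	// New shape: the worker receives the context that governs its lifetime.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("processing queue items until ctx is done")
	}, time.Second)
}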

View File

@@ -41,8 +41,7 @@ func TestCertificateController(t *testing.T) {
client := fake.NewSimpleClientset(csr)
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(csr), controller.NoResyncPeriodFunc())
handler := func(csr *certificates.CertificateSigningRequest) error {
handler := func(ctx context.Context, csr *certificates.CertificateSigningRequest) error {
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Reason: "test reason",
@@ -70,8 +69,8 @@ func TestCertificateController(t *testing.T) {
wait.PollUntil(10*time.Millisecond, func() (bool, error) {
return controller.queue.Len() >= 1, nil
}, stopCh)
controller.processNextWorkItem()
ctx := context.TODO()
controller.processNextWorkItem(ctx)
actions := client.Actions()
if len(actions) != 1 {

View File

@@ -76,36 +76,36 @@ func NewCSRCleanerController(
}
// Run the main goroutine responsible for watching and syncing CSRs.
func (ccc *CSRCleanerController) Run(workers int, stopCh <-chan struct{}) {
func (ccc *CSRCleanerController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
klog.Infof("Starting CSR cleaner controller")
defer klog.Infof("Shutting down CSR cleaner controller")
for i := 0; i < workers; i++ {
go wait.Until(ccc.worker, pollingInterval, stopCh)
go wait.UntilWithContext(ctx, ccc.worker, pollingInterval)
}
<-stopCh
<-ctx.Done()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (ccc *CSRCleanerController) worker() {
func (ccc *CSRCleanerController) worker(ctx context.Context) {
csrs, err := ccc.csrLister.List(labels.Everything())
if err != nil {
klog.Errorf("Unable to list CSRs: %v", err)
return
}
for _, csr := range csrs {
if err := ccc.handle(csr); err != nil {
if err := ccc.handle(ctx, csr); err != nil {
klog.Errorf("Error while attempting to clean CSR %q: %v", csr.Name, err)
}
}
}
func (ccc *CSRCleanerController) handle(csr *capi.CertificateSigningRequest) error {
func (ccc *CSRCleanerController) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error {
if isIssuedPastDeadline(csr) || isDeniedPastDeadline(csr) || isFailedPastDeadline(csr) || isPendingPastDeadline(csr) || isIssuedExpired(csr) {
if err := ccc.csrClient.Delete(context.TODO(), csr.Name, metav1.DeleteOptions{}); err != nil {
if err := ccc.csrClient.Delete(ctx, csr.Name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("unable to delete CSR %q: %v", csr.Name, err)
}
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package cleaner
import (
"context"
"testing"
"time"
@@ -225,8 +226,8 @@ func TestCleanerWithApprovedExpiredCSR(t *testing.T) {
s := &CSRCleanerController{
csrClient: client.CertificatesV1().CertificateSigningRequests(),
}
err := s.handle(csr)
ctx := context.TODO()
err := s.handle(ctx, csr)
if err != nil {
t.Fatalf("failed to clean CSR: %v", err)
}

View File

@@ -89,7 +89,7 @@ type Publisher struct {
rootCA []byte
// To allow injection for testing.
syncHandler func(key string) error
syncHandler func(ctx context.Context, key string) error
cmLister corelisters.ConfigMapLister
cmListerSynced cache.InformerSynced
@@ -100,22 +100,22 @@ type Publisher struct {
}
// Run starts the publisher.
func (c *Publisher) Run(workers int, stopCh <-chan struct{}) {
func (c *Publisher) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting root CA certificate configmap publisher")
defer klog.Infof("Shutting down root CA certificate configmap publisher")
if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) {
if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-stopCh
<-ctx.Done()
}
func (c *Publisher) configMapDeleted(obj interface{}) {
@@ -155,21 +155,21 @@ func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) {
c.queue.Add(newNamespace.Name)
}
func (c *Publisher) runWorker() {
for c.processNextWorkItem() {
func (c *Publisher) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when
// it's time to quit.
func (c *Publisher) processNextWorkItem() bool {
func (c *Publisher) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
if err := c.syncHandler(key.(string)); err != nil {
if err := c.syncHandler(ctx, key.(string)); err != nil {
utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
c.queue.AddRateLimited(key)
return true
@@ -179,7 +179,7 @@ func (c *Publisher) processNextWorkItem() bool {
return true
}
func (c *Publisher) syncNamespace(ns string) (err error) {
func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) {
startTime := time.Now()
defer func() {
recordMetrics(startTime, err)
@@ -189,7 +189,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) {
cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName)
switch {
case apierrors.IsNotFound(err):
_, err = c.client.CoreV1().ConfigMaps(ns).Create(context.TODO(), &v1.ConfigMap{
_, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: RootCACertConfigMapName,
Annotations: map[string]string{DescriptionAnnotation: Description},
@@ -224,7 +224,7 @@ func (c *Publisher) syncNamespace(ns string) (err error) {
}
cm.Annotations[DescriptionAnnotation] = Description
_, err = c.client.CoreV1().ConfigMaps(ns).Update(context.TODO(), cm, metav1.UpdateOptions{})
_, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
return err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package rootcacertpublisher
import (
"context"
"reflect"
"testing"
@@ -154,9 +155,9 @@ func TestConfigMapCreation(t *testing.T) {
cmStore.Add(tc.UpdatedConfigMap)
controller.configMapUpdated(nil, tc.UpdatedConfigMap)
}
ctx := context.TODO()
for controller.queue.Len() != 0 {
controller.processNextWorkItem()
controller.processNextWorkItem(ctx)
}
actions := client.Actions()
@@ -263,8 +264,8 @@ func TestConfigMapUpdateNoHotLoop(t *testing.T) {
cmListerSynced: func() bool { return true },
nsListerSynced: func() bool { return true },
}
err := controller.syncNamespace("default")
ctx := context.TODO()
err := controller.syncNamespace(ctx, "default")
if err != nil {
t.Fatal(err)
}

View File

@@ -104,10 +104,10 @@ func NewCSRSigningController(
}
// Run the main goroutine responsible for watching and syncing CSRs.
func (c *CSRSigningController) Run(workers int, stopCh <-chan struct{}) {
go c.dynamicCertReloader.Run(workers, stopCh)
func (c *CSRSigningController) Run(ctx context.Context, workers int) {
go c.dynamicCertReloader.Run(ctx, workers)
c.certificateController.Run(workers, stopCh)
c.certificateController.Run(ctx, workers)
}
type isRequestForSignerFunc func(req *x509.CertificateRequest, usages []capi.KeyUsage, signerName string) (bool, error)
@@ -142,7 +142,7 @@ func newSigner(signerName, caFile, caKeyFile string, client clientset.Interface,
return ret, nil
}
func (s *signer) handle(csr *capi.CertificateSigningRequest) error {
func (s *signer) handle(ctx context.Context, csr *capi.CertificateSigningRequest) error {
// Ignore unapproved or failed requests
if !certificates.IsCertificateRequestApproved(csr) || certificates.HasTrueCondition(csr, capi.CertificateFailed) {
return nil
@@ -165,7 +165,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error {
Message: err.Error(),
LastUpdateTime: metav1.Now(),
})
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{})
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error adding failure condition for csr: %v", err)
}
@@ -179,7 +179,7 @@ func (s *signer) handle(csr *capi.CertificateSigningRequest) error {
return fmt.Errorf("error auto signing csr: %v", err)
}
csr.Status.Certificate = cert
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{})
_, err = s.client.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating signature for csr: %v", err)
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package signer
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/x509"
@@ -291,7 +292,8 @@ func TestHandle(t *testing.T) {
}
csr := makeTestCSR(csrBuilder{cn: c.commonName, signerName: c.signerName, approved: c.approved, failed: c.failed, usages: c.usages, org: c.org, dnsNames: c.dnsNames})
if err := s.handle(csr); err != nil && !c.err {
ctx := context.TODO()
if err := s.handle(ctx, csr); err != nil && !c.err {
t.Errorf("unexpected err: %v", err)
}
c.verify(t, client.Actions())

View File

@@ -432,7 +432,7 @@ func (c *Controller) Enqueue() {
}
// Run the controller until stopped.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
func (c *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
@@ -441,25 +441,25 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer klog.Infof("Shutting down cluster_authentication_trust_controller controller")
// we have a personal informer that is narrowly scoped, start it.
go c.kubeSystemConfigMapInformer.Run(stopCh)
go c.kubeSystemConfigMapInformer.Run(ctx.Done())
// wait for your secondary caches to fill before starting your work
if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", stopCh, c.preRunCaches...) {
if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) {
return
}
// only run one worker
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// checks are cheap. run once a minute just to be sure we stay in sync in case fsnotify fails again
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
_ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) {
c.queue.Add(keyFn())
return false, nil
}, stopCh)
}, ctx.Done())
// wait until we're told to stop
<-stopCh
<-ctx.Done()
}
func (c *Controller) runWorker() {
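Note that this controller keeps the channel-based wait helpers, feeding them ctx.Done(). apimachinery also ships context-native variants, so a purely illustrative alternative for the two calls above (not what this commit does) would be:

// Illustrative alternative using the context-aware wait helpers.
go wait.UntilWithContext(ctx, func(ctx context.Context) { c.runWorker() }, time.Second)

_ = wait.PollImmediateUntilWithContext(ctx, 1*time.Minute, func(ctx context.Context) (bool, error) {
	c.queue.Add(keyFn())
	return false, nil
})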

View File

@@ -437,29 +437,40 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
// generate a context from stopCh, so callers that still rely on the apiserver's stopCh plumbing need not change
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-hookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
// prime values and start listeners
if m.ClusterAuthenticationInfo.ClientCA != nil {
m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(1, hookContext.StopCh)
go controller.Run(ctx, 1)
}
}
if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(1, hookContext.StopCh)
go controller.Run(ctx, 1)
}
}
go controller.Run(1, hookContext.StopCh)
go controller.Run(ctx, 1)
return nil
})

View File

@@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string {
}
// Run starts the RequestHeaderAuthRequestController and blocks until the context is canceled.
func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) {
func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting %s", c.name)
defer klog.Infof("Shutting down %s", c.name)
go c.configmapInformer.Run(stopCh)
go c.configmapInformer.Run(ctx.Done())
// wait for caches to fill before starting your work
if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) {
if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) {
return
}
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
<-stopCh
<-ctx.Done()
}
// RunOnce runs a single sync loop
func (c *RequestHeaderAuthRequestController) RunOnce() error {
configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{})
func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error {
configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
// ignore, authConfigMap is nil now

View File

@@ -17,6 +17,7 @@ limitations under the License.
package headerrequest
import (
"context"
"encoding/json"
"k8s.io/apimachinery/pkg/api/equality"
"testing"
@@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) {
target.client = fakeKubeClient
// act
err := target.RunOnce()
ctx := context.TODO()
err := target.RunOnce(ctx)
if err != nil && !scenario.expectErr {
t.Errorf("got unexpected error %v", err)

View File

@@ -18,6 +18,7 @@ package dynamiccertificates
import (
"bytes"
"context"
"crypto/x509"
"fmt"
"sync/atomic"
@@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool {
}
// RunOnce runs a single sync loop
func (c *ConfigMapCAController) RunOnce() error {
func (c *ConfigMapCAController) RunOnce(ctx context.Context) error {
// Ignore the error when running once: when using a dynamically loaded CA file, we think it's better to have nothing for
// a brief time than to crash completely. If crashing is necessary, higher-order logic like a healthcheck should detect the problem and cause the failure.
_ = c.loadCABundle()
@@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error {
}
// Run starts the controller and blocks until the context is canceled.
func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
func (c *ConfigMapCAController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
defer klog.InfoS("Shutting down controller", "name", c.name)
// we have a personal informer that is narrowly scoped, start it.
go c.configMapInformer.Run(stopCh)
go c.configMapInformer.Run(ctx.Done())
// wait for your secondary caches to fill before starting your work
if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) {
if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) {
return
}
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
c.queue.Add(workItemKey)
return false, nil
}, stopCh)
}, ctx.Done())
<-stopCh
<-ctx.Done()
}
func (c *ConfigMapCAController) runWorker() {

View File

@@ -18,6 +18,7 @@ package dynamiccertificates
import (
"bytes"
"context"
"crypto/x509"
"fmt"
"io/ioutil"
@@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute
// ControllerRunner is a generic interface for starting a controller
type ControllerRunner interface {
// RunOnce runs the sync loop a single time. This is useful for synchronous priming
RunOnce() error
RunOnce(ctx context.Context) error
// Run should be called in a goroutine, i.e. go controller.Run(ctx, workers)
Run(workers int, stopCh <-chan struct{})
Run(ctx context.Context, workers int)
}
// DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content
@@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool {
}
// RunOnce runs a single sync loop
func (c *DynamicFileCAContent) RunOnce() error {
func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error {
return c.loadCABundle()
}
// Run starts the controller and blocks until the context is canceled.
func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
defer klog.InfoS("Shutting down controller", "name", c.name)
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start the loop that watches the CA file until stopCh is closed.
go wait.Until(func() {
if err := c.watchCAFile(stopCh); err != nil {
if err := c.watchCAFile(ctx.Done()); err != nil {
klog.ErrorS(err, "Failed to watch CA file, will retry later")
}
}, time.Minute, stopCh)
}, time.Minute, ctx.Done())
<-stopCh
<-ctx.Done()
}
func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error {
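To make the revised contract concrete, here is a minimal, hypothetical implementation of the updated ControllerRunner interface (not part of this commit):

package dynamiccertificates

import "context"

// staticRunner is an illustrative no-op ControllerRunner: RunOnce primes
// synchronously, Run blocks until the caller cancels the context.
type staticRunner struct{}

func (staticRunner) RunOnce(ctx context.Context) error { return nil }

func (staticRunner) Run(ctx context.Context, workers int) {
	<-ctx.Done() // shut down when the caller cancels the context
}

var _ ControllerRunner = staticRunner{}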

View File

@@ -17,6 +17,7 @@ limitations under the License.
package dynamiccertificates
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
@@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
}
// RunOnce runs a single sync loop
func (c *DynamicCertKeyPairContent) RunOnce() error {
func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error {
return c.loadCertKeyPair()
}
// Run starts the controller and blocks until stopCh is closed.
func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
// Run starts the controller and blocks until the context is canceled.
func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
defer klog.InfoS("Shutting down controller", "name", c.name)
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start the loop that watches the cert and key files until stopCh is closed.
go wait.Until(func() {
if err := c.watchCertKeyFile(stopCh); err != nil {
if err := c.watchCertKeyFile(ctx.Done()); err != nil {
klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
}
}, time.Minute, stopCh)
}, time.Minute, ctx.Done())
<-stopCh
<-ctx.Done()
}
func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {

View File

@@ -18,6 +18,7 @@ package dynamiccertificates
import (
"bytes"
"context"
"crypto/x509"
"strings"
@@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) {
}
// RunOnce runs RunOnce on each wrapped content provider that implements ControllerRunner.
func (c unionCAContent) RunOnce() error {
func (c unionCAContent) RunOnce(ctx context.Context) error {
errors := []error{}
for _, curr := range c {
if controller, ok := curr.(ControllerRunner); ok {
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
errors = append(errors, err)
}
}
@@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error {
}
// Run runs the controller
func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) {
func (c unionCAContent) Run(ctx context.Context, workers int) {
for _, curr := range c {
if controller, ok := curr.(ControllerRunner); ok {
go controller.Run(workers, stopCh)
go controller.Run(ctx, workers)
}
}
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package options
import (
"context"
"fmt"
"strings"
"time"
@@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber
}
// look up the authentication configuration in the cluster; if that errors, defer to the authentication-tolerate-lookup-failure flag
if err := dynamicRequestHeaderProvider.RunOnce(); err != nil {
// We pass a context to dynamicRequestHeaderProvider.RunOnce because the method now takes one; the
// context is not actually used, so an empty context is fine here.
ctx := context.TODO()
if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil {
return nil, err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package options
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/errors"
@@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq
}, nil
}
func (c *DynamicRequestHeaderController) RunOnce() error {
func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error {
errs := []error{}
errs = append(errs, c.ConfigMapCAController.RunOnce())
errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce())
errs = append(errs, c.ConfigMapCAController.RunOnce(ctx))
errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx))
return errors.NewAggregate(errs)
}
func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) {
go c.ConfigMapCAController.Run(workers, stopCh)
go c.RequestHeaderAuthRequestController.Run(workers, stopCh)
<-stopCh
func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) {
go c.ConfigMapCAController.Run(ctx, workers)
go c.RequestHeaderAuthRequestController.Run(ctx, workers)
<-ctx.Done()
}
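RunOnce above leans on k8s.io/apimachinery/pkg/util/errors.NewAggregate, which filters out nil entries and returns nil when nothing failed. A small standalone illustration:

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// nil entries are filtered out; a nil aggregate means nothing failed.
	errs := []error{nil, fmt.Errorf("configmap CA sync failed")}
	if agg := utilerrors.NewAggregate(errs); agg != nil {
		fmt.Println(agg) // configmap CA sync failed
	}
	fmt.Println(utilerrors.NewAggregate([]error{nil, nil}) == nil) // true
}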

View File

@@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
if s.Cert != nil {
s.Cert.AddListener(dynamicCertificateController)
}
// generate a context from stopCh, so callers that still rely on the apiserver's stopCh plumbing need not change
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-stopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
// start controllers if possible
if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
klog.Warningf("Initial population of client CA failed: %v", err)
}
go controller.Run(1, stopCh)
go controller.Run(ctx, 1)
}
if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok {
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
klog.Warningf("Initial population of default serving certificate failed: %v", err)
}
go controller.Run(1, stopCh)
go controller.Run(ctx, 1)
}
for _, sniCert := range s.SNICerts {
sniCert.AddListener(dynamicCertificateController)
if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok {
// runonce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
klog.Warningf("Initial population of SNI serving certificate failed: %v", err)
}
go controller.Run(1, stopCh)
go controller.Run(ctx, 1)
}
}

View File

@@ -244,14 +244,27 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
if err != nil {
return nil, err
}
if err := aggregatorProxyCerts.RunOnce(); err != nil {
// We pass a context to aggregatorProxyCerts.RunOnce because the method now takes one; the
// context is not actually used, so an empty context is fine here.
ctx := context.TODO()
if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
return nil, err
}
aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent
s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(context genericapiserver.PostStartHookContext) error {
go aggregatorProxyCerts.Run(1, context.StopCh)
s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
// generate a context from stopCh, so callers that still rely on the apiserver's stopCh plumbing need not change
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-postStartHookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
go aggregatorProxyCerts.Run(ctx, 1)
return nil
})
}
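Assuming a bridge helper like the hypothetical contextFromStopCh sketched earlier, the hook body above could collapse to the following; behavior would be unchanged:

// Hypothetical rewrite of the hook above using the contextFromStopCh helper.
s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
	ctx := contextFromStopCh(postStartHookContext.StopCh)
	go aggregatorProxyCerts.Run(ctx, 1)
	return nil
})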

View File

@@ -847,7 +847,8 @@ func TestProxyCertReload(t *testing.T) {
if err != nil {
t.Fatalf("Unable to create dynamic certificates: %v", err)
}
err = certProvider.RunOnce()
ctx := context.TODO()
err = certProvider.RunOnce(ctx)
if err != nil {
t.Fatalf("Unable to load dynamic certificates: %v", err)
}
@@ -886,7 +887,7 @@ func TestProxyCertReload(t *testing.T) {
// STEP 3: swap the certificate used by the aggregator to auth against the backend server and verify the request passes
// note that this step uses the certificate that can be validated by the backend server with clientCaCrt()
writeCerts(certFile, keyFile, clientCert(), clientKey(), t)
err = certProvider.RunOnce()
err = certProvider.RunOnce(ctx)
if err != nil {
t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err)
}

View File

@@ -98,10 +98,10 @@ func TestController_AutoApproval(t *testing.T) {
// Register the controller
c := approver.NewCSRApprovingController(client, informers.Certificates().V1().CertificateSigningRequests())
// Start the controller & informers
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go c.Run(1, stopCh)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
informers.Start(ctx.Done())
go c.Run(ctx, 1)
// Configure appropriate permissions
if test.grantNodeClient {

View File

@@ -115,13 +115,8 @@ func TestCSRDuration(t *testing.T) {
t.Fatal(err)
}
stopCh := make(chan struct{})
t.Cleanup(func() {
close(stopCh)
})
informerFactory.Start(stopCh)
go c.Run(1, stopCh)
informerFactory.Start(ctx.Done())
go c.Run(ctx, 1)
tests := []struct {
name, csrName string