From e5fc73a4f186a83d0e72a5401e28067c4cb477b0 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Tue, 7 Feb 2017 13:21:08 -0500 Subject: [PATCH] Switch CSR controller to use shared informer --- .../app/certificates.go | 3 +- pkg/controller/certificates/BUILD | 17 +- .../certificates/certificate_controller.go | 112 ++++----- .../certificate_controller_test.go | 221 ++++++++++++++++++ 4 files changed, 293 insertions(+), 60 deletions(-) create mode 100644 pkg/controller/certificates/certificate_controller_test.go diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index 58d7f265304..c377af0795b 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -31,11 +31,10 @@ func startCSRController(ctx ControllerContext) (bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] { return false, nil } - resyncPeriod := ResyncPeriod(&ctx.Options)() c := ctx.ClientBuilder.ClientOrDie("certificate-controller") certController, err := certcontroller.NewCertificateController( c, - resyncPeriod, + ctx.NewInformerFactory.Certificates().V1beta1().CertificateSigningRequests(), ctx.Options.ClusterSigningCertFile, ctx.Options.ClusterSigningKeyFile, certcontroller.NewGroupApprover(ctx.Options.ApproveAllKubeletCSRsForGroup), diff --git a/pkg/controller/certificates/BUILD b/pkg/controller/certificates/BUILD index 6113f7815b8..70a92e06bff 100644 --- a/pkg/controller/certificates/BUILD +++ b/pkg/controller/certificates/BUILD @@ -21,19 +21,19 @@ go_library( deps = [ "//pkg/apis/certificates/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/informers/informers_generated/certificates/v1beta1:go_default_library", + "//pkg/client/listers/certificates/v1beta1:go_default_library", "//pkg/controller:go_default_library", "//vendor:github.com/cloudflare/cfssl/config", "//vendor:github.com/cloudflare/cfssl/helpers", "//vendor:github.com/cloudflare/cfssl/signer", "//vendor:github.com/cloudflare/cfssl/signer/local", "//vendor:github.com/golang/glog", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", + "//vendor:k8s.io/client-go/pkg/api", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/client-go/util/workqueue", @@ -56,6 +56,7 @@ filegroup( go_test( name = "go_default_test", srcs = [ + "certificate_controller_test.go", "cfssl_signer_test.go", "groupapprove_test.go", ], @@ -68,6 +69,14 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/apis/certificates/v1beta1:go_default_library", + "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated:go_default_library", + "//pkg/controller:go_default_library", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/cert", + "//vendor:k8s.io/client-go/util/cert/triple", ], ) diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index f30e3f98faf..ca921331788 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -20,18 +20,18 @@ import ( "fmt" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/api/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/pkg/api" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" + certificatesinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/certificates/v1beta1" + certificateslisters "k8s.io/kubernetes/pkg/client/listers/certificates/v1beta1" "k8s.io/kubernetes/pkg/controller" "github.com/golang/glog" @@ -52,9 +52,8 @@ type Signer interface { type CertificateController struct { kubeClient clientset.Interface - // CSR framework and store - csrController cache.Controller - csrStore listers.StoreToCertificateRequestLister + csrLister certificateslisters.CertificateSigningRequestLister + csrsSynced cache.InformerSynced syncHandler func(csrKey string) error @@ -64,7 +63,7 @@ type CertificateController struct { queue workqueue.RateLimitingInterface } -func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approver AutoApprover) (*CertificateController, error) { +func NewCertificateController(kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, caCertFile, caKeyFile string, approver AutoApprover) (*CertificateController, error) { // Send events to the apiserver eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) @@ -83,47 +82,37 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du } // Manage the addition/update of certificate requests - cc.csrStore.Store, cc.csrController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return cc.kubeClient.Certificates().CertificateSigningRequests().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return cc.kubeClient.Certificates().CertificateSigningRequests().Watch(options) - }, + csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + csr := obj.(*certificates.CertificateSigningRequest) + glog.V(4).Infof("Adding certificate request %s", csr.Name) + cc.enqueueCertificateRequest(obj) }, - &certificates.CertificateSigningRequest{}, - syncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - csr := obj.(*certificates.CertificateSigningRequest) - glog.V(4).Infof("Adding certificate request %s", csr.Name) - cc.enqueueCertificateRequest(obj) - }, - UpdateFunc: func(old, new interface{}) { - oldCSR := old.(*certificates.CertificateSigningRequest) - glog.V(4).Infof("Updating certificate request %s", oldCSR.Name) - cc.enqueueCertificateRequest(new) - }, - DeleteFunc: func(obj interface{}) { - csr, ok := obj.(*certificates.CertificateSigningRequest) + UpdateFunc: func(old, new interface{}) { + oldCSR := old.(*certificates.CertificateSigningRequest) + glog.V(4).Infof("Updating certificate request %s", oldCSR.Name) + cc.enqueueCertificateRequest(new) + }, + DeleteFunc: func(obj interface{}) { + csr, ok := obj.(*certificates.CertificateSigningRequest) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.V(2).Infof("Couldn't get object from tombstone %#v", obj) - return - } - csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest) - if !ok { - glog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj) - return - } + glog.V(2).Infof("Couldn't get object from tombstone %#v", obj) + return } - glog.V(4).Infof("Deleting certificate request %s", csr.Name) - cc.enqueueCertificateRequest(obj) - }, + csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest) + if !ok { + glog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj) + return + } + } + glog.V(4).Infof("Deleting certificate request %s", csr.Name) + cc.enqueueCertificateRequest(obj) }, - ) + }) + cc.csrLister = csrInformer.Lister() + cc.csrsSynced = csrInformer.Informer().HasSynced cc.syncHandler = cc.maybeSignCertificate return cc, nil } @@ -133,9 +122,13 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer cc.queue.ShutDown() - go cc.csrController.Run(stopCh) - glog.Infof("Starting certificate controller manager") + + if !cache.WaitForCacheSync(stopCh, cc.csrsSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + for i := 0; i < workers; i++ { go wait.Until(cc.worker, time.Second, stopCh) } @@ -186,15 +179,26 @@ func (cc *CertificateController) maybeSignCertificate(key string) error { defer func() { glog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := cc.csrStore.Store.GetByKey(key) - if err != nil { - return err - } - if !exists { + csr, err := cc.csrLister.Get(key) + if errors.IsNotFound(err) { glog.V(3).Infof("csr has been deleted: %v", key) return nil } - csr := obj.(*certificates.CertificateSigningRequest) + if err != nil { + return err + } + + if csr.Status.Certificate != nil { + // no need to do anything because it already has a cert + return nil + } + + // need to operate on a copy so we don't mutate the csr in the shared cache + copy, err := api.Scheme.DeepCopy(csr) + if err != nil { + return err + } + csr = copy.(*certificates.CertificateSigningRequest) if cc.approver != nil { csr, err = cc.approver.AutoApprove(csr) @@ -212,7 +216,7 @@ func (cc *CertificateController) maybeSignCertificate(key string) error { // 2. Generate a signed certificate // 3. Update the Status subresource - if cc.signer != nil && csr.Status.Certificate == nil && IsCertificateRequestApproved(csr) { + if cc.signer != nil && IsCertificateRequestApproved(csr) { csr, err := cc.signer.Sign(csr) if err != nil { return fmt.Errorf("error auto signing csr: %v", err) diff --git a/pkg/controller/certificates/certificate_controller_test.go b/pkg/controller/certificates/certificate_controller_test.go new file mode 100644 index 00000000000..beb2c30add7 --- /dev/null +++ b/pkg/controller/certificates/certificate_controller_test.go @@ -0,0 +1,221 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package certificates + +import ( + "bytes" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "io/ioutil" + "os" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/cert" + "k8s.io/client-go/util/cert/triple" + certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" + "k8s.io/kubernetes/pkg/controller" +) + +type testController struct { + *CertificateController + certFile string + keyFile string + csrStore cache.Store + informerFactory informers.SharedInformerFactory + approver *fakeAutoApprover +} + +func alwaysReady() bool { return true } + +func newController(csrs ...runtime.Object) (*testController, error) { + client := fake.NewSimpleClientset(csrs...) + informerFactory := informers.NewSharedInformerFactory(nil, client, controller.NoResyncPeriodFunc()) + + certFile, keyFile, err := createTestCertFiles() + if err != nil { + return nil, err + } + + approver := &fakeAutoApprover{make(chan *certificates.CertificateSigningRequest, 1)} + controller, err := NewCertificateController( + client, + informerFactory.Certificates().V1beta1().CertificateSigningRequests(), + certFile, + keyFile, + approver, + ) + if err != nil { + return nil, err + } + controller.csrsSynced = alwaysReady + + return &testController{ + controller, + certFile, + keyFile, + informerFactory.Certificates().V1beta1().CertificateSigningRequests().Informer().GetStore(), + informerFactory, + approver, + }, nil +} + +func (c *testController) cleanup() { + os.Remove(c.certFile) + os.Remove(c.keyFile) +} + +func createTestCertFiles() (string, string, error) { + keyPair, err := triple.NewCA("test-ca") + if err != nil { + return "", "", err + } + + // Generate cert + certBuffer := bytes.Buffer{} + if err := pem.Encode(&certBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: keyPair.Cert.Raw}); err != nil { + return "", "", err + } + + // Generate key + keyBuffer := bytes.Buffer{} + if err := pem.Encode(&keyBuffer, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(keyPair.Key)}); err != nil { + return "", "", err + } + + dir, err := ioutil.TempDir("", "") + if err != nil { + return "", "", err + } + + certFile, err := ioutil.TempFile(dir, "cert") + if err != nil { + return "", "", err + } + + keyFile, err := ioutil.TempFile(dir, "key") + if err != nil { + return "", "", err + } + + _, err = certFile.Write(certBuffer.Bytes()) + if err != nil { + return "", "", err + } + certFile.Close() + + _, err = keyFile.Write(keyBuffer.Bytes()) + if err != nil { + return "", "", err + } + keyFile.Close() + + return certFile.Name(), keyFile.Name(), nil +} + +type fakeAutoApprover struct { + csr chan *certificates.CertificateSigningRequest +} + +func (f *fakeAutoApprover) AutoApprove(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) { + csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{ + Type: certificates.CertificateApproved, + Reason: "test reason", + Message: "test message", + }) + f.csr <- csr + return csr, nil +} + +// TODO flesh this out to cover things like not being able to find the csr in the cache, not +// auto-approving, etc. +func TestCertificateController(t *testing.T) { + csrKey, err := cert.NewPrivateKey() + if err != nil { + t.Fatalf("error creating private key for csr: %v", err) + } + + subject := &pkix.Name{ + Organization: []string{"test org"}, + CommonName: "test cn", + } + csrBytes, err := cert.MakeCSR(csrKey, subject, nil, nil) + if err != nil { + t.Fatalf("error creating csr: %v", err) + } + + csr := &certificates.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-csr", + }, + Spec: certificates.CertificateSigningRequestSpec{ + Request: csrBytes, + Usages: []certificates.KeyUsage{ + certificates.UsageDigitalSignature, + certificates.UsageKeyEncipherment, + certificates.UsageClientAuth, + }, + }, + } + + controller, err := newController(csr) + if err != nil { + t.Fatalf("error creating controller: %v", err) + } + defer controller.cleanup() + + received := make(chan struct{}) + + controllerSyncHandler := controller.syncHandler + controller.syncHandler = func(key string) error { + defer close(received) + return controllerSyncHandler(key) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(1, stopCh) + go controller.informerFactory.Start(stopCh) + + select { + case <-received: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out") + } + + csr = <-controller.approver.csr + + if e, a := 1, len(csr.Status.Conditions); e != a { + t.Fatalf("expected %d status condition, got %d", e, a) + } + if e, a := certificates.CertificateApproved, csr.Status.Conditions[0].Type; e != a { + t.Errorf("type: expected %v, got %v", e, a) + } + if e, a := "test reason", csr.Status.Conditions[0].Reason; e != a { + t.Errorf("reason: expected %v, got %v", e, a) + } + if e, a := "test message", csr.Status.Conditions[0].Message; e != a { + t.Errorf("message: expected %v, got %v", e, a) + } +}