Switch CSR controller to use shared informer

This commit is contained in:
Andy Goldstein 2017-02-07 13:21:08 -05:00
parent 74cf0484c3
commit e5fc73a4f1
4 changed files with 293 additions and 60 deletions

View File

@ -31,11 +31,10 @@ func startCSRController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] {
return false, nil
}
resyncPeriod := ResyncPeriod(&ctx.Options)()
c := ctx.ClientBuilder.ClientOrDie("certificate-controller")
certController, err := certcontroller.NewCertificateController(
c,
resyncPeriod,
ctx.NewInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
ctx.Options.ClusterSigningCertFile,
ctx.Options.ClusterSigningKeyFile,
certcontroller.NewGroupApprover(ctx.Options.ApproveAllKubeletCSRsForGroup),

View File

@ -21,19 +21,19 @@ go_library(
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated/certificates/v1beta1:go_default_library",
"//pkg/client/listers/certificates/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/cloudflare/cfssl/config",
"//vendor:github.com/cloudflare/cfssl/helpers",
"//vendor:github.com/cloudflare/cfssl/signer",
"//vendor:github.com/cloudflare/cfssl/signer/local",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
@ -56,6 +56,7 @@ filegroup(
go_test(
name = "go_default_test",
srcs = [
"certificate_controller_test.go",
"cfssl_signer_test.go",
"groupapprove_test.go",
],
@ -68,6 +69,14 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/cert",
"//vendor:k8s.io/client-go/util/cert/triple",
],
)

View File

@ -20,18 +20,18 @@ import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
certificatesinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/certificates/v1beta1"
certificateslisters "k8s.io/kubernetes/pkg/client/listers/certificates/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
@ -52,9 +52,8 @@ type Signer interface {
type CertificateController struct {
kubeClient clientset.Interface
// CSR framework and store
csrController cache.Controller
csrStore listers.StoreToCertificateRequestLister
csrLister certificateslisters.CertificateSigningRequestLister
csrsSynced cache.InformerSynced
syncHandler func(csrKey string) error
@ -64,7 +63,7 @@ type CertificateController struct {
queue workqueue.RateLimitingInterface
}
func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approver AutoApprover) (*CertificateController, error) {
func NewCertificateController(kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, caCertFile, caKeyFile string, approver AutoApprover) (*CertificateController, error) {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
@ -83,47 +82,37 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
}
// Manage the addition/update of certificate requests
cc.csrStore.Store, cc.csrController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cc.kubeClient.Certificates().CertificateSigningRequests().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cc.kubeClient.Certificates().CertificateSigningRequests().Watch(options)
},
csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csr := obj.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Adding certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
&certificates.CertificateSigningRequest{},
syncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csr := obj.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Adding certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
UpdateFunc: func(old, new interface{}) {
oldCSR := old.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
cc.enqueueCertificateRequest(new)
},
DeleteFunc: func(obj interface{}) {
csr, ok := obj.(*certificates.CertificateSigningRequest)
UpdateFunc: func(old, new interface{}) {
oldCSR := old.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
cc.enqueueCertificateRequest(new)
},
DeleteFunc: func(obj interface{}) {
csr, ok := obj.(*certificates.CertificateSigningRequest)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
if !ok {
glog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj)
return
}
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
glog.V(4).Infof("Deleting certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
if !ok {
glog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj)
return
}
}
glog.V(4).Infof("Deleting certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
)
})
cc.csrLister = csrInformer.Lister()
cc.csrsSynced = csrInformer.Informer().HasSynced
cc.syncHandler = cc.maybeSignCertificate
return cc, nil
}
@ -133,9 +122,13 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
go cc.csrController.Run(stopCh)
glog.Infof("Starting certificate controller manager")
if !cache.WaitForCacheSync(stopCh, cc.csrsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
@ -186,15 +179,26 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
defer func() {
glog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
return err
}
if !exists {
csr, err := cc.csrLister.Get(key)
if errors.IsNotFound(err) {
glog.V(3).Infof("csr has been deleted: %v", key)
return nil
}
csr := obj.(*certificates.CertificateSigningRequest)
if err != nil {
return err
}
if csr.Status.Certificate != nil {
// no need to do anything because it already has a cert
return nil
}
// need to operate on a copy so we don't mutate the csr in the shared cache
copy, err := api.Scheme.DeepCopy(csr)
if err != nil {
return err
}
csr = copy.(*certificates.CertificateSigningRequest)
if cc.approver != nil {
csr, err = cc.approver.AutoApprove(csr)
@ -212,7 +216,7 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
// 2. Generate a signed certificate
// 3. Update the Status subresource
if cc.signer != nil && csr.Status.Certificate == nil && IsCertificateRequestApproved(csr) {
if cc.signer != nil && IsCertificateRequestApproved(csr) {
csr, err := cc.signer.Sign(csr)
if err != nil {
return fmt.Errorf("error auto signing csr: %v", err)

View File

@ -0,0 +1,221 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificates
import (
"bytes"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"io/ioutil"
"os"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/cert/triple"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
"k8s.io/kubernetes/pkg/controller"
)
type testController struct {
*CertificateController
certFile string
keyFile string
csrStore cache.Store
informerFactory informers.SharedInformerFactory
approver *fakeAutoApprover
}
func alwaysReady() bool { return true }
func newController(csrs ...runtime.Object) (*testController, error) {
client := fake.NewSimpleClientset(csrs...)
informerFactory := informers.NewSharedInformerFactory(nil, client, controller.NoResyncPeriodFunc())
certFile, keyFile, err := createTestCertFiles()
if err != nil {
return nil, err
}
approver := &fakeAutoApprover{make(chan *certificates.CertificateSigningRequest, 1)}
controller, err := NewCertificateController(
client,
informerFactory.Certificates().V1beta1().CertificateSigningRequests(),
certFile,
keyFile,
approver,
)
if err != nil {
return nil, err
}
controller.csrsSynced = alwaysReady
return &testController{
controller,
certFile,
keyFile,
informerFactory.Certificates().V1beta1().CertificateSigningRequests().Informer().GetStore(),
informerFactory,
approver,
}, nil
}
func (c *testController) cleanup() {
os.Remove(c.certFile)
os.Remove(c.keyFile)
}
func createTestCertFiles() (string, string, error) {
keyPair, err := triple.NewCA("test-ca")
if err != nil {
return "", "", err
}
// Generate cert
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: keyPair.Cert.Raw}); err != nil {
return "", "", err
}
// Generate key
keyBuffer := bytes.Buffer{}
if err := pem.Encode(&keyBuffer, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(keyPair.Key)}); err != nil {
return "", "", err
}
dir, err := ioutil.TempDir("", "")
if err != nil {
return "", "", err
}
certFile, err := ioutil.TempFile(dir, "cert")
if err != nil {
return "", "", err
}
keyFile, err := ioutil.TempFile(dir, "key")
if err != nil {
return "", "", err
}
_, err = certFile.Write(certBuffer.Bytes())
if err != nil {
return "", "", err
}
certFile.Close()
_, err = keyFile.Write(keyBuffer.Bytes())
if err != nil {
return "", "", err
}
keyFile.Close()
return certFile.Name(), keyFile.Name(), nil
}
type fakeAutoApprover struct {
csr chan *certificates.CertificateSigningRequest
}
func (f *fakeAutoApprover) AutoApprove(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Reason: "test reason",
Message: "test message",
})
f.csr <- csr
return csr, nil
}
// TODO flesh this out to cover things like not being able to find the csr in the cache, not
// auto-approving, etc.
func TestCertificateController(t *testing.T) {
csrKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatalf("error creating private key for csr: %v", err)
}
subject := &pkix.Name{
Organization: []string{"test org"},
CommonName: "test cn",
}
csrBytes, err := cert.MakeCSR(csrKey, subject, nil, nil)
if err != nil {
t.Fatalf("error creating csr: %v", err)
}
csr := &certificates.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test-csr",
},
Spec: certificates.CertificateSigningRequestSpec{
Request: csrBytes,
Usages: []certificates.KeyUsage{
certificates.UsageDigitalSignature,
certificates.UsageKeyEncipherment,
certificates.UsageClientAuth,
},
},
}
controller, err := newController(csr)
if err != nil {
t.Fatalf("error creating controller: %v", err)
}
defer controller.cleanup()
received := make(chan struct{})
controllerSyncHandler := controller.syncHandler
controller.syncHandler = func(key string) error {
defer close(received)
return controllerSyncHandler(key)
}
stopCh := make(chan struct{})
defer close(stopCh)
go controller.Run(1, stopCh)
go controller.informerFactory.Start(stopCh)
select {
case <-received:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out")
}
csr = <-controller.approver.csr
if e, a := 1, len(csr.Status.Conditions); e != a {
t.Fatalf("expected %d status condition, got %d", e, a)
}
if e, a := certificates.CertificateApproved, csr.Status.Conditions[0].Type; e != a {
t.Errorf("type: expected %v, got %v", e, a)
}
if e, a := "test reason", csr.Status.Conditions[0].Reason; e != a {
t.Errorf("reason: expected %v, got %v", e, a)
}
if e, a := "test message", csr.Status.Conditions[0].Message; e != a {
t.Errorf("message: expected %v, got %v", e, a)
}
}