Merge pull request #41084 from ncdc/shared-informers-03-certs

Automatic merge from submit-queue (batch tested with PRs 41037, 40118, 40959, 41084, 41092)

Switch CSR controller to use shared informer

Switch the CSR controller to use a shared informer. Originally part of #40097 but I'm splitting that up into multiple PRs.

I have added a test to try to ensure we don't mutate the cache. It could use some fleshing out for additional coverage but it gets the initial job done, I think.

cc @mikedanese @deads2k @liggitt @sttts @kubernetes/sig-scalability-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-02-09 16:44:43 -08:00 committed by GitHub
commit d2ada4bbd3
4 changed files with 293 additions and 60 deletions

View File

@ -31,11 +31,10 @@ func startCSRController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "certificates.k8s.io", Version: "v1beta1", Resource: "certificatesigningrequests"}] {
return false, nil
}
resyncPeriod := ResyncPeriod(&ctx.Options)()
c := ctx.ClientBuilder.ClientOrDie("certificate-controller")
certController, err := certcontroller.NewCertificateController(
c,
resyncPeriod,
ctx.NewInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
ctx.Options.ClusterSigningCertFile,
ctx.Options.ClusterSigningKeyFile,
certcontroller.NewGroupApprover(ctx.Options.ApproveAllKubeletCSRsForGroup),

View File

@ -21,19 +21,19 @@ go_library(
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated/certificates/v1beta1:go_default_library",
"//pkg/client/listers/certificates/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/cloudflare/cfssl/config",
"//vendor:github.com/cloudflare/cfssl/helpers",
"//vendor:github.com/cloudflare/cfssl/signer",
"//vendor:github.com/cloudflare/cfssl/signer/local",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
@ -56,6 +56,7 @@ filegroup(
go_test(
name = "go_default_test",
srcs = [
"certificate_controller_test.go",
"cfssl_signer_test.go",
"groupapprove_test.go",
],
@ -68,6 +69,14 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/apis/certificates/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/cert",
"//vendor:k8s.io/client-go/util/cert/triple",
],
)

View File

@ -20,18 +20,18 @@ import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
certificatesinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/certificates/v1beta1"
certificateslisters "k8s.io/kubernetes/pkg/client/listers/certificates/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
@ -52,9 +52,8 @@ type Signer interface {
type CertificateController struct {
kubeClient clientset.Interface
// CSR framework and store
csrController cache.Controller
csrStore listers.StoreToCertificateRequestLister
csrLister certificateslisters.CertificateSigningRequestLister
csrsSynced cache.InformerSynced
syncHandler func(csrKey string) error
@ -64,7 +63,7 @@ type CertificateController struct {
queue workqueue.RateLimitingInterface
}
func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approver AutoApprover) (*CertificateController, error) {
func NewCertificateController(kubeClient clientset.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, caCertFile, caKeyFile string, approver AutoApprover) (*CertificateController, error) {
// Send events to the apiserver
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
@ -83,47 +82,37 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
}
// Manage the addition/update of certificate requests
cc.csrStore.Store, cc.csrController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return cc.kubeClient.Certificates().CertificateSigningRequests().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return cc.kubeClient.Certificates().CertificateSigningRequests().Watch(options)
},
csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csr := obj.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Adding certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
&certificates.CertificateSigningRequest{},
syncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csr := obj.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Adding certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
UpdateFunc: func(old, new interface{}) {
oldCSR := old.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
cc.enqueueCertificateRequest(new)
},
DeleteFunc: func(obj interface{}) {
csr, ok := obj.(*certificates.CertificateSigningRequest)
UpdateFunc: func(old, new interface{}) {
oldCSR := old.(*certificates.CertificateSigningRequest)
glog.V(4).Infof("Updating certificate request %s", oldCSR.Name)
cc.enqueueCertificateRequest(new)
},
DeleteFunc: func(obj interface{}) {
csr, ok := obj.(*certificates.CertificateSigningRequest)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
if !ok {
glog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj)
return
}
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
glog.V(4).Infof("Deleting certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
if !ok {
glog.V(2).Infof("Tombstone contained object that is not a CSR: %#v", obj)
return
}
}
glog.V(4).Infof("Deleting certificate request %s", csr.Name)
cc.enqueueCertificateRequest(obj)
},
)
})
cc.csrLister = csrInformer.Lister()
cc.csrsSynced = csrInformer.Informer().HasSynced
cc.syncHandler = cc.maybeSignCertificate
return cc, nil
}
@ -133,9 +122,13 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
go cc.csrController.Run(stopCh)
glog.Infof("Starting certificate controller manager")
if !cache.WaitForCacheSync(stopCh, cc.csrsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
@ -186,15 +179,26 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
defer func() {
glog.V(4).Infof("Finished syncing certificate request %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
return err
}
if !exists {
csr, err := cc.csrLister.Get(key)
if errors.IsNotFound(err) {
glog.V(3).Infof("csr has been deleted: %v", key)
return nil
}
csr := obj.(*certificates.CertificateSigningRequest)
if err != nil {
return err
}
if csr.Status.Certificate != nil {
// no need to do anything because it already has a cert
return nil
}
// need to operate on a copy so we don't mutate the csr in the shared cache
copy, err := api.Scheme.DeepCopy(csr)
if err != nil {
return err
}
csr = copy.(*certificates.CertificateSigningRequest)
if cc.approver != nil {
csr, err = cc.approver.AutoApprove(csr)
@ -212,7 +216,7 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
// 2. Generate a signed certificate
// 3. Update the Status subresource
if cc.signer != nil && csr.Status.Certificate == nil && IsCertificateRequestApproved(csr) {
if cc.signer != nil && IsCertificateRequestApproved(csr) {
csr, err := cc.signer.Sign(csr)
if err != nil {
return fmt.Errorf("error auto signing csr: %v", err)

View File

@ -0,0 +1,221 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificates
import (
"bytes"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"io/ioutil"
"os"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/cert/triple"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
"k8s.io/kubernetes/pkg/controller"
)
type testController struct {
*CertificateController
certFile string
keyFile string
csrStore cache.Store
informerFactory informers.SharedInformerFactory
approver *fakeAutoApprover
}
func alwaysReady() bool { return true }
func newController(csrs ...runtime.Object) (*testController, error) {
client := fake.NewSimpleClientset(csrs...)
informerFactory := informers.NewSharedInformerFactory(nil, client, controller.NoResyncPeriodFunc())
certFile, keyFile, err := createTestCertFiles()
if err != nil {
return nil, err
}
approver := &fakeAutoApprover{make(chan *certificates.CertificateSigningRequest, 1)}
controller, err := NewCertificateController(
client,
informerFactory.Certificates().V1beta1().CertificateSigningRequests(),
certFile,
keyFile,
approver,
)
if err != nil {
return nil, err
}
controller.csrsSynced = alwaysReady
return &testController{
controller,
certFile,
keyFile,
informerFactory.Certificates().V1beta1().CertificateSigningRequests().Informer().GetStore(),
informerFactory,
approver,
}, nil
}
func (c *testController) cleanup() {
os.Remove(c.certFile)
os.Remove(c.keyFile)
}
func createTestCertFiles() (string, string, error) {
keyPair, err := triple.NewCA("test-ca")
if err != nil {
return "", "", err
}
// Generate cert
certBuffer := bytes.Buffer{}
if err := pem.Encode(&certBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: keyPair.Cert.Raw}); err != nil {
return "", "", err
}
// Generate key
keyBuffer := bytes.Buffer{}
if err := pem.Encode(&keyBuffer, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(keyPair.Key)}); err != nil {
return "", "", err
}
dir, err := ioutil.TempDir("", "")
if err != nil {
return "", "", err
}
certFile, err := ioutil.TempFile(dir, "cert")
if err != nil {
return "", "", err
}
keyFile, err := ioutil.TempFile(dir, "key")
if err != nil {
return "", "", err
}
_, err = certFile.Write(certBuffer.Bytes())
if err != nil {
return "", "", err
}
certFile.Close()
_, err = keyFile.Write(keyBuffer.Bytes())
if err != nil {
return "", "", err
}
keyFile.Close()
return certFile.Name(), keyFile.Name(), nil
}
type fakeAutoApprover struct {
csr chan *certificates.CertificateSigningRequest
}
func (f *fakeAutoApprover) AutoApprove(csr *certificates.CertificateSigningRequest) (*certificates.CertificateSigningRequest, error) {
csr.Status.Conditions = append(csr.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Reason: "test reason",
Message: "test message",
})
f.csr <- csr
return csr, nil
}
// TODO flesh this out to cover things like not being able to find the csr in the cache, not
// auto-approving, etc.
func TestCertificateController(t *testing.T) {
csrKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatalf("error creating private key for csr: %v", err)
}
subject := &pkix.Name{
Organization: []string{"test org"},
CommonName: "test cn",
}
csrBytes, err := cert.MakeCSR(csrKey, subject, nil, nil)
if err != nil {
t.Fatalf("error creating csr: %v", err)
}
csr := &certificates.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test-csr",
},
Spec: certificates.CertificateSigningRequestSpec{
Request: csrBytes,
Usages: []certificates.KeyUsage{
certificates.UsageDigitalSignature,
certificates.UsageKeyEncipherment,
certificates.UsageClientAuth,
},
},
}
controller, err := newController(csr)
if err != nil {
t.Fatalf("error creating controller: %v", err)
}
defer controller.cleanup()
received := make(chan struct{})
controllerSyncHandler := controller.syncHandler
controller.syncHandler = func(key string) error {
defer close(received)
return controllerSyncHandler(key)
}
stopCh := make(chan struct{})
defer close(stopCh)
go controller.Run(1, stopCh)
go controller.informerFactory.Start(stopCh)
select {
case <-received:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out")
}
csr = <-controller.approver.csr
if e, a := 1, len(csr.Status.Conditions); e != a {
t.Fatalf("expected %d status condition, got %d", e, a)
}
if e, a := certificates.CertificateApproved, csr.Status.Conditions[0].Type; e != a {
t.Errorf("type: expected %v, got %v", e, a)
}
if e, a := "test reason", csr.Status.Conditions[0].Reason; e != a {
t.Errorf("reason: expected %v, got %v", e, a)
}
if e, a := "test message", csr.Status.Conditions[0].Message; e != a {
t.Errorf("message: expected %v, got %v", e, a)
}
}