From 206f5892a7b884be3335472d37e0a0ad65c7c4dd Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Thu, 8 Nov 2018 19:14:12 -0800 Subject: [PATCH] combine syncs in rootcacertpublisher and some misc simplifications. --- .../app/certificates.go | 1 - .../certificates/rootcacertpublisher/BUILD | 2 +- .../rootcacertpublisher/publisher.go | 106 ++++++--------- .../rootcacertpublisher/publisher_test.go | 128 ++++++------------ 4 files changed, 88 insertions(+), 149 deletions(-) diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index f254362d54c..f914e12010f 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -143,7 +143,6 @@ func startRootCACertPublisher(ctx ControllerContext) (http.Handler, bool, error) sac, err := rootcacertpublisher.NewPublisher( ctx.InformerFactory.Core().V1().ConfigMaps(), - ctx.InformerFactory.Core().V1().Namespaces(), ctx.ClientBuilder.ClientOrDie("root-ca-cert-publisher"), rootCA, ) diff --git a/pkg/controller/certificates/rootcacertpublisher/BUILD b/pkg/controller/certificates/rootcacertpublisher/BUILD index 27e8395ea8d..343843a7e64 100644 --- a/pkg/controller/certificates/rootcacertpublisher/BUILD +++ b/pkg/controller/certificates/rootcacertpublisher/BUILD @@ -44,8 +44,8 @@ go_test( "//pkg/controller:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", - "//staging/src/k8s.io/client-go/testing:go_default_library", ], ) diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher.go b/pkg/controller/certificates/rootcacertpublisher/publisher.go index f512bb18667..2da38db463e 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher.go @@ -36,23 +36,18 @@ import ( "k8s.io/kubernetes/pkg/util/metrics" ) -// RootCACertCofigMapName is name of the configmap which stores certificates to access api-server +// RootCACertCofigMapName is name of the configmap which stores certificates to +// access api-server const RootCACertCofigMapName = "kube-root-ca.crt" -// NewPublisher construct a new controller which would manage the configmap which stores -// certificates in each namespace. It will make sure certificate configmap exists in each namespace. -func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) { +// NewPublisher construct a new controller which would manage the configmap +// which stores certificates in each namespace. It will make sure certificate +// configmap exists in each namespace. +func NewPublisher(cmInformer coreinformers.ConfigMapInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) { e := &Publisher{ client: cl, - configMap: v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: RootCACertCofigMapName, - }, - Data: map[string]string{ - "ca.crt": string(rootCA), - }, - }, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root-ca-cert-publisher"), + rootCA: rootCA, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root-ca-cert-publisher"), } if cl.CoreV1().RESTClient().GetRateLimiter() != nil { if err := metrics.RegisterMetricAndTrackRateLimiterUsage("root_ca_cert_publisher", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil { @@ -67,13 +62,6 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf e.cmLister = cmInformer.Lister() e.cmListerSynced = cmInformer.Informer().HasSynced - nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: e.namespaceAdded, - UpdateFunc: e.namespaceUpdated, - }) - e.nsLister = nsInformer.Lister() - e.nsListerSynced = nsInformer.Informer().HasSynced - e.syncHandler = e.syncNamespace return e, nil @@ -82,8 +70,8 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf // Publisher manages certificate ConfigMap objects inside Namespaces type Publisher struct { - client clientset.Interface - configMap v1.ConfigMap + client clientset.Interface + rootCA []byte // To allow injection for testing. syncHandler func(key string) error @@ -91,9 +79,6 @@ type Publisher struct { cmLister corelisters.ConfigMapLister cmListerSynced cache.InformerSynced - nsLister corelisters.NamespaceLister - nsListerSynced cache.InformerSynced - queue workqueue.RateLimitingInterface } @@ -105,7 +90,7 @@ func (c *Publisher) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting root CA certificate configmap publisher") defer klog.Infof("Shutting down root CA certificate configmap publisher") - if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced, c.nsListerSynced) { + if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced) { return } @@ -129,24 +114,15 @@ func (c *Publisher) configMapDeleted(obj interface{}) { } func (c *Publisher) configMapUpdated(_, newObj interface{}) { - newConfigMap, err := convertToCM(newObj) + cm, err := convertToCM(newObj) if err != nil { utilruntime.HandleError(err) return } - if newConfigMap.Name != RootCACertCofigMapName { + if cm.Name != RootCACertCofigMapName { return } - - if reflect.DeepEqual(c.configMap.Data, newConfigMap.Data) { - return - } - - newConfigMap.Data = make(map[string]string) - newConfigMap.Data["ca.crt"] = c.configMap.Data["ca.crt"] - if _, err := c.client.CoreV1().ConfigMaps(newConfigMap.Namespace).Update(newConfigMap); err != nil && !apierrs.IsAlreadyExists(err) { - utilruntime.HandleError(fmt.Errorf("configmap creation failure:%v", err)) - } + c.queue.Add(cm.Namespace) } func (c *Publisher) namespaceAdded(obj interface{}) { @@ -167,7 +143,8 @@ func (c *Publisher) runWorker() { } } -// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +// processNextWorkItem deals with one key off the queue. It returns false when +// it's time to quit. func (c *Publisher) processNextWorkItem() bool { key, quit := c.queue.Get() if quit { @@ -175,45 +152,50 @@ func (c *Publisher) processNextWorkItem() bool { } defer c.queue.Done(key) - err := c.syncHandler(key.(string)) - if err == nil { - c.queue.Forget(key) + if err := c.syncHandler(key.(string)); err != nil { + utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err)) + c.queue.AddRateLimited(key) return true } - utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) - c.queue.AddRateLimited(key) - + c.queue.Forget(key) return true } -func (c *Publisher) syncNamespace(key string) error { +func (c *Publisher) syncNamespace(ns string) error { startTime := time.Now() defer func() { - klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime)) + klog.V(4).Infof("Finished syncing namespace %q (%v)", ns, time.Since(startTime)) }() - ns, err := c.nsLister.Get(key) - if apierrs.IsNotFound(err) { - return nil - } - if err != nil { - return err - } - - switch _, err := c.cmLister.ConfigMaps(ns.Name).Get(c.configMap.Name); { - case err == nil: - return nil + cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertCofigMapName) + switch { case apierrs.IsNotFound(err): + _, err := c.client.CoreV1().ConfigMaps(ns).Create(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: RootCACertCofigMapName, + }, + Data: map[string]string{ + "ca.crt": string(c.rootCA), + }, + }) + return err case err != nil: return err } - cm := c.configMap.DeepCopy() - if _, err := c.client.CoreV1().ConfigMaps(ns.Name).Create(cm); err != nil && !apierrs.IsAlreadyExists(err) { - return err + data := map[string]string{ + "ca.crt": string(c.rootCA), } - return nil + + if reflect.DeepEqual(cm.Data, data) { + return nil + } + + cm.Data = data + + _, err = c.client.CoreV1().ConfigMaps(ns).Update(cm) + return err } func convertToCM(obj interface{}) (*v1.ConfigMap, error) { diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go index 97a4043baf4..2bf9c909ff3 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher_test.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher_test.go @@ -17,14 +17,14 @@ limitations under the License. package rootcacertpublisher import ( + "reflect" "testing" - "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/diff" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/controller" ) @@ -76,134 +76,92 @@ func TestConfigMapCreation(t *testing.T) { name string } testcases := map[string]struct { - ExistingNamespace *v1.Namespace ExistingConfigMaps []*v1.ConfigMap AddedNamespace *v1.Namespace UpdatedNamespace *v1.Namespace DeletedConfigMap *v1.ConfigMap - UpdatedConfigMap []*v1.ConfigMap + UpdatedConfigMap *v1.ConfigMap ExpectActions []action }{ "create new namesapce": { AddedNamespace: newNs, ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}}, }, - "delete other configmap": { - ExistingNamespace: existNS, ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap}, DeletedConfigMap: otherConfigMap, }, "delete ca configmap": { - ExistingNamespace: existNS, ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap}, DeletedConfigMap: caConfigMap, ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}}, }, "update ca configmap with adding field": { - ExistingNamespace: existNS, ExistingConfigMaps: []*v1.ConfigMap{caConfigMap}, - UpdatedConfigMap: []*v1.ConfigMap{caConfigMap, addFieldCM}, + UpdatedConfigMap: addFieldCM, ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}}, }, "update ca configmap with modifying field": { - ExistingNamespace: existNS, ExistingConfigMaps: []*v1.ConfigMap{caConfigMap}, - UpdatedConfigMap: []*v1.ConfigMap{caConfigMap, modifyFieldCM}, + UpdatedConfigMap: modifyFieldCM, ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}}, }, "update with other configmap": { - ExistingNamespace: existNS, ExistingConfigMaps: []*v1.ConfigMap{caConfigMap, otherConfigMap}, - UpdatedConfigMap: []*v1.ConfigMap{otherConfigMap, updateOtherConfigMap}, + UpdatedConfigMap: updateOtherConfigMap, }, "update namespace with terminating state": { - ExistingNamespace: existNS, - UpdatedNamespace: terminatingNS, + UpdatedNamespace: terminatingNS, }, } for k, tc := range testcases { - client := fake.NewSimpleClientset(caConfigMap, existNS) - informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc()) - cmInformer := informers.Core().V1().ConfigMaps() - nsInformer := informers.Core().V1().Namespaces() - controller, err := NewPublisher(cmInformer, nsInformer, client, fakeRootCA) - if err != nil { - t.Fatalf("error creating ServiceAccounts controller: %v", err) - } - controller.cmListerSynced = alwaysReady - controller.nsListerSynced = alwaysReady - - cmStore := cmInformer.Informer().GetStore() - nsStore := nsInformer.Informer().GetStore() - - syncCalls := make(chan struct{}) - controller.syncHandler = func(key string) error { - err := controller.syncNamespace(key) + t.Run(k, func(t *testing.T) { + client := fake.NewSimpleClientset(caConfigMap, existNS) + informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc()) + cmInformer := informers.Core().V1().ConfigMaps() + controller, err := NewPublisher(cmInformer, client, fakeRootCA) if err != nil { - t.Logf("%s: %v", k, err) + t.Fatalf("error creating ServiceAccounts controller: %v", err) } - syncCalls <- struct{}{} - return err - } - stopCh := make(chan struct{}) - defer close(stopCh) - go controller.Run(1, stopCh) - if tc.ExistingNamespace != nil { - nsStore.Add(tc.ExistingNamespace) - } - for _, s := range tc.ExistingConfigMaps { - cmStore.Add(s) - } + cmStore := cmInformer.Informer().GetStore() - if tc.AddedNamespace != nil { - nsStore.Add(tc.AddedNamespace) - controller.namespaceAdded(tc.AddedNamespace) - } - if tc.UpdatedNamespace != nil { - controller.namespaceUpdated(tc.ExistingNamespace, tc.UpdatedNamespace) - } + controller.syncHandler = controller.syncNamespace - if tc.DeletedConfigMap != nil { - cmStore.Delete(tc.DeletedConfigMap) - controller.configMapDeleted(tc.DeletedConfigMap) - } - - if tc.UpdatedConfigMap != nil { - old := tc.UpdatedConfigMap[0] - new := tc.UpdatedConfigMap[1] - controller.configMapUpdated(old, new) - } - - // wait to be called - select { - case <-syncCalls: - case <-time.After(5 * time.Second): - } - - actions := client.Actions() - if len(tc.ExpectActions) != len(actions) { - t.Errorf("%s: Expected to create configmap %#v. Actual actions were: %#v", k, tc.ExpectActions, actions) - continue - } - for i, expectAction := range tc.ExpectActions { - action := actions[i] - if !action.Matches(expectAction.verb, "configmaps") { - t.Errorf("%s: Unexpected action %s", k, action) - break + for _, s := range tc.ExistingConfigMaps { + cmStore.Add(s) } - cm := action.(core.CreateAction).GetObject().(*v1.ConfigMap) - if cm.Name != expectAction.name { - t.Errorf("%s: Expected %s to be %s, got %s be %s", k, expectAction.name, expectAction.verb, cm.Name, action.GetVerb()) + + if tc.AddedNamespace != nil { + controller.namespaceAdded(tc.AddedNamespace) } - } + if tc.UpdatedNamespace != nil { + controller.namespaceUpdated(nil, tc.UpdatedNamespace) + } + + if tc.DeletedConfigMap != nil { + cmStore.Delete(tc.DeletedConfigMap) + controller.configMapDeleted(tc.DeletedConfigMap) + } + + if tc.UpdatedConfigMap != nil { + cmStore.Add(tc.UpdatedConfigMap) + controller.configMapUpdated(nil, tc.UpdatedConfigMap) + } + + for controller.queue.Len() != 0 { + controller.processNextWorkItem() + } + + actions := client.Actions() + if reflect.DeepEqual(actions, tc.ExpectActions) { + t.Errorf("Unexpected actions:\n%s", diff.ObjectGoPrintDiff(actions, tc.ExpectActions)) + } + }) } } -var alwaysReady = func() bool { return true } - func defaultCrtConfigMapPtr(rootCA []byte) *v1.ConfigMap { tmp := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{