Merge pull request #70842 from mikedanese/simplify

combine syncs in rootcacertpublisher
This commit is contained in:
k8s-ci-robot 2018-11-12 12:01:45 -08:00 committed by GitHub
commit 08784ad9af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 149 deletions

View File

@ -143,7 +143,6 @@ func startRootCACertPublisher(ctx ControllerContext) (http.Handler, bool, error)
sac, err := rootcacertpublisher.NewPublisher( sac, err := rootcacertpublisher.NewPublisher(
ctx.InformerFactory.Core().V1().ConfigMaps(), ctx.InformerFactory.Core().V1().ConfigMaps(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ClientBuilder.ClientOrDie("root-ca-cert-publisher"), ctx.ClientBuilder.ClientOrDie("root-ca-cert-publisher"),
rootCA, rootCA,
) )

View File

@ -44,8 +44,8 @@ go_test(
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
], ],
) )

View File

@ -36,23 +36,18 @@ import (
"k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/metrics"
) )
// RootCACertCofigMapName is name of the configmap which stores certificates to access api-server // RootCACertCofigMapName is name of the configmap which stores certificates to
// access api-server
const RootCACertCofigMapName = "kube-root-ca.crt" const RootCACertCofigMapName = "kube-root-ca.crt"
// NewPublisher construct a new controller which would manage the configmap which stores // NewPublisher construct a new controller which would manage the configmap
// certificates in each namespace. It will make sure certificate configmap exists in each namespace. // which stores certificates in each namespace. It will make sure certificate
func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) { // configmap exists in each namespace.
func NewPublisher(cmInformer coreinformers.ConfigMapInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) {
e := &Publisher{ e := &Publisher{
client: cl, client: cl,
configMap: v1.ConfigMap{ rootCA: rootCA,
ObjectMeta: metav1.ObjectMeta{ queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root-ca-cert-publisher"),
Name: RootCACertCofigMapName,
},
Data: map[string]string{
"ca.crt": string(rootCA),
},
},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root-ca-cert-publisher"),
} }
if cl.CoreV1().RESTClient().GetRateLimiter() != nil { if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("root_ca_cert_publisher", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil { if err := metrics.RegisterMetricAndTrackRateLimiterUsage("root_ca_cert_publisher", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
@ -67,13 +62,6 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf
e.cmLister = cmInformer.Lister() e.cmLister = cmInformer.Lister()
e.cmListerSynced = cmInformer.Informer().HasSynced e.cmListerSynced = cmInformer.Informer().HasSynced
nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.namespaceAdded,
UpdateFunc: e.namespaceUpdated,
})
e.nsLister = nsInformer.Lister()
e.nsListerSynced = nsInformer.Informer().HasSynced
e.syncHandler = e.syncNamespace e.syncHandler = e.syncNamespace
return e, nil return e, nil
@ -82,8 +70,8 @@ func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinf
// Publisher manages certificate ConfigMap objects inside Namespaces // Publisher manages certificate ConfigMap objects inside Namespaces
type Publisher struct { type Publisher struct {
client clientset.Interface client clientset.Interface
configMap v1.ConfigMap rootCA []byte
// To allow injection for testing. // To allow injection for testing.
syncHandler func(key string) error syncHandler func(key string) error
@ -91,9 +79,6 @@ type Publisher struct {
cmLister corelisters.ConfigMapLister cmLister corelisters.ConfigMapLister
cmListerSynced cache.InformerSynced cmListerSynced cache.InformerSynced
nsLister corelisters.NamespaceLister
nsListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
} }
@ -105,7 +90,7 @@ func (c *Publisher) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting root CA certificate configmap publisher") klog.Infof("Starting root CA certificate configmap publisher")
defer klog.Infof("Shutting down root CA certificate configmap publisher") defer klog.Infof("Shutting down root CA certificate configmap publisher")
if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced, c.nsListerSynced) { if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced) {
return return
} }
@ -129,24 +114,15 @@ func (c *Publisher) configMapDeleted(obj interface{}) {
} }
func (c *Publisher) configMapUpdated(_, newObj interface{}) { func (c *Publisher) configMapUpdated(_, newObj interface{}) {
newConfigMap, err := convertToCM(newObj) cm, err := convertToCM(newObj)
if err != nil { if err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
return return
} }
if newConfigMap.Name != RootCACertCofigMapName { if cm.Name != RootCACertCofigMapName {
return return
} }
c.queue.Add(cm.Namespace)
if reflect.DeepEqual(c.configMap.Data, newConfigMap.Data) {
return
}
newConfigMap.Data = make(map[string]string)
newConfigMap.Data["ca.crt"] = c.configMap.Data["ca.crt"]
if _, err := c.client.CoreV1().ConfigMaps(newConfigMap.Namespace).Update(newConfigMap); err != nil && !apierrs.IsAlreadyExists(err) {
utilruntime.HandleError(fmt.Errorf("configmap creation failure:%v", err))
}
} }
func (c *Publisher) namespaceAdded(obj interface{}) { func (c *Publisher) namespaceAdded(obj interface{}) {
@ -167,7 +143,8 @@ func (c *Publisher) runWorker() {
} }
} }
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. // processNextWorkItem deals with one key off the queue. It returns false when
// it's time to quit.
func (c *Publisher) processNextWorkItem() bool { func (c *Publisher) processNextWorkItem() bool {
key, quit := c.queue.Get() key, quit := c.queue.Get()
if quit { if quit {
@ -175,45 +152,50 @@ func (c *Publisher) processNextWorkItem() bool {
} }
defer c.queue.Done(key) defer c.queue.Done(key)
err := c.syncHandler(key.(string)) if err := c.syncHandler(key.(string)); err != nil {
if err == nil { utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
c.queue.Forget(key) c.queue.AddRateLimited(key)
return true return true
} }
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) c.queue.Forget(key)
c.queue.AddRateLimited(key)
return true return true
} }
func (c *Publisher) syncNamespace(key string) error { func (c *Publisher) syncNamespace(ns string) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime)) klog.V(4).Infof("Finished syncing namespace %q (%v)", ns, time.Since(startTime))
}() }()
ns, err := c.nsLister.Get(key) cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertCofigMapName)
if apierrs.IsNotFound(err) { switch {
return nil
}
if err != nil {
return err
}
switch _, err := c.cmLister.ConfigMaps(ns.Name).Get(c.configMap.Name); {
case err == nil:
return nil
case apierrs.IsNotFound(err): case apierrs.IsNotFound(err):
_, err := c.client.CoreV1().ConfigMaps(ns).Create(&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: RootCACertCofigMapName,
},
Data: map[string]string{
"ca.crt": string(c.rootCA),
},
})
return err
case err != nil: case err != nil:
return err return err
} }
cm := c.configMap.DeepCopy() data := map[string]string{
if _, err := c.client.CoreV1().ConfigMaps(ns.Name).Create(cm); err != nil && !apierrs.IsAlreadyExists(err) { "ca.crt": string(c.rootCA),
return err
} }
return nil
if reflect.DeepEqual(cm.Data, data) {
return nil
}
cm.Data = data
_, err = c.client.CoreV1().ConfigMaps(ns).Update(cm)
return err
} }
func convertToCM(obj interface{}) (*v1.ConfigMap, error) { func convertToCM(obj interface{}) (*v1.ConfigMap, error) {

View File

@ -17,14 +17,14 @@ limitations under the License.
package rootcacertpublisher package rootcacertpublisher
import ( import (
"reflect"
"testing" "testing"
"time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
@ -76,134 +76,92 @@ func TestConfigMapCreation(t *testing.T) {
name string name string
} }
testcases := map[string]struct { testcases := map[string]struct {
ExistingNamespace *v1.Namespace
ExistingConfigMaps []*v1.ConfigMap ExistingConfigMaps []*v1.ConfigMap
AddedNamespace *v1.Namespace AddedNamespace *v1.Namespace
UpdatedNamespace *v1.Namespace UpdatedNamespace *v1.Namespace
DeletedConfigMap *v1.ConfigMap DeletedConfigMap *v1.ConfigMap
UpdatedConfigMap []*v1.ConfigMap UpdatedConfigMap *v1.ConfigMap
ExpectActions []action ExpectActions []action
}{ }{
"create new namesapce": { "create new namesapce": {
AddedNamespace: newNs, AddedNamespace: newNs,
ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}}, ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}},
}, },
"delete other configmap": { "delete other configmap": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap}, ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap},
DeletedConfigMap: otherConfigMap, DeletedConfigMap: otherConfigMap,
}, },
"delete ca configmap": { "delete ca configmap": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap}, ExistingConfigMaps: []*v1.ConfigMap{otherConfigMap, caConfigMap},
DeletedConfigMap: caConfigMap, DeletedConfigMap: caConfigMap,
ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}}, ExpectActions: []action{{verb: "create", name: RootCACertCofigMapName}},
}, },
"update ca configmap with adding field": { "update ca configmap with adding field": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{caConfigMap}, ExistingConfigMaps: []*v1.ConfigMap{caConfigMap},
UpdatedConfigMap: []*v1.ConfigMap{caConfigMap, addFieldCM}, UpdatedConfigMap: addFieldCM,
ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}}, ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}},
}, },
"update ca configmap with modifying field": { "update ca configmap with modifying field": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{caConfigMap}, ExistingConfigMaps: []*v1.ConfigMap{caConfigMap},
UpdatedConfigMap: []*v1.ConfigMap{caConfigMap, modifyFieldCM}, UpdatedConfigMap: modifyFieldCM,
ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}}, ExpectActions: []action{{verb: "update", name: RootCACertCofigMapName}},
}, },
"update with other configmap": { "update with other configmap": {
ExistingNamespace: existNS,
ExistingConfigMaps: []*v1.ConfigMap{caConfigMap, otherConfigMap}, ExistingConfigMaps: []*v1.ConfigMap{caConfigMap, otherConfigMap},
UpdatedConfigMap: []*v1.ConfigMap{otherConfigMap, updateOtherConfigMap}, UpdatedConfigMap: updateOtherConfigMap,
}, },
"update namespace with terminating state": { "update namespace with terminating state": {
ExistingNamespace: existNS, UpdatedNamespace: terminatingNS,
UpdatedNamespace: terminatingNS,
}, },
} }
for k, tc := range testcases { for k, tc := range testcases {
client := fake.NewSimpleClientset(caConfigMap, existNS) t.Run(k, func(t *testing.T) {
informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc()) client := fake.NewSimpleClientset(caConfigMap, existNS)
cmInformer := informers.Core().V1().ConfigMaps() informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc())
nsInformer := informers.Core().V1().Namespaces() cmInformer := informers.Core().V1().ConfigMaps()
controller, err := NewPublisher(cmInformer, nsInformer, client, fakeRootCA) controller, err := NewPublisher(cmInformer, client, fakeRootCA)
if err != nil {
t.Fatalf("error creating ServiceAccounts controller: %v", err)
}
controller.cmListerSynced = alwaysReady
controller.nsListerSynced = alwaysReady
cmStore := cmInformer.Informer().GetStore()
nsStore := nsInformer.Informer().GetStore()
syncCalls := make(chan struct{})
controller.syncHandler = func(key string) error {
err := controller.syncNamespace(key)
if err != nil { if err != nil {
t.Logf("%s: %v", k, err) t.Fatalf("error creating ServiceAccounts controller: %v", err)
} }
syncCalls <- struct{}{}
return err
}
stopCh := make(chan struct{})
defer close(stopCh)
go controller.Run(1, stopCh)
if tc.ExistingNamespace != nil { cmStore := cmInformer.Informer().GetStore()
nsStore.Add(tc.ExistingNamespace)
}
for _, s := range tc.ExistingConfigMaps {
cmStore.Add(s)
}
if tc.AddedNamespace != nil { controller.syncHandler = controller.syncNamespace
nsStore.Add(tc.AddedNamespace)
controller.namespaceAdded(tc.AddedNamespace)
}
if tc.UpdatedNamespace != nil {
controller.namespaceUpdated(tc.ExistingNamespace, tc.UpdatedNamespace)
}
if tc.DeletedConfigMap != nil { for _, s := range tc.ExistingConfigMaps {
cmStore.Delete(tc.DeletedConfigMap) cmStore.Add(s)
controller.configMapDeleted(tc.DeletedConfigMap)
}
if tc.UpdatedConfigMap != nil {
old := tc.UpdatedConfigMap[0]
new := tc.UpdatedConfigMap[1]
controller.configMapUpdated(old, new)
}
// wait to be called
select {
case <-syncCalls:
case <-time.After(5 * time.Second):
}
actions := client.Actions()
if len(tc.ExpectActions) != len(actions) {
t.Errorf("%s: Expected to create configmap %#v. Actual actions were: %#v", k, tc.ExpectActions, actions)
continue
}
for i, expectAction := range tc.ExpectActions {
action := actions[i]
if !action.Matches(expectAction.verb, "configmaps") {
t.Errorf("%s: Unexpected action %s", k, action)
break
} }
cm := action.(core.CreateAction).GetObject().(*v1.ConfigMap)
if cm.Name != expectAction.name { if tc.AddedNamespace != nil {
t.Errorf("%s: Expected %s to be %s, got %s be %s", k, expectAction.name, expectAction.verb, cm.Name, action.GetVerb()) controller.namespaceAdded(tc.AddedNamespace)
} }
} if tc.UpdatedNamespace != nil {
controller.namespaceUpdated(nil, tc.UpdatedNamespace)
}
if tc.DeletedConfigMap != nil {
cmStore.Delete(tc.DeletedConfigMap)
controller.configMapDeleted(tc.DeletedConfigMap)
}
if tc.UpdatedConfigMap != nil {
cmStore.Add(tc.UpdatedConfigMap)
controller.configMapUpdated(nil, tc.UpdatedConfigMap)
}
for controller.queue.Len() != 0 {
controller.processNextWorkItem()
}
actions := client.Actions()
if reflect.DeepEqual(actions, tc.ExpectActions) {
t.Errorf("Unexpected actions:\n%s", diff.ObjectGoPrintDiff(actions, tc.ExpectActions))
}
})
} }
} }
var alwaysReady = func() bool { return true }
func defaultCrtConfigMapPtr(rootCA []byte) *v1.ConfigMap { func defaultCrtConfigMapPtr(rootCA []byte) *v1.ConfigMap {
tmp := v1.ConfigMap{ tmp := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{