From 99313cc3943ba30e04220273c5493bc204d6d096 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 17 Feb 2017 12:34:27 -0500 Subject: [PATCH] Switch namespace controller to shared informer --- cmd/kube-controller-manager/app/core.go | 9 ++- pkg/controller/namespace/BUILD | 5 +- .../namespace/namespace_controller.go | 59 ++++++++++--------- test/e2e_node/services/BUILD | 1 + .../e2e_node/services/namespace_controller.go | 11 +++- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 665a198a7d3..5ff56b91495 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -127,7 +127,14 @@ func startNamespaceController(ctx ControllerContext) (bool, error) { return snapshot, nil } } - namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, ctx.Options.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) + namespaceController := namespacecontroller.NewNamespaceController( + namespaceKubeClient, + namespaceClientPool, + discoverResourcesFn, + ctx.NewInformerFactory.Core().V1().Namespaces(), + ctx.Options.NamespaceSyncPeriod.Duration, + v1.FinalizerKubernetes, + ) go namespaceController.Run(int(ctx.Options.ConcurrentNamespaceSyncs), ctx.Stop) return true, nil diff --git a/pkg/controller/namespace/BUILD b/pkg/controller/namespace/BUILD index a9a20c31626..5e787a5535f 100644 --- a/pkg/controller/namespace/BUILD +++ b/pkg/controller/namespace/BUILD @@ -17,15 +17,16 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/namespace/deletion:go_default_library", "//pkg/util/metrics:go_default_library", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/dynamic", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/workqueue", diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 7de456523c3..734ddb2fd2d 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -17,18 +17,20 @@ limitations under the License. package namespace import ( + "fmt" "time" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/namespace/deletion" "k8s.io/kubernetes/pkg/util/metrics" @@ -52,10 +54,10 @@ type NamespaceController struct { kubeClient clientset.Interface // clientPool manages a pool of dynamic clients clientPool dynamic.ClientPool - // store that holds the namespaces - store cache.Store - // controller that observes the namespaces - controller cache.Controller + // lister that can list namespaces from a shared cache + lister corelisters.NamespaceLister + // returns true when the namespace cache is ready + listerSynced cache.InformerSynced // namespaces that have been queued up for processing by workers queue workqueue.RateLimitingInterface // function to list of preferred resources for namespace deletion @@ -71,6 +73,7 @@ func NewNamespaceController( kubeClient clientset.Interface, clientPool dynamic.ClientPool, discoverResourcesFn func() ([]*metav1.APIResourceList, error), + namespaceInformer coreinformers.NamespaceInformer, resyncPeriod time.Duration, finalizerToken v1.FinalizerName) *NamespaceController { @@ -88,18 +91,8 @@ func NewNamespaceController( metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } - // configure the backing store/controller - store, controller := cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return kubeClient.Core().Namespaces().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return kubeClient.Core().Namespaces().Watch(options) - }, - }, - &v1.Namespace{}, - resyncPeriod, + // configure the namespace informer event handlers + namespaceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { namespace := obj.(*v1.Namespace) @@ -110,10 +103,11 @@ func NewNamespaceController( namespaceController.enqueueNamespace(namespace) }, }, + resyncPeriod, ) + namespaceController.lister = namespaceInformer.Lister() + namespaceController.listerSynced = namespaceInformer.Informer().HasSynced - namespaceController.store = store - namespaceController.controller = controller return namespaceController } @@ -122,7 +116,7 @@ func NewNamespaceController( func (nm *NamespaceController) enqueueNamespace(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } // delay processing namespace events to allow HA api servers to observe namespace deletion, @@ -175,28 +169,35 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { startTime := time.Now() defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime)) - obj, exists, err := nm.store.GetByKey(key) - if !exists { + namespace, err := nm.lister.Get(key) + if errors.IsNotFound(err) { glog.Infof("Namespace has been deleted %v", key) return nil } if err != nil { - glog.Errorf("Unable to retrieve namespace %v from store: %v", key, err) - nm.queue.Add(key) + utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err)) return err } - namespace := obj.(*v1.Namespace) return nm.namespacedResourcesDeleter.Delete(namespace.Name) } // Run starts observing the system with the specified number of workers. func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - go nm.controller.Run(stopCh) + defer nm.queue.ShutDown() + + glog.Info("Starting the NamespaceController") + + if !cache.WaitForCacheSync(stopCh, nm.listerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + for i := 0; i < workers; i++ { go wait.Until(nm.worker, time.Second, stopCh) } + <-stopCh - glog.Infof("Shutting down NamespaceController") - nm.queue.ShutDown() + + glog.Info("Shutting down NamespaceController") } diff --git a/test/e2e_node/services/BUILD b/test/e2e_node/services/BUILD index 09bcd4829ce..1a9ddb2590a 100644 --- a/test/e2e_node/services/BUILD +++ b/test/e2e_node/services/BUILD @@ -26,6 +26,7 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/controller/namespace:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e_node/builder:go_default_library", diff --git a/test/e2e_node/services/namespace_controller.go b/test/e2e_node/services/namespace_controller.go index d13d1e38a8d..17036d92aed 100644 --- a/test/e2e_node/services/namespace_controller.go +++ b/test/e2e_node/services/namespace_controller.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/test/e2e/framework" ) @@ -57,7 +58,15 @@ func (n *NamespaceController) Start() error { } clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources - nc := namespacecontroller.NewNamespaceController(client, clientPool, discoverResourcesFn, ncResyncPeriod, v1.FinalizerKubernetes) + informerFactory := informers.NewSharedInformerFactory(client, ncResyncPeriod) + nc := namespacecontroller.NewNamespaceController( + client, + clientPool, + discoverResourcesFn, + informerFactory.Core().V1().Namespaces(), + ncResyncPeriod, v1.FinalizerKubernetes, + ) + informerFactory.Start(n.stopCh) go nc.Run(ncConcurrency, n.stopCh) return nil }