From 8b707016f909e8094ee2d5347b17372632186270 Mon Sep 17 00:00:00 2001
From: deads2k
Date: Tue, 19 Apr 2016 08:45:00 -0400
Subject: [PATCH] convert daemonset controller to SharedInformer

---
 .../app/controllermanager.go                |  2 +-
 .../controllermanager/controllermanager.go  |  2 +-
 pkg/controller/daemon/controller.go         | 53 ++++++++++++-------
 pkg/controller/daemon/controller_test.go    |  2 +-
 4 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index eac87640ca6..c52e860ad08 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -323,7 +323,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
+		go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 05c09f90c9d..4bad79bba52 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -248,7 +248,7 @@ func (s *CMServer) Run(_ []string) error {
 
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
+		go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
 	}
 
diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go
index 5302cdda787..84c4c2dafde 100644
--- a/pkg/controller/daemon/controller.go
+++ b/pkg/controller/daemon/controller.go
@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -69,6 +70,13 @@ type DaemonSetsController struct {
 	eventRecorder record.EventRecorder
	podControl    controller.PodControlInterface
 
+	// internalPodInformer holds a private informer. If we are handed a normal
+	// shared informer, its owner starts it for us. If we own a private
+	// informer, we must start it ourselves. When the controller is built via
+	// NewDaemonSetsController (passing a SharedInformer), this field will be
+	// nil.
+	internalPodInformer framework.SharedInformer
+
 	// A dsc is temporarily suspended after creating/deleting this many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -86,7 +94,7 @@ type DaemonSetsController struct {
 	// Watches changes to all daemon sets.
 	dsController *framework.Controller
 	// Watches changes to all pods
-	podController *framework.Controller
+	podController framework.ControllerInterface
 	// Watches changes to all nodes.
 	nodeController *framework.Controller
 	// podStoreSynced returns true if the pod store has been synced at least once.
@@ -99,7 +107,7 @@ type DaemonSetsController struct {
 	queue *workqueue.Type
 }
 
-func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
+func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	// TODO: remove the wrapper when all clients have moved to use the clientset.
@@ -163,25 +171,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 		},
 	)
+
 	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
 	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
-	dsc.podStore.Store, dsc.podController = framework.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return dsc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dsc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		resyncPeriod(),
-		framework.ResourceEventHandlerFuncs{
-			AddFunc:    dsc.addPod,
-			UpdateFunc: dsc.updatePod,
-			DeleteFunc: dsc.deletePod,
-		},
-	)
+	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+		AddFunc:    dsc.addPod,
+		UpdateFunc: dsc.updatePod,
+		DeleteFunc: dsc.deletePod,
+	})
+	dsc.podStore.Store = podInformer.GetStore()
+	dsc.podController = podInformer.GetController()
+	dsc.podStoreSynced = podInformer.HasSynced
+
 	// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change.
 	dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
 		&cache.ListWatch{
@@ -200,11 +201,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 		},
 	)
 	dsc.syncHandler = dsc.syncDaemonSet
-	dsc.podStoreSynced = dsc.podController.HasSynced
 	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
 }
 
+func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
+	podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
+	dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
+	dsc.internalPodInformer = podInformer
+
+	return dsc
+}
+
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
@@ -215,6 +223,11 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(dsc.worker, time.Second, stopCh)
 	}
+
+	if dsc.internalPodInformer != nil {
+		go dsc.internalPodInformer.Run(stopCh)
+	}
+
 	<-stopCh
 	glog.Infof("Shutting down Daemon Set Controller")
 	dsc.queue.ShutDown()
diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go
index 61b08c85ca6..9d3a47a6681 100644
--- a/pkg/controller/daemon/controller_test.go
+++ b/pkg/controller/daemon/controller_test.go
@@ -133,7 +133,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
 
 func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc, 0)
+	manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
 	manager.podStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}
 	manager.podControl = podControl
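
Usage note (reviewer-added, not part of the patch): the split between the two constructors is the point of this change. A caller that already runs a shared pod informer, like kube-controller-manager above, passes it to NewDaemonSetsController; the controller only registers event handlers, internalPodInformer stays nil, and Run never starts the informer. A caller without one, like the mesos controller manager or newTestController, uses NewDaemonSetsControllerFromClient, which creates a private informer that Run starts itself. Below is a minimal sketch of both paths using only functions that appear in this patch; the clientset import path, the apiserver address, and the worker/cache-size literals are illustrative assumptions, not taken from the patch:

	package main

	import (
		clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" // assumed path
		"k8s.io/kubernetes/pkg/client/restclient"
		"k8s.io/kubernetes/pkg/controller"
		"k8s.io/kubernetes/pkg/controller/daemon"
		"k8s.io/kubernetes/pkg/controller/framework/informers"
		"k8s.io/kubernetes/pkg/util/wait"
	)

	// sharedPath mirrors kube-controller-manager: one pod informer is shared
	// by many controllers, and the caller starts it exactly once itself.
	func sharedPath(kubeClient clientset.Interface) {
		podInformer := informers.CreateSharedPodInformer(kubeClient, controller.NoResyncPeriodFunc())
		dsc := daemon.NewDaemonSetsController(podInformer, kubeClient, controller.NoResyncPeriodFunc, 0 /* lookupCacheSize */)
		go podInformer.Run(wait.NeverStop) // internalPodInformer is nil, so dsc.Run will not start this for us
		go dsc.Run(2 /* workers */, wait.NeverStop)
	}

	// standalonePath mirrors the mesos manager and the unit tests: the
	// FromClient variant records its private informer in internalPodInformer,
	// and Run starts it, so no extra informer wiring is needed here.
	func standalonePath(kubeClient clientset.Interface) {
		dsc := daemon.NewDaemonSetsControllerFromClient(kubeClient, controller.NoResyncPeriodFunc, 0)
		dsc.Run(2, wait.NeverStop) // blocks until the stop channel closes
	}

	func main() {
		config := &restclient.Config{Host: "http://localhost:8080"} // illustrative apiserver address
		sharedPath(clientset.NewForConfigOrDie(config))
		select {} // block forever; the controller runs until process exit
	}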