convert daemonset controller to SharedInformer

This commit is contained in:
deads2k
2016-04-19 08:45:00 -04:00
parent 767fa6913d
commit 8b707016f9
4 changed files with 36 additions and 23 deletions

View File

@@ -323,7 +323,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}

View File

@@ -248,7 +248,7 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
}

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -69,6 +70,13 @@ type DaemonSetsController struct {
eventRecorder record.EventRecorder
podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
// An dsc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
@@ -86,7 +94,7 @@ type DaemonSetsController struct {
// Watches changes to all daemon sets.
dsController *framework.Controller
// Watches changes to all pods
podController *framework.Controller
podController framework.ControllerInterface
// Watches changes to all nodes.
nodeController *framework.Controller
// podStoreSynced returns true if the pod store has been synced at least once.
@@ -99,7 +107,7 @@ type DaemonSetsController struct {
queue *workqueue.Type
}
func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
@@ -163,25 +171,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
},
},
)
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
dsc.podStore.Store, dsc.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dsc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dsc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podStore.Store = podInformer.GetStore()
dsc.podController = podInformer.GetController()
dsc.podStoreSynced = podInformer.HasSynced
// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change,
dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
&cache.ListWatch{
@@ -200,11 +201,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
},
)
dsc.syncHandler = dsc.syncDaemonSet
dsc.podStoreSynced = dsc.podController.HasSynced
dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return dsc
}
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer
return dsc
}
// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@@ -215,6 +223,11 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(dsc.worker, time.Second, stopCh)
}
if dsc.internalPodInformer != nil {
go dsc.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down Daemon Set Controller")
dsc.queue.ShutDown()

View File

@@ -133,7 +133,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc, 0)
manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
manager.podStoreSynced = alwaysReady
podControl := &controller.FakePodControl{}
manager.podControl = podControl