From 2794cf538cd9a22e34d1ab77125c808e20b8b0a3 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 20 Jul 2016 12:25:26 +0200 Subject: [PATCH] Use sharedPodInformer in ReplicaSet controller --- .../app/controllermanager.go | 2 +- .../controllermanager/controllermanager.go | 2 +- pkg/controller/replicaset/replica_set.go | 63 ++++++++++++------- pkg/controller/replicaset/replica_set_test.go | 37 +++++------ 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 5bc186de941..95653e48455 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -366,7 +366,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)). + go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 7adad2caf79..0c0dc6780ea 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -268,7 +268,7 @@ func (s *CMServer) Run(_ []string) error { if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), s.resyncPeriod, replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)). + go replicaset.NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), s.resyncPeriod, replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) } } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 6baa4aa4d6c..b879cca648b 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" @@ -68,6 +69,13 @@ type ReplicaSetController struct { kubeClient clientset.Interface podControl controller.PodControlInterface + // internalPodInformer is used to hold a personal informer. If we're using + // a normal shared informer, then the informer will be started for us. If + // we have a personal informer, we must start it ourselves. If you start + // the controller using NewReplicationManager(passing SharedInformer), this + // will be null + internalPodInformer framework.SharedIndexInformer + // A ReplicaSet is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. burstReplicas int @@ -84,7 +92,7 @@ type ReplicaSetController struct { // A store of pods, populated by the podController podStore cache.StoreToPodLister // Watches changes to all pods - podController *framework.Controller + podController framework.ControllerInterface // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool @@ -96,11 +104,18 @@ type ReplicaSetController struct { } // NewReplicaSetController creates a new ReplicaSetController. -func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + return newReplicaSetController( + eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}), + podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) +} + +// newReplicaSetController configures a replica set controller with the specified event recorder +func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) } @@ -109,7 +124,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, - Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}), + Recorder: eventRecorder, }, burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), @@ -173,27 +188,16 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro }, ) - rsc.podStore.Indexer, rsc.podController = framework.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return rsc.kubeClient.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return rsc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod(), - framework.ResourceEventHandlerFuncs{ - AddFunc: rsc.addPod, - // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like - // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from - // local storage, so it should be ok. - UpdateFunc: rsc.updatePod, - DeleteFunc: rsc.deletePod, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: rsc.addPod, + // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like + // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from + // local storage, so it should be ok. + UpdateFunc: rsc.updatePod, + DeleteFunc: rsc.deletePod, + }) + rsc.podStore.Indexer = podInformer.GetIndexer() + rsc.podController = podInformer.GetController() rsc.syncHandler = rsc.syncReplicaSet rsc.podStoreSynced = rsc.podController.HasSynced @@ -201,6 +205,14 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro return rsc } +// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. +func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { + podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) + rsc.internalPodInformer = podInformer + return rsc +} + // SetEventRecorder replaces the event recorder used by the ReplicaSetController // with the given recorder. Only used for testing. func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) { @@ -217,6 +229,9 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(rsc.worker, time.Second, stopCh) } + if rsc.internalPodInformer != nil { + go rsc.internalPodInformer.Run(stopCh) + } <-stopCh glog.Infof("Shutting down ReplicaSet Controller") rsc.queue.ShutDown() diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 72dd401fcb7..be1bdb1f635 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -139,7 +139,7 @@ type serverResponse struct { func TestSyncReplicaSetDoesNothing(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op @@ -156,7 +156,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { func TestSyncReplicaSetDeletes(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -173,7 +173,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -206,7 +206,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicaSetCreates(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected @@ -229,7 +229,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // Steady state for the ReplicaSet, no Status.Replicas updates expected @@ -272,7 +272,7 @@ func TestControllerUpdateReplicas(t *testing.T) { defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // Insufficient number of pods in the system, and Status.Replicas is wrong; @@ -318,7 +318,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -366,7 +366,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager := NewReplicaSetController(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady testCases := []struct { inRSs []*extensions.ReplicaSet @@ -434,7 +434,7 @@ func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() client := &fake.Clientset{} client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady var testRSSpec extensions.ReplicaSet @@ -477,7 +477,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() client := &fake.Clientset{} client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady // Put one ReplicaSet and one pod into the controller's stores @@ -505,6 +505,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.podController.Run(stopCh) + go manager.internalPodInformer.Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod") @@ -520,7 +521,7 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { - manager := NewReplicaSetController(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady received := make(chan string) @@ -597,7 +598,7 @@ func TestControllerUpdateRequeue(t *testing.T) { defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.podStoreSynced = alwaysReady labelMap := map[string]string{"foo": "bar"} @@ -678,7 +679,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, burstReplicas, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, burstReplicas, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -830,7 +831,7 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRSSyncExpectations(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -855,7 +856,7 @@ func TestRSSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0) manager.podStoreSynced = alwaysReady rs := newReplicaSet(1, map[string]string{"foo": "bar"}) @@ -898,7 +899,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { func TestRSManagerNotReady(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 2, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0) manager.podControl = &fakePodControl manager.podStoreSynced = func() bool { return false } @@ -937,7 +938,7 @@ func TestOverlappingRSs(t *testing.T) { labelMap := map[string]string{"foo": "bar"} for i := 0; i < 5; i++ { - manager := NewReplicaSetController(client, controller.NoResyncPeriodFunc, 10, 0) + manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0) manager.podStoreSynced = alwaysReady // Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store @@ -967,7 +968,7 @@ func TestOverlappingRSs(t *testing.T) { func TestDeletionTimestamp(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) labelMap := map[string]string{"foo": "bar"} - manager := NewReplicaSetController(c, controller.NoResyncPeriodFunc, 10, 0) + manager := NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) manager.podStoreSynced = alwaysReady rs := newReplicaSet(1, labelMap)