From b471398f1f0d5908fee6dbdac06ebbeeefa34faf Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 7 Oct 2016 16:31:34 -0400 Subject: [PATCH] convert replica set controller to shared informer --- .../app/controllermanager.go | 2 +- pkg/controller/replicaset/replica_set.go | 117 ++--- pkg/controller/replicaset/replica_set_test.go | 468 ++++++++++-------- .../replicaset/replica_set_utils.go | 11 + pkg/util/testing/fake_handler.go | 16 + .../integration/replicaset/replicaset_test.go | 29 +- 6 files changed, 348 insertions(+), 295 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index c258ab7ceba..d7ab91c8a18 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -422,7 +422,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), client("replicaset-controller"), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). + go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 5a8acb98e9e..4ad5b6314ef 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -38,13 +38,11 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" - "k8s.io/kubernetes/pkg/watch" ) const ( @@ -75,13 +73,6 @@ type ReplicaSetController struct { kubeClient clientset.Interface podControl controller.PodControlInterface - // internalPodInformer is used to hold a personal informer. If we're using - // a normal shared informer, then the informer will be started for us. If - // we have a personal informer, we must start it ourselves. If you start - // the controller using NewReplicationManager(passing SharedInformer), this - // will be null - internalPodInformer cache.SharedIndexInformer - // A ReplicaSet is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. burstReplicas int @@ -92,16 +83,12 @@ type ReplicaSetController struct { expectations *controller.UIDTrackingControllerExpectations // A store of ReplicaSets, populated by the rsController - rsStore cache.StoreToReplicaSetLister - // Watches changes to all ReplicaSets - rsController *cache.Controller + rsLister *cache.StoreToReplicaSetLister // A store of pods, populated by the podController - podStore cache.StoreToPodLister - // Watches changes to all pods - podController cache.ControllerInterface - // podStoreSynced returns true if the pod store has been synced at least once. + podLister *cache.StoreToPodLister + // podListerSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. - podStoreSynced func() bool + podListerSynced cache.InformerSynced lookupCache *controller.MatchingCache @@ -113,28 +100,20 @@ type ReplicaSetController struct { garbageCollectorEnabled bool } -// NewReplicaSetController creates a new ReplicaSetController. -func NewReplicaSetController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) - - return newReplicaSetController( - eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}), - podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) -} - -// newReplicaSetController configures a replica set controller with the specified event recorder -func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { +// NewReplicaSetController configures a replica set controller with the specified event recorder +func NewReplicaSetController(rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController { if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter()) } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) rsc := &ReplicaSetController{ kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, - Recorder: eventRecorder, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}), }, burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), @@ -142,30 +121,15 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac garbageCollectorEnabled: garbageCollectorEnabled, } - rsc.rsStore.Indexer, rsc.rsController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options) - }, - }, - &extensions.ReplicaSet{}, - // TODO: Can we have much longer period here? - FullControllerResyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: rsc.enqueueReplicaSet, - UpdateFunc: rsc.updateRS, - // This will enter the sync loop and no-op, because the replica set has been deleted from the store. - // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended - // way of achieving this is by performing a `stop` operation on the replica set. - DeleteFunc: rsc.enqueueReplicaSet, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: rsc.enqueueReplicaSet, + UpdateFunc: rsc.updateRS, + // This will enter the sync loop and no-op, because the replica set has been deleted from the store. + // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended + // way of achieving this is by performing a `stop` operation on the replica set. + DeleteFunc: rsc.enqueueReplicaSet, + }) + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rsc.addPod, // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from @@ -173,24 +137,15 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac UpdateFunc: rsc.updatePod, DeleteFunc: rsc.deletePod, }) - rsc.podStore.Indexer = podInformer.GetIndexer() - rsc.podController = podInformer.GetController() rsc.syncHandler = rsc.syncReplicaSet - rsc.podStoreSynced = rsc.podController.HasSynced + rsc.rsLister = rsInformer.Lister() + rsc.podLister = podInformer.Lister() + rsc.podListerSynced = podInformer.Informer().HasSynced rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rsc } -// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. -func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { - podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) - garbageCollectorEnabled := false - rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) - rsc.internalPodInformer = podInformer - return rsc -} - // SetEventRecorder replaces the event recorder used by the ReplicaSetController // with the given recorder. Only used for testing. func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) { @@ -204,16 +159,16 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer rsc.queue.ShutDown() - go rsc.rsController.Run(stopCh) - go rsc.podController.Run(stopCh) + glog.Infof("Starting ReplicaSet controller") + + if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced) { + return + } for i := 0; i < workers; i++ { go wait.Until(rsc.worker, time.Second, stopCh) } - if rsc.internalPodInformer != nil { - go rsc.internalPodInformer.Run(stopCh) - } <-stopCh glog.Infof("Shutting down ReplicaSet Controller") } @@ -236,7 +191,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl } // if not cached or cached value is invalid, search all the rs to find the matching one, and update cache - rss, err := rsc.rsStore.GetPodReplicaSets(pod) + rss, err := rsc.rsLister.GetPodReplicaSets(pod) if err != nil { glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name) return nil @@ -300,7 +255,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { // isCacheValid check if the cache is valid func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool { - _, err := rsc.rsStore.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name) + _, err := rsc.rsLister.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name) // rs has been deleted or updated, cache is invalid if err != nil || !isReplicaSetMatch(pod, cachedRS) { return false @@ -582,15 +537,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime)) }() - if !rsc.podStoreSynced() { - // Sleep so we give the pod reflector goroutine a chance to run. - time.Sleep(PodStoreSyncedPollPeriod) - glog.Infof("Waiting for pods controller to sync, requeuing ReplicaSet %v", key) - rsc.queue.Add(key) - return nil - } - - obj, exists, err := rsc.rsStore.Indexer.GetByKey(key) + obj, exists, err := rsc.rsLister.Indexer.GetByKey(key) if !exists { glog.V(4).Infof("ReplicaSet has been deleted %v", key) rsc.expectations.DeleteExpectations(key) @@ -624,7 +571,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { if rsc.garbageCollectorEnabled { // list all pods to include the pods that don't match the rs`s selector // anymore but has the stale controller ref. - pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything()) + pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) if err != nil { return err } @@ -659,7 +606,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { return aggregate } } else { - pods, err := rsc.podStore.Pods(rs.Namespace).List(selector) + pods, err := rsc.podLister.Pods(rs.Namespace).List(selector) if err != nil { return err } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index ac082db6680..37e2da2702b 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math/rand" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -38,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/util/sets" @@ -47,6 +49,39 @@ import ( "k8s.io/kubernetes/pkg/watch" ) +func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int, lookupCacheSize int) *ReplicaSetController { + informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + ret := NewReplicaSetController(informers.ReplicaSets(), informers.Pods(), client, burstReplicas, lookupCacheSize, false) + informers.Start(stopCh) + return ret +} + +func filterInformerActions(actions []core.Action) []core.Action { + ret := []core.Action{} + for _, action := range actions { + if len(action.GetNamespace()) == 0 && + (action.Matches("list", "pods") || + action.Matches("list", "replicasets") || + action.Matches("watch", "pods") || + action.Matches("watch", "replicasets")) { + continue + } + ret = append(ret, action) + } + + return ret +} + +func skipListerFunc(verb string, url url.URL) bool { + if verb != "GET" { + return false + } + if strings.HasSuffix(url.Path, "/pods") || strings.HasSuffix(url.Path, "/replicasets") { + return true + } + return false +} + var alwaysReady = func() bool { return true } func getKey(rs *extensions.ReplicaSet, t *testing.T) string { @@ -161,14 +196,16 @@ type serverResponse struct { func TestSyncReplicaSetDoesNothing(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) - manager.rsStore.Indexer.Add(rsSpec) - newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") + manager.rsLister.Indexer.Add(rsSpec) + newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rsSpec, t)) @@ -178,15 +215,17 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { func TestSyncReplicaSetDeletes(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(1, labelMap) - manager.rsStore.Indexer.Add(rsSpec) - newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") + manager.rsLister.Indexer.Add(rsSpec) + newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.syncReplicaSet(getKey(rsSpec, t)) validateSyncReplicaSet(t, &fakePodControl, 0, 1, 0) @@ -195,8 +234,10 @@ func TestSyncReplicaSetDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl received := make(chan string) @@ -209,7 +250,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { // the controller matching the selectors of the deleted pod into the work queue. labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(1, labelMap) - manager.rsStore.Indexer.Add(rsSpec) + manager.rsLister.Indexer.Add(rsSpec) pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod") manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) @@ -228,13 +269,15 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicaSetCreates(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -245,22 +288,27 @@ func TestSyncReplicaSetCreates(t *testing.T) { func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", + StatusCode: 200, + ResponseBody: "{}", + SkipRequestFn: skipListerFunc, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} // Steady state for the ReplicaSet, no Status.Replicas updates expected activePods := 5 labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(activePods, labelMap) - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)} - newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podLister.Indexer, activePods, api.PodRunning, labelMap, rs, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -274,7 +322,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // This response body is just so we don't err out decoding the http response, all // we care about is the request body sent below. response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) - fakeHandler.ResponseBody = response + fakeHandler.SetResponseBody(response) rs.Generation = rs.Generation + 1 manager.syncReplicaSet(getKey(rs, t)) @@ -287,15 +335,20 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { func TestControllerUpdateReplicas(t *testing.T) { // This is a happy server just to record the PUT request we expect for status.Replicas fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", + StatusCode: 200, + ResponseBody: "{}", + SkipRequestFn: skipListerFunc, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. @@ -303,15 +356,15 @@ func TestControllerUpdateReplicas(t *testing.T) { extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} rs := newReplicaSet(5, labelMap) rs.Spec.Template.Labels = extraLabelMap - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0} rs.Generation = 1 - newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") - newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel") + newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podLister.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel") // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) - fakeHandler.ResponseBody = response + fakeHandler.SetResponseBody(response) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -332,22 +385,27 @@ func TestControllerUpdateReplicas(t *testing.T) { func TestSyncReplicaSetDormancy(t *testing.T) { // Setup a test server so we can lie about the current state of pods fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", + StatusCode: 200, + ResponseBody: "{}", + SkipRequestFn: skipListerFunc, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) - manager.rsStore.Indexer.Add(rsSpec) - newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod") + manager.rsLister.Indexer.Add(rsSpec) + newPodList(manager.podLister.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod") // Creates a replica and sets expectations rsSpec.Status.Replicas = 1 @@ -394,8 +452,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager := NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}), stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady testCases := []struct { inRSs []*extensions.ReplicaSet pod *api.Pod @@ -441,7 +501,7 @@ func TestPodControllerLookup(t *testing.T) { } for _, c := range testCases { for _, r := range c.inRSs { - manager.rsStore.Indexer.Add(r) + manager.rsLister.Indexer.Add(r) } if rs := manager.getPodReplicaSet(c.pod); rs != nil { if c.outRSName != rs.Name { @@ -461,9 +521,11 @@ type FakeWatcher struct { func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() client := &fake.Clientset{} - client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + client.AddWatchReactor("replicasets", core.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady var testRSSpec extensions.ReplicaSet received := make(chan string) @@ -472,8 +534,7 @@ func TestWatchControllers(t *testing.T) { // and eventually into the syncHandler. The handler validates the received controller // and closes the received channel to indicate that the test can finish. manager.syncHandler = func(key string) error { - - obj, exists, err := manager.rsStore.Indexer.GetByKey(key) + obj, exists, err := manager.rsLister.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) } @@ -486,9 +547,6 @@ func TestWatchControllers(t *testing.T) { } // Start only the ReplicaSet watcher and the workqueue, send a watch event, // and make sure it hits the sync method. - stopCh := make(chan struct{}) - defer close(stopCh) - go manager.rsController.Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) testRSSpec.Name = "foo" @@ -504,20 +562,23 @@ func TestWatchControllers(t *testing.T) { func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() client := &fake.Clientset{} - client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + client.AddWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} // Put one ReplicaSet and one pod into the controller's stores labelMap := map[string]string{"foo": "bar"} testRSSpec := newReplicaSet(1, labelMap) - manager.rsStore.Indexer.Add(testRSSpec) + manager.rsLister.Indexer.Add(testRSSpec) received := make(chan string) // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and // send it into the syncHandler. manager.syncHandler = func(key string) error { - - obj, exists, err := manager.rsStore.Indexer.GetByKey(key) + obj, exists, err := manager.rsLister.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) } @@ -530,10 +591,6 @@ func TestWatchPods(t *testing.T) { } // Start only the pod watcher and the workqueue, send a watch event, // and make sure it hits the sync method for the right ReplicaSet. - stopCh := make(chan struct{}) - defer close(stopCh) - go manager.podController.Run(stopCh) - go manager.internalPodInformer.Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod") @@ -549,13 +606,15 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { - manager := NewReplicaSetControllerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady received := make(chan string) manager.syncHandler = func(key string) error { - obj, exists, err := manager.rsStore.Indexer.GetByKey(key) + obj, exists, err := manager.rsLister.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) } @@ -563,24 +622,22 @@ func TestUpdatePods(t *testing.T) { return nil } - stopCh := make(chan struct{}) - defer close(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) // Put 2 ReplicaSets and one pod into the controller's stores labelMap1 := map[string]string{"foo": "bar"} testRSSpec1 := newReplicaSet(1, labelMap1) - manager.rsStore.Indexer.Add(testRSSpec1) + manager.rsLister.Indexer.Add(testRSSpec1) testRSSpec2 := *testRSSpec1 labelMap2 := map[string]string{"bar": "foo"} testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2} testRSSpec2.Name = "barfoo" - manager.rsStore.Indexer.Add(&testRSSpec2) + manager.rsLister.Indexer.Add(&testRSSpec2) - // case 1: We put in the podStore a pod with labels matching testRSSpec1, + // case 1: We put in the podLister a pod with labels matching testRSSpec1, // then update its labels to match testRSSpec2. We expect to receive a sync // request for both replica sets. - pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1 := newPodList(manager.podLister.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] pod1.ResourceVersion = "1" pod2 := pod1 pod2.Labels = labelMap2 @@ -599,7 +656,7 @@ func TestUpdatePods(t *testing.T) { } } - // case 2: pod1 in the podStore has labels matching testRSSpec1. We update + // case 2: pod1 in the podLister has labels matching testRSSpec1. We update // its labels to match no replica set. We expect to receive a sync request // for testRSSpec1. pod2.Labels = make(map[string]string) @@ -622,21 +679,26 @@ func TestUpdatePods(t *testing.T) { func TestControllerUpdateRequeue(t *testing.T) { // This server should force a requeue of the controller because it fails to update status.Replicas. fakeHandler := utiltesting.FakeHandler{ - StatusCode: 500, - ResponseBody: "{}", + StatusCode: 500, + ResponseBody: "{}", + SkipRequestFn: skipListerFunc, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() + stopCh := make(chan struct{}) + defer close(stopCh) client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(1, labelMap) - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2} - newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podLister.Indexer, 1, api.PodRunning, labelMap, rs, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -698,13 +760,15 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, burstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas, 0) + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(numReplicas, labelMap) - manager.rsStore.Indexer.Add(rsSpec) + manager.rsLister.Indexer.Add(rsSpec) expectedPods := int32(0) pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod") @@ -718,14 +782,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) for _, replicas := range []int32{int32(numReplicas), 0} { rsSpec.Spec.Replicas = replicas - manager.rsStore.Indexer.Add(rsSpec) + manager.rsLister.Indexer.Add(rsSpec) for i := 0; i < numReplicas; i += burstReplicas { manager.syncReplicaSet(getKey(rsSpec, t)) // The store accrues active pods. It's also used by the ReplicaSet to determine how many // replicas to create. - activePods := int32(len(manager.podStore.Indexer.List())) + activePods := int32(len(manager.podLister.Indexer.List())) if replicas != 0 { // This is the number of pods currently "in flight". They were created by the // ReplicaSet controller above, which then puts the ReplicaSet to sleep till @@ -740,7 +804,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. for i := int32(0); i < expectedPods-1; i++ { - manager.podStore.Indexer.Add(&pods.Items[i]) + manager.podLister.Indexer.Add(&pods.Items[i]) manager.addPod(&pods.Items[i]) } @@ -776,7 +840,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // has exactly one expectation at the end, to verify that we // don't double delete. for i := range podsToDelete[1:] { - manager.podStore.Indexer.Delete(podsToDelete[i]) + manager.podLister.Indexer.Delete(podsToDelete[i]) manager.deletePod(podsToDelete[i]) } podExp, exists, err := manager.expectations.GetExpectations(rsKey) @@ -797,7 +861,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // The last add pod will decrease the expectation of the ReplicaSet to 0, // which will cause it to create/delete the remaining replicas up to burstReplicas. if replicas != 0 { - manager.podStore.Indexer.Add(&pods.Items[expectedPods-1]) + manager.podLister.Indexer.Add(&pods.Items[expectedPods-1]) manager.addPod(&pods.Items[expectedPods-1]) } else { expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t)) @@ -812,14 +876,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Labels: rsSpec.Spec.Selector.MatchLabels, }, } - manager.podStore.Indexer.Delete(lastPod) + manager.podLister.Indexer.Delete(lastPod) manager.deletePod(lastPod) } pods.Items = pods.Items[expectedPods:] } // Confirm that we've created the right number of replicas - activePods := int32(len(manager.podStore.Indexer.List())) + activePods := int32(len(manager.podLister.Indexer.List())) if activePods != rsSpec.Spec.Replicas { t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods) } @@ -850,15 +914,17 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRSSyncExpectations(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, 2, 0) + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) - manager.rsStore.Indexer.Add(rsSpec) + manager.rsLister.Indexer.Add(rsSpec) pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod") - manager.podStore.Indexer.Add(&pods.Items[0]) + manager.podLister.Indexer.Add(&pods.Items[0]) postExpectationsPod := pods.Items[1] manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{ @@ -866,7 +932,7 @@ func TestRSSyncExpectations(t *testing.T) { // If we check active pods before checking expectataions, the // ReplicaSet will create a new replica because it doesn't see // this pod, but has fulfilled its expectations. - manager.podStore.Indexer.Add(&postExpectationsPod) + manager.podLister.Indexer.Add(&postExpectationsPod) }, }) manager.syncReplicaSet(getKey(rsSpec, t)) @@ -875,11 +941,13 @@ func TestRSSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0) + manager.podListerSynced = alwaysReady rs := newReplicaSet(1, map[string]string{"foo": "bar"}) - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -901,7 +969,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { if !exists || err != nil { t.Errorf("No expectations found for ReplicaSet") } - manager.rsStore.Indexer.Delete(rs) + manager.rsLister.Indexer.Delete(rs) manager.syncReplicaSet(getKey(rs, t)) if _, exists, err = manager.expectations.GetExpectations(rsKey); exists { @@ -910,37 +978,11 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the ReplicaSet. podExp.Add(-1, 0) - manager.podStore.Indexer.Replace(make([]interface{}, 0), "0") + manager.podLister.Indexer.Replace(make([]interface{}, 0), "0") manager.syncReplicaSet(getKey(rs, t)) validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) } -func TestRSManagerNotReady(t *testing.T) { - client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - fakePodControl := controller.FakePodControl{} - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0) - manager.podControl = &fakePodControl - manager.podStoreSynced = func() bool { return false } - - // Simulates the ReplicaSet reflector running before the pod reflector. We don't - // want to end up creating replicas in this case until the pod reflector - // has synced, so the ReplicaSet controller should just requeue the ReplicaSet. - rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"}) - manager.rsStore.Indexer.Add(rsSpec) - - rsKey := getKey(rsSpec, t) - manager.syncReplicaSet(rsKey) - validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) - queueRS, _ := manager.queue.Get() - if queueRS != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) - } - - manager.podStoreSynced = alwaysReady - manager.syncReplicaSet(rsKey) - validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) -} - // shuffle returns a new shuffled list of container controllers. func shuffle(controllers []*extensions.ReplicaSet) []*extensions.ReplicaSet { numControllers := len(controllers) @@ -957,41 +999,47 @@ func TestOverlappingRSs(t *testing.T) { labelMap := map[string]string{"foo": "bar"} for i := 0; i < 5; i++ { - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0) - manager.podStoreSynced = alwaysReady + func() { + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0) + manager.podListerSynced = alwaysReady - // Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store - var controllers []*extensions.ReplicaSet - for j := 1; j < 10; j++ { - rsSpec := newReplicaSet(1, labelMap) - rsSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local) - rsSpec.Name = string(uuid.NewUUID()) - controllers = append(controllers, rsSpec) - } - shuffledControllers := shuffle(controllers) - for j := range shuffledControllers { - manager.rsStore.Indexer.Add(shuffledControllers[j]) - } - // Add a pod and make sure only the oldest ReplicaSet is synced - pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod") - rsKey := getKey(controllers[0], t) + // Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store + var controllers []*extensions.ReplicaSet + for j := 1; j < 10; j++ { + rsSpec := newReplicaSet(1, labelMap) + rsSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local) + rsSpec.Name = string(uuid.NewUUID()) + controllers = append(controllers, rsSpec) + } + shuffledControllers := shuffle(controllers) + for j := range shuffledControllers { + manager.rsLister.Indexer.Add(shuffledControllers[j]) + } + // Add a pod and make sure only the oldest ReplicaSet is synced + pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod") + rsKey := getKey(controllers[0], t) - manager.addPod(&pods.Items[0]) - queueRS, _ := manager.queue.Get() - if queueRS != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) - } + manager.addPod(&pods.Items[0]) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) + } + }() } } func TestDeletionTimestamp(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) labelMap := map[string]string{"foo": "bar"} - manager := NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(c, stopCh, 10, 0) + manager.podListerSynced = alwaysReady rs := newReplicaSet(1, labelMap) - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) rsKey, err := controller.KeyFunc(rs) if err != nil { t.Errorf("Couldn't get key for object %#v: %v", rs, err) @@ -1077,12 +1125,14 @@ func TestDeletionTimestamp(t *testing.T) { // setupManagerWithGCEnabled creates a RS manager with a fakePodControl // and with garbageCollectorEnabled set to true -func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) { +func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) { c := fakeclientset.NewSimpleClientset(objs...) fakePodControl = &controller.FakePodControl{} - manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) + manager = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas, 0) manager.garbageCollectorEnabled = true - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} manager.podControl = fakePodControl return manager, fakePodControl } @@ -1090,14 +1140,16 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicaSetContr func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - manager.rsStore.Indexer.Add(rs) + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + manager.rsLister.Indexer.Add(rs) var trueVar = true otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar} - // add to podStore a matching Pod controlled by another controller. Expect no patch. + // add to podLister a matching Pod controlled by another controller. Expect no patch. pod := newPod("pod", rs, api.PodRunning, nil) pod.OwnerReferences = []api.OwnerReference{otherControllerReference} - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) err := manager.syncReplicaSet(getKey(rs, t)) if err != nil { t.Fatal(err) @@ -1109,15 +1161,17 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { func TestPatchPodWithOtherOwnerRef(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - manager.rsStore.Indexer.Add(rs) - // add to podStore one more matching pod that doesn't have a controller + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + manager.rsLister.Indexer.Add(rs) + // add to podLister one more matching pod that doesn't have a controller // ref, but has an owner ref pointing to other object. Expect a patch to // take control of it. unrelatedOwnerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"} pod := newPod("pod", rs, api.PodRunning, nil) pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference} - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) err := manager.syncReplicaSet(getKey(rs, t)) if err != nil { @@ -1130,14 +1184,16 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) { func TestPatchPodWithCorrectOwnerRef(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - manager.rsStore.Indexer.Add(rs) - // add to podStore a matching pod that has an ownerRef pointing to the rs, + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + manager.rsLister.Indexer.Add(rs) + // add to podLister a matching pod that has an ownerRef pointing to the rs, // but ownerRef.Controller is false. Expect a patch to take control it. rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name} pod := newPod("pod", rs, api.PodRunning, nil) pod.OwnerReferences = []api.OwnerReference{rsOwnerReference} - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) err := manager.syncReplicaSet(getKey(rs, t)) if err != nil { @@ -1150,12 +1206,14 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) { func TestPatchPodFails(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - manager.rsStore.Indexer.Add(rs) - // add to podStore two matching pods. Expect two patches to take control + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + manager.rsLister.Indexer.Add(rs) + // add to podLister two matching pods. Expect two patches to take control // them. - manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil)) - manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil)) // let both patches fail. The rs controller will assume it fails to take // control of the pods and create new ones. fakePodControl.Err = fmt.Errorf("Fake Error") @@ -1170,13 +1228,15 @@ func TestPatchPodFails(t *testing.T) { func TestPatchExtraPodsThenDelete(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - manager.rsStore.Indexer.Add(rs) - // add to podStore three matching pods. Expect three patches to take control + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + manager.rsLister.Indexer.Add(rs) + // add to podLister three matching pods. Expect three patches to take control // them, and later delete one of them. - manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil)) - manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil)) - manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod3", rs, api.PodRunning, nil)) err := manager.syncReplicaSet(getKey(rs, t)) if err != nil { t.Fatal(err) @@ -1188,9 +1248,11 @@ func TestPatchExtraPodsThenDelete(t *testing.T) { func TestUpdateLabelsRemoveControllerRef(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - manager.rsStore.Indexer.Add(rs) - // put one pod in the podStore + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + manager.rsLister.Indexer.Add(rs) + // put one pod in the podLister pod := newPod("pod", rs, api.PodRunning, nil) pod.ResourceVersion = "1" var trueVar = true @@ -1203,7 +1265,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { // add the updatedPod to the store. This is consistent with the behavior of // the Informer: Informer updates the store before call the handler // (updatePod() in this case). - manager.podStore.Indexer.Add(&updatedPod) + manager.podLister.Indexer.Add(&updatedPod) // send a update of the same pod with modified labels manager.updatePod(pod, &updatedPod) // verifies that rs is added to the queue @@ -1227,16 +1289,18 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { func TestUpdateSelectorControllerRef(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) - // put 2 pods in the podStore - newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) + // put 2 pods in the podLister + newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rs, "pod") // update the RS so that its selector no longer matches the pods updatedRS := *rs updatedRS.Spec.Selector.MatchLabels = map[string]string{"foo": "baz"} // put the updatedRS into the store. This is consistent with the behavior of // the Informer: Informer updates the store before call the handler // (updateRS() in this case). - manager.rsStore.Indexer.Add(&updatedRS) + manager.rsLister.Indexer.Add(&updatedRS) manager.updateRS(rs, &updatedRS) // verifies that the rs is added to the queue rsKey := getKey(rs, t) @@ -1261,12 +1325,14 @@ func TestUpdateSelectorControllerRef(t *testing.T) { func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) - manager, fakePodControl := setupManagerWithGCEnabled(rs) + stopCh := make(chan struct{}) + defer close(stopCh) + manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs) now := unversioned.Now() rs.DeletionTimestamp = &now - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) pod1 := newPod("pod1", rs, api.PodRunning, nil) - manager.podStore.Indexer.Add(pod1) + manager.podLister.Indexer.Add(pod1) // no patch, no create err := manager.syncReplicaSet(getKey(rs, t)) @@ -1279,29 +1345,34 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { func TestReadyReplicas(t *testing.T) { // This is a happy server just to record the PUT request we expect for status.Replicas fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", + StatusCode: 200, + ResponseBody: "{}", + SkipRequestFn: skipListerFunc, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} // Status.Replica should update to match number of pods in system, 1 new pod should be created. labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(2, labelMap) rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1} rs.Generation = 1 - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) - newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod") - newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podLister.Indexer, 2, api.PodPending, labelMap, rs, "pod") + newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rs, "pod") // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) - fakeHandler.ResponseBody = response + fakeHandler.SetResponseBody(response) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -1319,15 +1390,20 @@ func TestReadyReplicas(t *testing.T) { func TestAvailableReplicas(t *testing.T) { // This is a happy server just to record the PUT request we expect for status.Replicas fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: "{}", + StatusCode: 200, + ResponseBody: "{}", + SkipRequestFn: skipListerFunc, } testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion}}) - manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager.podListerSynced = alwaysReady + manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} // Status.Replica should update to match number of pods in system, 1 new pod should be created. labelMap := map[string]string{"foo": "bar"} @@ -1336,21 +1412,21 @@ func TestAvailableReplicas(t *testing.T) { rs.Generation = 1 // minReadySeconds set to 15s rs.Spec.MinReadySeconds = 15 - manager.rsStore.Indexer.Add(rs) + manager.rsLister.Indexer.Add(rs) // First pod becomes ready 20s ago moment := unversioned.Time{Time: time.Now().Add(-2e10)} pod := newPod("pod", rs, api.PodRunning, &moment) - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) // Second pod becomes ready now otherMoment := unversioned.Now() otherPod := newPod("otherPod", rs, api.PodRunning, &otherMoment) - manager.podStore.Indexer.Add(otherPod) + manager.podLister.Indexer.Add(otherPod) // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) - fakeHandler.ResponseBody = response + fakeHandler.SetResponseBody(response) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl diff --git a/pkg/controller/replicaset/replica_set_utils.go b/pkg/controller/replicaset/replica_set_utils.go index 2a9eba24a41..77067aaf509 100644 --- a/pkg/controller/replicaset/replica_set_utils.go +++ b/pkg/controller/replicaset/replica_set_utils.go @@ -22,6 +22,8 @@ import ( "fmt" "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" ) @@ -38,6 +40,15 @@ func updateReplicaCount(rsClient unversionedextensions.ReplicaSetInterface, rs e rs.Generation == rs.Status.ObservedGeneration { return nil } + + // deep copy to avoid mutation now. + // TODO this method need some work. Retry on conflict probably, though I suspect this is stomping status to something it probably shouldn't + copyObj, err := api.Scheme.DeepCopy(rs) + if err != nil { + return err + } + rs = copyObj.(extensions.ReplicaSet) + // Save the generation number we acted on, otherwise we might wrongfully indicate // that we've seen a spec update when we retry. // TODO: This can clobber an update if we allow multiple agents to write to the diff --git a/pkg/util/testing/fake_handler.go b/pkg/util/testing/fake_handler.go index 64a357fd79d..71f8f534c93 100644 --- a/pkg/util/testing/fake_handler.go +++ b/pkg/util/testing/fake_handler.go @@ -52,11 +52,27 @@ type FakeHandler struct { lock sync.Mutex requestCount int hasBeenChecked bool + + SkipRequestFn func(verb string, url url.URL) bool +} + +func (f *FakeHandler) SetResponseBody(responseBody string) { + f.lock.Lock() + defer f.lock.Unlock() + f.ResponseBody = responseBody } func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { f.lock.Lock() defer f.lock.Unlock() + + if f.SkipRequestFn != nil && f.SkipRequestFn(request.Method, *request.URL) { + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(f.StatusCode) + response.Write([]byte(f.ResponseBody)) + return + } + f.requestCount++ if f.hasBeenChecked { panic("got request after having been validated") diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index effe57cfda2..404017e47b3 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -127,7 +127,7 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa return ret, nil } -func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, cache.SharedIndexInformer, clientset.Interface) { +func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, cache.SharedIndexInformer, cache.SharedIndexInformer, clientset.Interface) { masterConfig := framework.NewIntegrationTestMasterConfig() _, s := framework.RunAMaster(masterConfig) @@ -137,14 +137,12 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl t.Fatalf("Error in create clientset: %v", err) } resyncPeriod := 12 * time.Hour - resyncPeriodFunc := func() time.Duration { - return resyncPeriod - } - podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) + informers := informers.NewSharedInformerFactory(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "rs-informers")), resyncPeriod) + rm := replicaset.NewReplicaSetController( - podInformer, + informers.ReplicaSets(), + informers.Pods(), internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), - resyncPeriodFunc, replicaset.BurstReplicas, 4096, enableGarbageCollector, @@ -153,7 +151,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl if err != nil { t.Fatalf("Failed to create replicaset controller") } - return s, rm, podInformer, clientSet + return s, rm, informers.ReplicaSets().Informer(), informers.Pods().Informer(), clientSet } // wait for the podInformer to observe the pods. Call this function before @@ -223,7 +221,7 @@ func TestAdoption(t *testing.T) { }, } for i, tc := range testCases { - s, rm, podInformer, clientSet := rmSetup(t, true) + s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true) ns := framework.CreateTestingNamespace(fmt.Sprintf("rs-adoption-%d", i), s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -243,6 +241,7 @@ func TestAdoption(t *testing.T) { } stopCh := make(chan struct{}) + go rsInformer.Run(stopCh) go podInformer.Run(stopCh) waitToObservePods(t, podInformer, 1) go rm.Run(5, stopCh) @@ -300,7 +299,7 @@ func TestUpdateSelectorToAdopt(t *testing.T) { // We have pod1, pod2 and rs. rs.spec.replicas=1. At first rs.Selector // matches pod1 only; change the selector to match pod2 as well. Verify // there is only one pod left. - s, rm, podInformer, clientSet := rmSetup(t, true) + s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true) ns := framework.CreateTestingNamespace("rs-update-selector-to-adopt", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rs := newRS("rs", ns.Name, 1) @@ -314,6 +313,7 @@ func TestUpdateSelectorToAdopt(t *testing.T) { createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) stopCh := make(chan struct{}) + go rsInformer.Run(stopCh) go podInformer.Run(stopCh) go rm.Run(5, stopCh) waitRSStable(t, clientSet, rs, ns.Name) @@ -340,7 +340,7 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) { // matches pod1 and pod2; change the selector to match only pod1. Verify // that rs creates one more pod, so there are 3 pods. Also verify that // pod2's controllerRef is cleared. - s, rm, podInformer, clientSet := rmSetup(t, true) + s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true) ns := framework.CreateTestingNamespace("rs-update-selector-to-remove-controllerref", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rs := newRS("rs", ns.Name, 2) @@ -351,6 +351,7 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) { createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) stopCh := make(chan struct{}) + go rsInformer.Run(stopCh) go podInformer.Run(stopCh) waitToObservePods(t, podInformer, 2) go rm.Run(5, stopCh) @@ -386,7 +387,7 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) { // matches pod1 and pod2; change pod2's labels to non-matching. Verify // that rs creates one more pod, so there are 3 pods. Also verify that // pod2's controllerRef is cleared. - s, rm, podInformer, clientSet := rmSetup(t, true) + s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true) ns := framework.CreateTestingNamespace("rs-update-label-to-remove-controllerref", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rs := newRS("rs", ns.Name, 2) @@ -395,6 +396,7 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) { createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) stopCh := make(chan struct{}) + go rsInformer.Run(stopCh) go podInformer.Run(stopCh) go rm.Run(5, stopCh) waitRSStable(t, clientSet, rs, ns.Name) @@ -428,7 +430,7 @@ func TestUpdateLabelToBeAdopted(t *testing.T) { // matches pod1 only; change pod2's labels to be matching. Verify the RS // controller adopts pod2 and delete one of them, so there is only 1 pod // left. - s, rm, podInformer, clientSet := rmSetup(t, true) + s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true) ns := framework.CreateTestingNamespace("rs-update-label-to-be-adopted", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rs := newRS("rs", ns.Name, 1) @@ -442,6 +444,7 @@ func TestUpdateLabelToBeAdopted(t *testing.T) { createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name) stopCh := make(chan struct{}) + go rsInformer.Run(stopCh) go podInformer.Run(stopCh) go rm.Run(5, stopCh) waitRSStable(t, clientSet, rs, ns.Name)