convert replica set controller to shared informer

deads2k 2016-10-07 16:31:34 -04:00
parent bcbdcd17f3
commit b471398f1f
6 changed files with 348 additions and 295 deletions
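The change replaces the controller's privately constructed list/watch machinery with informers drawn from the controller manager's shared factory. A minimal sketch of the new wiring, assembled only from calls that appear in this diff (import paths are assumed from the Kubernetes tree of this era; the burst/cache/GC values are placeholders):

package example

import (
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/controller/replicaset"
)

// startReplicaSetController shows the consumer-side pattern: one shared
// factory, handlers registered by the constructor, reflectors started once.
func startReplicaSetController(client clientset.Interface, workers int, stopCh <-chan struct{}) {
	sharedInformers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())

	// The constructor only registers event handlers; it no longer owns a
	// ListWatch or a private resync period.
	rsc := replicaset.NewReplicaSetController(
		sharedInformers.ReplicaSets(), sharedInformers.Pods(), client,
		replicaset.BurstReplicas, 0 /* lookupCacheSize */, false /* garbageCollectorEnabled */)

	// Starting the factory runs each underlying reflector exactly once,
	// however many controllers share it.
	sharedInformers.Start(stopCh)
	go rsc.Run(workers, stopCh)
}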

View File

@@ -422,7 +422,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
 	if containsResource(resources, "replicasets") {
 		glog.Infof("Starting ReplicaSet controller")
-		go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), client("replicaset-controller"), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
+		go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
 			Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}

View File

@@ -38,13 +38,11 @@ import (
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/runtime"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
-	"k8s.io/kubernetes/pkg/watch"
 )
 
 const (
@@ -75,13 +73,6 @@ type ReplicaSetController struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
 
-	// internalPodInformer is used to hold a personal informer. If we're using
-	// a normal shared informer, then the informer will be started for us. If
-	// we have a personal informer, we must start it ourselves. If you start
-	// the controller using NewReplicationManager(passing SharedInformer), this
-	// will be null
-	internalPodInformer cache.SharedIndexInformer
-
 	// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -92,16 +83,12 @@ type ReplicaSetController struct {
 	expectations *controller.UIDTrackingControllerExpectations
 
 	// A store of ReplicaSets, populated by the rsController
-	rsStore cache.StoreToReplicaSetLister
-	// Watches changes to all ReplicaSets
-	rsController *cache.Controller
+	rsLister *cache.StoreToReplicaSetLister
 	// A store of pods, populated by the podController
-	podStore cache.StoreToPodLister
-	// Watches changes to all pods
-	podController cache.ControllerInterface
-	// podStoreSynced returns true if the pod store has been synced at least once.
+	podLister *cache.StoreToPodLister
+	// podListerSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
-	podStoreSynced func() bool
+	podListerSynced cache.InformerSynced
 
 	lookupCache *controller.MatchingCache
@@ -113,28 +100,20 @@ type ReplicaSetController struct {
 	garbageCollectorEnabled bool
 }
 
-// NewReplicaSetController creates a new ReplicaSetController.
-func NewReplicaSetController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
-	eventBroadcaster := record.NewBroadcaster()
-	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
-
-	return newReplicaSetController(
-		eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
-		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
-}
-
-// newReplicaSetController configures a replica set controller with the specified event recorder
-func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
+// NewReplicaSetController configures a replica set controller with the specified event recorder
+func NewReplicaSetController(rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
 	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
 	}
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 
 	rsc := &ReplicaSetController{
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
-			Recorder:   eventRecorder,
+			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
 		},
 		burstReplicas: burstReplicas,
 		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
@@ -142,30 +121,15 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
 		garbageCollectorEnabled: garbageCollectorEnabled,
 	}
 
-	rsc.rsStore.Indexer, rsc.rsController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
-			},
-		},
-		&extensions.ReplicaSet{},
-		// TODO: Can we have much longer period here?
-		FullControllerResyncPeriod,
-		cache.ResourceEventHandlerFuncs{
-			AddFunc:    rsc.enqueueReplicaSet,
-			UpdateFunc: rsc.updateRS,
-			// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
-			// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
-			// way of achieving this is by performing a `stop` operation on the replica set.
-			DeleteFunc: rsc.enqueueReplicaSet,
-		},
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    rsc.enqueueReplicaSet,
+		UpdateFunc: rsc.updateRS,
+		// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
+		// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
+		// way of achieving this is by performing a `stop` operation on the replica set.
+		DeleteFunc: rsc.enqueueReplicaSet,
+	})
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: rsc.addPod,
 		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
 		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
@@ -173,24 +137,15 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
 		UpdateFunc: rsc.updatePod,
 		DeleteFunc: rsc.deletePod,
 	})
 
-	rsc.podStore.Indexer = podInformer.GetIndexer()
-	rsc.podController = podInformer.GetController()
-
 	rsc.syncHandler = rsc.syncReplicaSet
-	rsc.podStoreSynced = rsc.podController.HasSynced
+	rsc.rsLister = rsInformer.Lister()
+	rsc.podLister = podInformer.Lister()
+	rsc.podListerSynced = podInformer.Informer().HasSynced
 	rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return rsc
 }
 
-// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
-func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
-	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
-	garbageCollectorEnabled := false
-	rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
-	rsc.internalPodInformer = podInformer
-	return rsc
-}
-
 // SetEventRecorder replaces the event recorder used by the ReplicaSetController
 // with the given recorder. Only used for testing.
 func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
@@ -204,16 +159,16 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer rsc.queue.ShutDown()
 
-	go rsc.rsController.Run(stopCh)
-	go rsc.podController.Run(stopCh)
+	glog.Infof("Starting ReplicaSet controller")
+
+	if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced) {
+		return
+	}
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(rsc.worker, time.Second, stopCh)
 	}
 
-	if rsc.internalPodInformer != nil {
-		go rsc.internalPodInformer.Run(stopCh)
-	}
-
 	<-stopCh
 	glog.Infof("Shutting down ReplicaSet Controller")
 }
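With the informers no longer started by the controller itself, Run gates worker startup on a single upfront cache-sync check instead of the old per-sync requeue dance (removed below in syncReplicaSet). cache.WaitForCacheSync is variadic, so the same gate extends to any number of caches; a hypothetical variant that also tracked the ReplicaSet cache (rsListerSynced is not a real field in this commit) would read:

	// Hypothetical two-cache gate; additional InformerSynced funcs can be
	// appended as a controller grows more shared caches.
	if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		// stopCh closed before the initial list finished; bail out rather
		// than let workers act on a half-populated cache.
		return
	}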
@@ -236,7 +191,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
 	}
 
 	// if not cached or cached value is invalid, search all the rs to find the matching one, and update cache
-	rss, err := rsc.rsStore.GetPodReplicaSets(pod)
+	rss, err := rsc.rsLister.GetPodReplicaSets(pod)
 	if err != nil {
 		glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
 		return nil
@@ -300,7 +255,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
 // isCacheValid check if the cache is valid
 func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool {
-	_, err := rsc.rsStore.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
+	_, err := rsc.rsLister.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
 	// rs has been deleted or updated, cache is invalid
 	if err != nil || !isReplicaSetMatch(pod, cachedRS) {
 		return false
@@ -582,15 +537,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 		glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !rsc.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(PodStoreSyncedPollPeriod)
-		glog.Infof("Waiting for pods controller to sync, requeuing ReplicaSet %v", key)
-		rsc.queue.Add(key)
-		return nil
-	}
-
-	obj, exists, err := rsc.rsStore.Indexer.GetByKey(key)
+	obj, exists, err := rsc.rsLister.Indexer.GetByKey(key)
 	if !exists {
 		glog.V(4).Infof("ReplicaSet has been deleted %v", key)
 		rsc.expectations.DeleteExpectations(key)
@@ -624,7 +571,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 	if rsc.garbageCollectorEnabled {
 		// list all pods to include the pods that don't match the rs`s selector
 		// anymore but has the stale controller ref.
-		pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
+		pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
 		if err != nil {
 			return err
 		}
@@ -659,7 +606,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 			return aggregate
 		}
 	} else {
-		pods, err := rsc.podStore.Pods(rs.Namespace).List(selector)
+		pods, err := rsc.podLister.Pods(rs.Namespace).List(selector)
 		if err != nil {
 			return err
 		}
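Since every read in the sync path now goes through rsLister and podLister, anything the controller is meant to see must already sit in the informer caches. The updated tests in the next file therefore seed the lister indexers directly instead of stubbing list/watch responses; the recurring pattern (lines taken from this diff, where manager is the controller under test) is:

	labelMap := map[string]string{"foo": "bar"}
	rsSpec := newReplicaSet(2, labelMap)
	manager.rsLister.Indexer.Add(rsSpec)                                              // seed ReplicaSet cache
	newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") // seed pod cache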

View File

@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net/http/httptest"
+	"net/url"
 	"strings"
 	"testing"
 	"time"
@@ -38,6 +39,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/client/testing/core"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/securitycontext"
 	"k8s.io/kubernetes/pkg/util/sets"
@@ -47,6 +49,39 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
+func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
+	informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
+	ret := NewReplicaSetController(informers.ReplicaSets(), informers.Pods(), client, burstReplicas, lookupCacheSize, false)
+	informers.Start(stopCh)
+	return ret
+}
+
+func filterInformerActions(actions []core.Action) []core.Action {
+	ret := []core.Action{}
+	for _, action := range actions {
+		if len(action.GetNamespace()) == 0 &&
+			(action.Matches("list", "pods") ||
+				action.Matches("list", "replicasets") ||
+				action.Matches("watch", "pods") ||
+				action.Matches("watch", "replicasets")) {
+			continue
+		}
+		ret = append(ret, action)
+	}
+	return ret
+}
+
+func skipListerFunc(verb string, url url.URL) bool {
+	if verb != "GET" {
+		return false
+	}
+	if strings.HasSuffix(url.Path, "/pods") || strings.HasSuffix(url.Path, "/replicasets") {
+		return true
+	}
+	return false
+}
+
 var alwaysReady = func() bool { return true }
 
 func getKey(rs *extensions.ReplicaSet, t *testing.T) string {
@@ -161,14 +196,16 @@ type serverResponse struct {
 func TestSyncReplicaSetDoesNothing(t *testing.T) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 
 	// 2 running pods, a controller with 2 replicas, sync is a no-op
 	labelMap := map[string]string{"foo": "bar"}
 	rsSpec := newReplicaSet(2, labelMap)
-	manager.rsStore.Indexer.Add(rsSpec)
-	newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
+	manager.rsLister.Indexer.Add(rsSpec)
+	newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
 
 	manager.podControl = &fakePodControl
 	manager.syncReplicaSet(getKey(rsSpec, t))
@@ -178,15 +215,17 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
 func TestSyncReplicaSetDeletes(t *testing.T) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
 	// 2 running pods and a controller with 1 replica, one pod delete expected
 	labelMap := map[string]string{"foo": "bar"}
 	rsSpec := newReplicaSet(1, labelMap)
-	manager.rsStore.Indexer.Add(rsSpec)
-	newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
+	manager.rsLister.Indexer.Add(rsSpec)
+	newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
 
 	manager.syncReplicaSet(getKey(rsSpec, t))
 	validateSyncReplicaSet(t, &fakePodControl, 0, 1, 0)
@@ -195,8 +234,10 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
 func TestDeleteFinalStateUnknown(t *testing.T) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
 	received := make(chan string)
@@ -209,7 +250,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
 	// the controller matching the selectors of the deleted pod into the work queue.
 	labelMap := map[string]string{"foo": "bar"}
 	rsSpec := newReplicaSet(1, labelMap)
-	manager.rsStore.Indexer.Add(rsSpec)
+	manager.rsLister.Indexer.Add(rsSpec)
 	pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod")
 	manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
@@ -228,13 +269,15 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
 func TestSyncReplicaSetCreates(t *testing.T) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 
 	// A controller with 2 replicas and no pods in the store, 2 creates expected
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
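Because the shared informers now issue real list/watch GETs against the test server, the fake handler has to ignore that background traffic, and the diff swaps direct ResponseBody writes for a SetResponseBody call, presumably to stay race-free once informers hit the handler concurrently. The status tests below wire this up as follows (a sketch using only names from this diff):

	fakeHandler := utiltesting.FakeHandler{
		StatusCode:    200,
		ResponseBody:  "{}",
		SkipRequestFn: skipListerFunc, // drop GETs ending in /pods or /replicasets
	}
	testServer := httptest.NewServer(&fakeHandler)
	defer testServer.Close()

	// Later, when a decodable body is needed, swap it through the setter:
	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
	fakeHandler.SetResponseBody(response)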
@@ -245,22 +288,27 @@ func TestSyncReplicaSetCreates(t *testing.T) {
 func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
 	// Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state
 	fakeHandler := utiltesting.FakeHandler{
 		StatusCode:   200,
 		ResponseBody: "{}",
+		SkipRequestFn: skipListerFunc,
 	}
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 
 	// Steady state for the ReplicaSet, no Status.Replicas updates expected
 	activePods := 5
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(activePods, labelMap)
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 	rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)}
-	newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
+	newPodList(manager.podLister.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
 
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -274,7 +322,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
 	// This response body is just so we don't err out decoding the http response, all
 	// we care about is the request body sent below.
 	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
-	fakeHandler.ResponseBody = response
+	fakeHandler.SetResponseBody(response)
 
 	rs.Generation = rs.Generation + 1
 	manager.syncReplicaSet(getKey(rs, t))
@@ -287,15 +335,20 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
 func TestControllerUpdateReplicas(t *testing.T) {
 	// This is a happy server just to record the PUT request we expect for status.Replicas
 	fakeHandler := utiltesting.FakeHandler{
 		StatusCode:   200,
 		ResponseBody: "{}",
+		SkipRequestFn: skipListerFunc,
 	}
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 
 	// Insufficient number of pods in the system, and Status.Replicas is wrong;
 	// Status.Replica should update to match number of pods in system, 1 new pod should be created.
@@ -303,15 +356,15 @@ func TestControllerUpdateReplicas(t *testing.T) {
 	extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
 	rs := newReplicaSet(5, labelMap)
 	rs.Spec.Template.Labels = extraLabelMap
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 	rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0}
 	rs.Generation = 1
-	newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
-	newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
+	newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
+	newPodList(manager.podLister.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
 
 	// This response body is just so we don't err out decoding the http response
 	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
-	fakeHandler.ResponseBody = response
+	fakeHandler.SetResponseBody(response)
 
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -332,22 +385,27 @@ func TestControllerUpdateReplicas(t *testing.T) {
 func TestSyncReplicaSetDormancy(t *testing.T) {
 	// Setup a test server so we can lie about the current state of pods
 	fakeHandler := utiltesting.FakeHandler{
 		StatusCode:   200,
 		ResponseBody: "{}",
+		SkipRequestFn: skipListerFunc,
 	}
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 	manager.podControl = &fakePodControl
 
 	labelMap := map[string]string{"foo": "bar"}
 	rsSpec := newReplicaSet(2, labelMap)
-	manager.rsStore.Indexer.Add(rsSpec)
-	newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod")
+	manager.rsLister.Indexer.Add(rsSpec)
+	newPodList(manager.podLister.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod")
 
 	// Creates a replica and sets expectations
 	rsSpec.Status.Replicas = 1
@@ -394,8 +452,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
 }
 
 func TestPodControllerLookup(t *testing.T) {
-	manager := NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}), stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 	testCases := []struct {
 		inRSs []*extensions.ReplicaSet
 		pod   *api.Pod
@@ -441,7 +501,7 @@ func TestPodControllerLookup(t *testing.T) {
 	}
 	for _, c := range testCases {
 		for _, r := range c.inRSs {
-			manager.rsStore.Indexer.Add(r)
+			manager.rsLister.Indexer.Add(r)
 		}
 		if rs := manager.getPodReplicaSet(c.pod); rs != nil {
 			if c.outRSName != rs.Name {
@@ -461,9 +521,11 @@ type FakeWatcher struct {
 func TestWatchControllers(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &fake.Clientset{}
-	client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	client.AddWatchReactor("replicasets", core.DefaultWatchReactor(fakeWatch, nil))
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 
 	var testRSSpec extensions.ReplicaSet
 	received := make(chan string)
@@ -472,8 +534,7 @@ func TestWatchControllers(t *testing.T) {
 	// and eventually into the syncHandler. The handler validates the received controller
 	// and closes the received channel to indicate that the test can finish.
 	manager.syncHandler = func(key string) error {
-
-		obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
+		obj, exists, err := manager.rsLister.Indexer.GetByKey(key)
 		if !exists || err != nil {
 			t.Errorf("Expected to find replica set under key %v", key)
 		}
@@ -486,9 +547,6 @@ func TestWatchControllers(t *testing.T) {
 	}
 	// Start only the ReplicaSet watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method.
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	go manager.rsController.Run(stopCh)
 	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
 
 	testRSSpec.Name = "foo"
@@ -504,20 +562,23 @@ func TestWatchPods(t *testing.T) {
 func TestWatchPods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &fake.Clientset{}
-	client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	client.AddWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 
 	// Put one ReplicaSet and one pod into the controller's stores
 	labelMap := map[string]string{"foo": "bar"}
 	testRSSpec := newReplicaSet(1, labelMap)
-	manager.rsStore.Indexer.Add(testRSSpec)
+	manager.rsLister.Indexer.Add(testRSSpec)
 	received := make(chan string)
 	// The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and
 	// send it into the syncHandler.
 	manager.syncHandler = func(key string) error {
-
-		obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
+		obj, exists, err := manager.rsLister.Indexer.GetByKey(key)
 		if !exists || err != nil {
 			t.Errorf("Expected to find replica set under key %v", key)
 		}
@@ -530,10 +591,6 @@ func TestWatchPods(t *testing.T) {
 	}
 	// Start only the pod watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method for the right ReplicaSet.
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	go manager.podController.Run(stopCh)
-	go manager.internalPodInformer.Run(stopCh)
 	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
 
 	pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod")
@@ -549,13 +606,15 @@ func TestWatchPods(t *testing.T) {
 }
 
 func TestUpdatePods(t *testing.T) {
-	manager := NewReplicaSetControllerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 
 	received := make(chan string)
 
 	manager.syncHandler = func(key string) error {
-		obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
+		obj, exists, err := manager.rsLister.Indexer.GetByKey(key)
 		if !exists || err != nil {
 			t.Errorf("Expected to find replica set under key %v", key)
 		}
@@ -563,24 +622,22 @@ func TestUpdatePods(t *testing.T) {
 		return nil
 	}
 
-	stopCh := make(chan struct{})
-	defer close(stopCh)
 	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
 
 	// Put 2 ReplicaSets and one pod into the controller's stores
 	labelMap1 := map[string]string{"foo": "bar"}
 	testRSSpec1 := newReplicaSet(1, labelMap1)
-	manager.rsStore.Indexer.Add(testRSSpec1)
+	manager.rsLister.Indexer.Add(testRSSpec1)
 	testRSSpec2 := *testRSSpec1
 	labelMap2 := map[string]string{"bar": "foo"}
 	testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2}
 	testRSSpec2.Name = "barfoo"
-	manager.rsStore.Indexer.Add(&testRSSpec2)
+	manager.rsLister.Indexer.Add(&testRSSpec2)
 
-	// case 1: We put in the podStore a pod with labels matching testRSSpec1,
+	// case 1: We put in the podLister a pod with labels matching testRSSpec1,
 	// then update its labels to match testRSSpec2. We expect to receive a sync
 	// request for both replica sets.
-	pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
+	pod1 := newPodList(manager.podLister.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
 	pod1.ResourceVersion = "1"
 	pod2 := pod1
 	pod2.Labels = labelMap2
@@ -599,7 +656,7 @@ func TestUpdatePods(t *testing.T) {
 		}
 	}
 
-	// case 2: pod1 in the podStore has labels matching testRSSpec1. We update
+	// case 2: pod1 in the podLister has labels matching testRSSpec1. We update
 	// its labels to match no replica set. We expect to receive a sync request
 	// for testRSSpec1.
 	pod2.Labels = make(map[string]string)
@@ -622,21 +679,26 @@ func TestUpdatePods(t *testing.T) {
 func TestControllerUpdateRequeue(t *testing.T) {
 	// This server should force a requeue of the controller because it fails to update status.Replicas.
 	fakeHandler := utiltesting.FakeHandler{
 		StatusCode:   500,
 		ResponseBody: "{}",
+		SkipRequestFn: skipListerFunc,
 	}
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 
+	stopCh := make(chan struct{})
+	defer close(stopCh)
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(1, labelMap)
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 	rs.Status = extensions.ReplicaSetStatus{Replicas: 2}
-	newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod")
+	newPodList(manager.podLister.Indexer, 1, api.PodRunning, labelMap, rs, "pod")
 
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -698,13 +760,15 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
 func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, burstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas, 0)
+	manager.podListerSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
 	labelMap := map[string]string{"foo": "bar"}
 	rsSpec := newReplicaSet(numReplicas, labelMap)
-	manager.rsStore.Indexer.Add(rsSpec)
+	manager.rsLister.Indexer.Add(rsSpec)
 
 	expectedPods := int32(0)
 	pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod")
@@ -718,14 +782,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
 	for _, replicas := range []int32{int32(numReplicas), 0} {
 
 		rsSpec.Spec.Replicas = replicas
-		manager.rsStore.Indexer.Add(rsSpec)
+		manager.rsLister.Indexer.Add(rsSpec)
 
 		for i := 0; i < numReplicas; i += burstReplicas {
 			manager.syncReplicaSet(getKey(rsSpec, t))
 
 			// The store accrues active pods. It's also used by the ReplicaSet to determine how many
 			// replicas to create.
-			activePods := int32(len(manager.podStore.Indexer.List()))
+			activePods := int32(len(manager.podLister.Indexer.List()))
 			if replicas != 0 {
 				// This is the number of pods currently "in flight". They were created by the
 				// ReplicaSet controller above, which then puts the ReplicaSet to sleep till
@@ -740,7 +804,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
 			// This simulates the watch events for all but 1 of the expected pods.
 			// None of these should wake the controller because it has expectations==BurstReplicas.
 			for i := int32(0); i < expectedPods-1; i++ {
-				manager.podStore.Indexer.Add(&pods.Items[i])
+				manager.podLister.Indexer.Add(&pods.Items[i])
 				manager.addPod(&pods.Items[i])
 			}
@@ -776,7 +840,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
 			// has exactly one expectation at the end, to verify that we
 			// don't double delete.
 			for i := range podsToDelete[1:] {
-				manager.podStore.Indexer.Delete(podsToDelete[i])
+				manager.podLister.Indexer.Delete(podsToDelete[i])
 				manager.deletePod(podsToDelete[i])
 			}
 			podExp, exists, err := manager.expectations.GetExpectations(rsKey)
@@ -797,7 +861,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
 		// The last add pod will decrease the expectation of the ReplicaSet to 0,
 		// which will cause it to create/delete the remaining replicas up to burstReplicas.
 		if replicas != 0 {
-			manager.podStore.Indexer.Add(&pods.Items[expectedPods-1])
+			manager.podLister.Indexer.Add(&pods.Items[expectedPods-1])
 			manager.addPod(&pods.Items[expectedPods-1])
 		} else {
 			expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t))
@@ -812,14 +876,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
 					Labels: rsSpec.Spec.Selector.MatchLabels,
 				},
 			}
-			manager.podStore.Indexer.Delete(lastPod)
+			manager.podLister.Indexer.Delete(lastPod)
 			manager.deletePod(lastPod)
 		}
 		pods.Items = pods.Items[expectedPods:]
 	}
 
 	// Confirm that we've created the right number of replicas
-	activePods := int32(len(manager.podStore.Indexer.List()))
+	activePods := int32(len(manager.podLister.Indexer.List()))
 	if activePods != rsSpec.Spec.Replicas {
 		t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods)
 	}
@@ -850,15 +914,17 @@ func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
 func TestRSSyncExpectations(t *testing.T) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, 2, 0)
+	manager.podListerSynced = alwaysReady
 	manager.podControl = &fakePodControl
 
 	labelMap := map[string]string{"foo": "bar"}
 	rsSpec := newReplicaSet(2, labelMap)
-	manager.rsStore.Indexer.Add(rsSpec)
+	manager.rsLister.Indexer.Add(rsSpec)
 	pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod")
-	manager.podStore.Indexer.Add(&pods.Items[0])
+	manager.podLister.Indexer.Add(&pods.Items[0])
 	postExpectationsPod := pods.Items[1]
 
 	manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{
@@ -866,7 +932,7 @@ func TestRSSyncExpectations(t *testing.T) {
 			// If we check active pods before checking expectataions, the
 			// ReplicaSet will create a new replica because it doesn't see
 			// this pod, but has fulfilled its expectations.
-			manager.podStore.Indexer.Add(&postExpectationsPod)
+			manager.podLister.Indexer.Add(&postExpectationsPod)
 		},
 	})
 	manager.syncReplicaSet(getKey(rsSpec, t))
@@ -875,11 +941,13 @@ func TestRSSyncExpectations(t *testing.T) {
 func TestDeleteControllerAndExpectations(t *testing.T) {
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0)
+	manager.podListerSynced = alwaysReady
 
 	rs := newReplicaSet(1, map[string]string{"foo": "bar"})
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -901,7 +969,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 	if !exists || err != nil {
 		t.Errorf("No expectations found for ReplicaSet")
 	}
-	manager.rsStore.Indexer.Delete(rs)
+	manager.rsLister.Indexer.Delete(rs)
 	manager.syncReplicaSet(getKey(rs, t))
 
 	if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
@@ -910,37 +978,11 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
 	// This should have no effect, since we've deleted the ReplicaSet.
 	podExp.Add(-1, 0)
-	manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
+	manager.podLister.Indexer.Replace(make([]interface{}, 0), "0")
 	manager.syncReplicaSet(getKey(rs, t))
 	validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
 }
 
-func TestRSManagerNotReady(t *testing.T) {
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	fakePodControl := controller.FakePodControl{}
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0)
-	manager.podControl = &fakePodControl
-	manager.podStoreSynced = func() bool { return false }
-
-	// Simulates the ReplicaSet reflector running before the pod reflector. We don't
-	// want to end up creating replicas in this case until the pod reflector
-	// has synced, so the ReplicaSet controller should just requeue the ReplicaSet.
-	rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"})
-	manager.rsStore.Indexer.Add(rsSpec)
-
-	rsKey := getKey(rsSpec, t)
-	manager.syncReplicaSet(rsKey)
-	validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
-	queueRS, _ := manager.queue.Get()
-	if queueRS != rsKey {
-		t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
-	}
-
-	manager.podStoreSynced = alwaysReady
-	manager.syncReplicaSet(rsKey)
-	validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
-}
-
 // shuffle returns a new shuffled list of container controllers.
 func shuffle(controllers []*extensions.ReplicaSet) []*extensions.ReplicaSet {
 	numControllers := len(controllers)
@@ -957,41 +999,47 @@ func TestOverlappingRSs(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	for i := 0; i < 5; i++ {
-		manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0)
-		manager.podStoreSynced = alwaysReady
+		func() {
+			stopCh := make(chan struct{})
+			defer close(stopCh)
+			manager := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0)
+			manager.podListerSynced = alwaysReady
 		// Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store
 		var controllers []*extensions.ReplicaSet
 		for j := 1; j < 10; j++ {
 			rsSpec := newReplicaSet(1, labelMap)
 			rsSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
 			rsSpec.Name = string(uuid.NewUUID())
 			controllers = append(controllers, rsSpec)
 		}
 		shuffledControllers := shuffle(controllers)
 		for j := range shuffledControllers {
-			manager.rsStore.Indexer.Add(shuffledControllers[j])
+			manager.rsLister.Indexer.Add(shuffledControllers[j])
 		}
 		// Add a pod and make sure only the oldest ReplicaSet is synced
 		pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod")
 		rsKey := getKey(controllers[0], t)
 		manager.addPod(&pods.Items[0])
 		queueRS, _ := manager.queue.Get()
 		if queueRS != rsKey {
 			t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
 		}
+		}()
 	}
 }
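Note the func() { ... }() wrapper introduced here: defer runs on function exit, so without the closure each iteration's stopCh, and the informers started against it, would stay alive until the whole test returned. The closure scopes that lifetime to a single iteration:

for i := 0; i < 5; i++ {
	func() {
		stopCh := make(chan struct{})
		defer close(stopCh) // fires at the end of this iteration, not of the test
		// ... per-iteration setup and assertions ...
	}()
}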
 func TestDeletionTimestamp(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
 	labelMap := map[string]string{"foo": "bar"}
-	manager := NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(c, stopCh, 10, 0)
+	manager.podListerSynced = alwaysReady
 	rs := newReplicaSet(1, labelMap)
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
 		t.Errorf("Couldn't get key for object %#v: %v", rs, err)
@@ -1077,12 +1125,14 @@ func TestDeletionTimestamp(t *testing.T) {
 // setupManagerWithGCEnabled creates a RS manager with a fakePodControl
 // and with garbageCollectorEnabled set to true
-func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) {
+func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) {
 	c := fakeclientset.NewSimpleClientset(objs...)
 	fakePodControl = &controller.FakePodControl{}
-	manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
+	manager = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas, 0)
 	manager.garbageCollectorEnabled = true
-	manager.podStoreSynced = alwaysReady
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 	manager.podControl = fakePodControl
 	return manager, fakePodControl
 }
@@ -1090,14 +1140,16 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicaSetContr
 func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	manager.rsStore.Indexer.Add(rs)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	manager.rsLister.Indexer.Add(rs)
 	var trueVar = true
 	otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
-	// add to podStore a matching Pod controlled by another controller. Expect no patch.
+	// add to podLister a matching Pod controlled by another controller. Expect no patch.
 	pod := newPod("pod", rs, api.PodRunning, nil)
 	pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
-	manager.podStore.Indexer.Add(pod)
+	manager.podLister.Indexer.Add(pod)
 	err := manager.syncReplicaSet(getKey(rs, t))
 	if err != nil {
 		t.Fatal(err)
@@ -1109,15 +1161,17 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
 func TestPatchPodWithOtherOwnerRef(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	manager.rsStore.Indexer.Add(rs)
-	// add to podStore one more matching pod that doesn't have a controller
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	manager.rsLister.Indexer.Add(rs)
+	// add to podLister one more matching pod that doesn't have a controller
 	// ref, but has an owner ref pointing to another object. Expect a patch to
 	// take control of it.
 	unrelatedOwnerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
 	pod := newPod("pod", rs, api.PodRunning, nil)
 	pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
-	manager.podStore.Indexer.Add(pod)
+	manager.podLister.Indexer.Add(pod)
 	err := manager.syncReplicaSet(getKey(rs, t))
 	if err != nil {
@@ -1130,14 +1184,16 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
 func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	manager.rsStore.Indexer.Add(rs)
-	// add to podStore a matching pod that has an ownerRef pointing to the rs,
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	manager.rsLister.Indexer.Add(rs)
+	// add to podLister a matching pod that has an ownerRef pointing to the rs,
 	// but ownerRef.Controller is false. Expect a patch to take control of it.
 	rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
 	pod := newPod("pod", rs, api.PodRunning, nil)
 	pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
-	manager.podStore.Indexer.Add(pod)
+	manager.podLister.Indexer.Add(pod)
 	err := manager.syncReplicaSet(getKey(rs, t))
 	if err != nil {
@@ -1150,12 +1206,14 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
 func TestPatchPodFails(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	manager.rsStore.Indexer.Add(rs)
-	// add to podStore two matching pods. Expect two patches to take control
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	manager.rsLister.Indexer.Add(rs)
+	// add to podLister two matching pods. Expect two patches to take control
 	// of them.
-	manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
-	manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil))
+	manager.podLister.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
+	manager.podLister.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil))
 	// let both patches fail. The rs controller will assume it failed to take
 	// control of the pods and create new ones.
 	fakePodControl.Err = fmt.Errorf("Fake Error")
@@ -1170,13 +1228,15 @@ func TestPatchPodFails(t *testing.T) {
 func TestPatchExtraPodsThenDelete(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	manager.rsStore.Indexer.Add(rs)
-	// add to podStore three matching pods. Expect three patches to take control
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	manager.rsLister.Indexer.Add(rs)
+	// add to podLister three matching pods. Expect three patches to take control
 	// of them, and later delete one of them.
-	manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
-	manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil))
-	manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning, nil))
+	manager.podLister.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
+	manager.podLister.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil))
+	manager.podLister.Indexer.Add(newPod("pod3", rs, api.PodRunning, nil))
 	err := manager.syncReplicaSet(getKey(rs, t))
 	if err != nil {
 		t.Fatal(err)
@@ -1188,9 +1248,11 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
 func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	manager.rsStore.Indexer.Add(rs)
-	// put one pod in the podStore
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	manager.rsLister.Indexer.Add(rs)
+	// put one pod in the podLister
 	pod := newPod("pod", rs, api.PodRunning, nil)
 	pod.ResourceVersion = "1"
 	var trueVar = true
@@ -1203,7 +1265,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
 	// add the updatedPod to the store. This is consistent with the behavior of
 	// the Informer: the Informer updates the store before calling the handler
 	// (updatePod() in this case).
-	manager.podStore.Indexer.Add(&updatedPod)
+	manager.podLister.Indexer.Add(&updatedPod)
 	// send an update of the same pod with modified labels
 	manager.updatePod(pod, &updatedPod)
 	// verifies that rs is added to the queue
@@ -1227,16 +1289,18 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
 func TestUpdateSelectorControllerRef(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
-	// put 2 pods in the podStore
-	newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
+	// put 2 pods in the podLister
+	newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
 	// update the RS so that its selector no longer matches the pods
 	updatedRS := *rs
 	updatedRS.Spec.Selector.MatchLabels = map[string]string{"foo": "baz"}
 	// put the updatedRS into the store. This is consistent with the behavior of
 	// the Informer: the Informer updates the store before calling the handler
 	// (updateRS() in this case).
-	manager.rsStore.Indexer.Add(&updatedRS)
+	manager.rsLister.Indexer.Add(&updatedRS)
 	manager.updateRS(rs, &updatedRS)
 	// verifies that the rs is added to the queue
 	rsKey := getKey(rs, t)
@@ -1261,12 +1325,14 @@ func TestUpdateSelectorControllerRef(t *testing.T) {
 func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
-	manager, fakePodControl := setupManagerWithGCEnabled(rs)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager, fakePodControl := setupManagerWithGCEnabled(stopCh, rs)
 	now := unversioned.Now()
 	rs.DeletionTimestamp = &now
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 	pod1 := newPod("pod1", rs, api.PodRunning, nil)
-	manager.podStore.Indexer.Add(pod1)
+	manager.podLister.Indexer.Add(pod1)
 	// no patch, no create
 	err := manager.syncReplicaSet(getKey(rs, t))
@@ -1279,29 +1345,34 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
 func TestReadyReplicas(t *testing.T) {
 	// This is a happy server just to record the PUT request we expect for status.Replicas
 	fakeHandler := utiltesting.FakeHandler{
 		StatusCode:    200,
 		ResponseBody:  "{}",
+		SkipRequestFn: skipListerFunc,
 	}
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 	// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
 	labelMap := map[string]string{"foo": "bar"}
 	rs := newReplicaSet(2, labelMap)
 	rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
 	rs.Generation = 1
-	manager.rsStore.Indexer.Add(rs)
-	newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod")
-	newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
+	manager.rsLister.Indexer.Add(rs)
+	newPodList(manager.podLister.Indexer, 2, api.PodPending, labelMap, rs, "pod")
+	newPodList(manager.podLister.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
 	// This response body is just so we don't err out decoding the http response
 	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
-	fakeHandler.ResponseBody = response
+	fakeHandler.SetResponseBody(response)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -1319,15 +1390,20 @@ func TestReadyReplicas(t *testing.T) {
 func TestAvailableReplicas(t *testing.T) {
 	// This is a happy server just to record the PUT request we expect for status.Replicas
 	fakeHandler := utiltesting.FakeHandler{
 		StatusCode:    200,
 		ResponseBody:  "{}",
+		SkipRequestFn: skipListerFunc,
 	}
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
-	manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
-	manager.podStoreSynced = alwaysReady
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	manager := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0)
+	manager.podListerSynced = alwaysReady
+	manager.podLister = &cache.StoreToPodLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
+	manager.rsLister = &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}
 	// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
 	labelMap := map[string]string{"foo": "bar"}
@@ -1336,21 +1412,21 @@ func TestAvailableReplicas(t *testing.T) {
 	rs.Generation = 1
 	// minReadySeconds set to 15s
 	rs.Spec.MinReadySeconds = 15
-	manager.rsStore.Indexer.Add(rs)
+	manager.rsLister.Indexer.Add(rs)
 	// First pod became ready 20s ago
 	moment := unversioned.Time{Time: time.Now().Add(-2e10)}
 	pod := newPod("pod", rs, api.PodRunning, &moment)
-	manager.podStore.Indexer.Add(pod)
+	manager.podLister.Indexer.Add(pod)
 	// Second pod becomes ready now
 	otherMoment := unversioned.Now()
 	otherPod := newPod("otherPod", rs, api.PodRunning, &otherMoment)
-	manager.podStore.Indexer.Add(otherPod)
+	manager.podLister.Indexer.Add(otherPod)
 	// This response body is just so we don't err out decoding the http response
 	response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
-	fakeHandler.ResponseBody = response
+	fakeHandler.SetResponseBody(response)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl

View File

@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"github.com/golang/glog"
+
+	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
 )
@@ -38,6 +40,15 @@ func updateReplicaCount(rsClient unversionedextensions.ReplicaSetInterface, rs e
 		rs.Generation == rs.Status.ObservedGeneration {
 		return nil
 	}
+
+	// deep copy to avoid mutation now.
+	// TODO: this method needs some work. Retry on conflict probably, though I suspect this is stomping status to something it probably shouldn't
+	copyObj, err := api.Scheme.DeepCopy(rs)
+	if err != nil {
+		return err
+	}
+	rs = copyObj.(extensions.ReplicaSet)
+
 	// Save the generation number we acted on, otherwise we might wrongfully indicate
 	// that we've seen a spec update when we retry.
 	// TODO: This can clobber an update if we allow multiple agents to write to the
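The deep copy added above follows the shared-informer contract: an object served out of a shared cache is read-only to its consumers, because every other consumer sees the same pointer. Before updateReplicaCount mutates status it must therefore work on a private copy. A small illustration of the rule (newCount is a hypothetical value):

// Wrong: rs may point into the shared informer cache; mutating it in place
// corrupts the view that every other consumer of the cache sees.
//   rs.Status.Replicas = newCount
// Right: deep copy first, then mutate the copy we own.
copyObj, err := api.Scheme.DeepCopy(rs)
if err != nil {
	return err
}
rs = copyObj.(extensions.ReplicaSet)
rs.Status.Replicas = newCount // safe: this copy is ours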

View File

@@ -52,11 +52,27 @@ type FakeHandler struct {
 	lock           sync.Mutex
 	requestCount   int
 	hasBeenChecked bool
+
+	SkipRequestFn func(verb string, url url.URL) bool
+}
+
+func (f *FakeHandler) SetResponseBody(responseBody string) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.ResponseBody = responseBody
 }
 
 func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
 	f.lock.Lock()
 	defer f.lock.Unlock()
+
+	if f.SkipRequestFn != nil && f.SkipRequestFn(request.Method, *request.URL) {
+		response.Header().Set("Content-Type", "application/json")
+		response.WriteHeader(f.StatusCode)
+		response.Write([]byte(f.ResponseBody))
+		return
+	}
+
 	f.requestCount++
 	if f.hasBeenChecked {
 		panic("got request after having been validated")
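TestReadyReplicas and TestAvailableReplicas above plug a skipListerFunc into this new hook; its definition sits elsewhere in the commit. The intent is to keep the LIST requests that the now-running shared informers issue in the background from being counted and validated as the request under test. A plausible sketch, assuming the (verb, url) signature introduced above; the committed predicate may match paths differently:

var skipListerFunc = func(verb string, url url.URL) bool {
	// Ignore the reflectors' GETs against the pod and replicaset list
	// endpoints so that only the status PUT is recorded by the handler.
	if verb != "GET" {
		return false
	}
	return strings.HasSuffix(url.Path, "/pods") || strings.Contains(url.Path, "replicasets")
}

SetResponseBody exists for the same reason: with informers hitting the fake server concurrently, swapping the response body mid-test has to happen under the handler's lock rather than by assigning the field directly.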

View File

@@ -127,7 +127,7 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
 	return ret, nil
 }
-func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, cache.SharedIndexInformer, clientset.Interface) {
+func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, cache.SharedIndexInformer, cache.SharedIndexInformer, clientset.Interface) {
 	masterConfig := framework.NewIntegrationTestMasterConfig()
 	_, s := framework.RunAMaster(masterConfig)
@@ -137,14 +137,12 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
 		t.Fatalf("Error in create clientset: %v", err)
 	}
 	resyncPeriod := 12 * time.Hour
-	resyncPeriodFunc := func() time.Duration {
-		return resyncPeriod
-	}
-	podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
+	informers := informers.NewSharedInformerFactory(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "rs-informers")), resyncPeriod)
 	rm := replicaset.NewReplicaSetController(
-		podInformer,
+		informers.ReplicaSets(),
+		informers.Pods(),
 		internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")),
-		resyncPeriodFunc,
 		replicaset.BurstReplicas,
 		4096,
 		enableGarbageCollector,
@@ -153,7 +151,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
 	if err != nil {
 		t.Fatalf("Failed to create replicaset controller")
 	}
-	return s, rm, podInformer, clientSet
+	return s, rm, informers.ReplicaSets().Informer(), informers.Pods().Informer(), clientSet
 }
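rmSetup now returns both informers so each test can run them against its own stop channel. With a factory the informers can also be started in one call; a sketch, assuming the factory's Start method, which the controller manager uses for the same purpose:

// Instead of running each informer by hand:
//   go rsInformer.Run(stopCh)
//   go podInformer.Run(stopCh)
// the factory can start everything it has constructed:
stopCh := make(chan struct{})
informers.Start(stopCh)
// ... exercise the controller ...
close(stopCh)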
 // wait for the podInformer to observe the pods. Call this function before
@@ -223,7 +221,7 @@ func TestAdoption(t *testing.T) {
 		},
 	}
 	for i, tc := range testCases {
-		s, rm, podInformer, clientSet := rmSetup(t, true)
+		s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true)
 		ns := framework.CreateTestingNamespace(fmt.Sprintf("rs-adoption-%d", i), s, t)
 		defer framework.DeleteTestingNamespace(ns, s, t)
@@ -243,6 +241,7 @@ func TestAdoption(t *testing.T) {
 		}
 		stopCh := make(chan struct{})
+		go rsInformer.Run(stopCh)
 		go podInformer.Run(stopCh)
 		waitToObservePods(t, podInformer, 1)
 		go rm.Run(5, stopCh)
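waitToObservePods, described by the comment after rmSetup, gates the test on the pod informer's local cache so that adoption checks do not race informer startup. A plausible reconstruction, assuming cache.SharedIndexInformer and wait.Poll; the committed polling interval and timeout may differ:

func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int) {
	// Poll the informer's local cache until it has listed podNum pods,
	// failing the test if that never happens.
	if err := wait.Poll(10*time.Millisecond, 60*time.Second, func() (bool, error) {
		return len(podInformer.GetIndexer().List()) == podNum, nil
	}); err != nil {
		t.Fatalf("failed to observe %d pods in the informer cache: %v", podNum, err)
	}
}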
@@ -300,7 +299,7 @@ func TestUpdateSelectorToAdopt(t *testing.T) {
 	// We have pod1, pod2 and rs. rs.spec.replicas=1. At first rs.Selector
 	// matches pod1 only; change the selector to match pod2 as well. Verify
 	// there is only one pod left.
-	s, rm, podInformer, clientSet := rmSetup(t, true)
+	s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true)
 	ns := framework.CreateTestingNamespace("rs-update-selector-to-adopt", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 	rs := newRS("rs", ns.Name, 1)
@@ -314,6 +313,7 @@ func TestUpdateSelectorToAdopt(t *testing.T) {
 	createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
 	stopCh := make(chan struct{})
+	go rsInformer.Run(stopCh)
 	go podInformer.Run(stopCh)
 	go rm.Run(5, stopCh)
 	waitRSStable(t, clientSet, rs, ns.Name)
@@ -340,7 +340,7 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
 	// matches pod1 and pod2; change the selector to match only pod1. Verify
 	// that rs creates one more pod, so there are 3 pods. Also verify that
 	// pod2's controllerRef is cleared.
-	s, rm, podInformer, clientSet := rmSetup(t, true)
+	s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true)
 	ns := framework.CreateTestingNamespace("rs-update-selector-to-remove-controllerref", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 	rs := newRS("rs", ns.Name, 2)
@@ -351,6 +351,7 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
 	createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
 	stopCh := make(chan struct{})
+	go rsInformer.Run(stopCh)
 	go podInformer.Run(stopCh)
 	waitToObservePods(t, podInformer, 2)
 	go rm.Run(5, stopCh)
@@ -386,7 +387,7 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
 	// matches pod1 and pod2; change pod2's labels to non-matching. Verify
 	// that rs creates one more pod, so there are 3 pods. Also verify that
 	// pod2's controllerRef is cleared.
-	s, rm, podInformer, clientSet := rmSetup(t, true)
+	s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true)
 	ns := framework.CreateTestingNamespace("rs-update-label-to-remove-controllerref", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 	rs := newRS("rs", ns.Name, 2)
@@ -395,6 +396,7 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
 	createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
 	stopCh := make(chan struct{})
+	go rsInformer.Run(stopCh)
 	go podInformer.Run(stopCh)
 	go rm.Run(5, stopCh)
 	waitRSStable(t, clientSet, rs, ns.Name)
@@ -428,7 +430,7 @@ func TestUpdateLabelToBeAdopted(t *testing.T) {
 	// matches pod1 only; change pod2's labels to be matching. Verify the RS
 	// controller adopts pod2 and deletes one of them, so there is only 1 pod
 	// left.
-	s, rm, podInformer, clientSet := rmSetup(t, true)
+	s, rm, rsInformer, podInformer, clientSet := rmSetup(t, true)
 	ns := framework.CreateTestingNamespace("rs-update-label-to-be-adopted", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 	rs := newRS("rs", ns.Name, 1)
@@ -442,6 +444,7 @@ func TestUpdateLabelToBeAdopted(t *testing.T) {
 	createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
 	stopCh := make(chan struct{})
+	go rsInformer.Run(stopCh)
 	go podInformer.Run(stopCh)
 	go rm.Run(5, stopCh)
 	waitRSStable(t, clientSet, rs, ns.Name)