diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 5257a126675..bddeabaa220 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -55,8 +55,8 @@ func startEndpointController(ctx ControllerContext) (bool, error) { func startReplicationController(ctx ControllerContext) (bool, error) { go replicationcontroller.NewReplicationManager( ctx.InformerFactory.Pods().Informer(), + ctx.InformerFactory.ReplicationControllers().Informer(), ctx.ClientBuilder.ClientOrDie("replication-controller"), - ResyncPeriod(&ctx.Options), replicationcontroller.BurstReplicas, int(ctx.Options.LookupCacheSizeForRC), ctx.Options.EnableGarbageCollector, diff --git a/pkg/controller/informers/core.go b/pkg/controller/informers/core.go index f4a6d51e6eb..5713a6fdd27 100644 --- a/pkg/controller/informers/core.go +++ b/pkg/controller/informers/core.go @@ -318,6 +318,42 @@ func (f *internalLimitRangeInformer) Lister() coreinternallisters.LimitRangeList //***************************************************************************** +// ReplicationControllerInformer is type of SharedIndexInformer which watches and lists all replication controllers. +// Interface provides constructor for informer and lister for replication controllers. +type ReplicationControllerInformer interface { + Informer() cache.SharedIndexInformer + Lister() *cache.StoreToReplicationControllerLister +} + +type replicationControllerInformer struct { + *sharedInformerFactory +} + +// Informer checks whether replicationControllerInformer exists in sharedInformerFactory and if not, it creates new informer of type +// replicationControllerInformer and connects it to sharedInformerFactory +func (f *replicationControllerInformer) Informer() cache.SharedIndexInformer { + f.lock.Lock() + defer f.lock.Unlock() + + informerType := reflect.TypeOf(&v1.ReplicationController{}) + informer, exists := f.informers[informerType] + if exists { + return informer + } + informer = NewReplicationControllerInformer(f.client, f.defaultResync) + f.informers[informerType] = informer + + return informer +} + +// Lister returns lister for replicationControllerInformer +func (f *replicationControllerInformer) Lister() *cache.StoreToReplicationControllerLister { + informer := f.Informer() + return &cache.StoreToReplicationControllerLister{Indexer: informer.GetIndexer()} +} + +//***************************************************************************** + // NewPodInformer returns a SharedIndexInformer that lists and watches all pods func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { sharedIndexInformer := cache.NewSharedIndexInformer( @@ -350,7 +386,7 @@ func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cac }, &v1.Node{}, resyncPeriod, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + cache.Indexers{}) return sharedIndexInformer } @@ -445,7 +481,7 @@ func NewLimitRangeInformer(client clientset.Interface, resyncPeriod time.Duratio }, &v1.LimitRange{}, resyncPeriod, - cache.Indexers{}) + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return sharedIndexInformer } @@ -472,6 +508,25 @@ func NewInternalLimitRangeInformer(internalclient internalclientset.Interface, r return sharedIndexInformer } +// NewReplicationControllerInformer returns a SharedIndexInformer that lists and watches all replication controllers. +func NewReplicationControllerInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + sharedIndexInformer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + return client.Core().ReplicationControllers(v1.NamespaceAll).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + return client.Core().ReplicationControllers(v1.NamespaceAll).Watch(options) + }, + }, + &v1.ReplicationController{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + + return sharedIndexInformer +} + /*****************************************************************************/ // ServiceAccountInformer is type of SharedIndexInformer which watches and lists all ServiceAccounts. diff --git a/pkg/controller/informers/factory.go b/pkg/controller/informers/factory.go index c13b1909b3e..a8680793c6f 100644 --- a/pkg/controller/informers/factory.go +++ b/pkg/controller/informers/factory.go @@ -52,6 +52,7 @@ type SharedInformerFactory interface { DaemonSets() DaemonSetInformer Deployments() DeploymentInformer ReplicaSets() ReplicaSetInformer + ReplicationControllers() ReplicationControllerInformer ClusterRoleBindings() ClusterRoleBindingInformer ClusterRoles() ClusterRoleInformer @@ -151,6 +152,10 @@ func (f *sharedInformerFactory) ReplicaSets() ReplicaSetInformer { return &replicaSetInformer{sharedInformerFactory: f} } +func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer { + return &replicationControllerInformer{sharedInformerFactory: f} +} + func (f *sharedInformerFactory) ClusterRoles() ClusterRoleInformer { return &clusterRoleInformer{sharedInformerFactory: f} } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index ab2160502e5..6ba0e90847d 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -47,19 +47,10 @@ import ( ) const ( - // We'll attempt to recompute the required replicas of all ReplicaSets - // that have fulfilled their expectations at least this often. This recomputation - // happens based on contents in local pod storage. - FullControllerResyncPeriod = 30 * time.Second - // Realistic value of the burstReplica field for the replica set manager based off // performance requirements for kubernetes 1.0. BurstReplicas = 500 - // We must avoid counting pods until the pod store has synced. If it hasn't synced, to - // avoid a hot loop, we'll wait this long between checks. - PodStoreSyncedPollPeriod = 100 * time.Millisecond - // The number of times we retry updating a ReplicaSet's status. statusUpdateRetries = 1 ) @@ -568,14 +559,14 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { }() obj, exists, err := rsc.rsLister.Indexer.GetByKey(key) + if err != nil { + return err + } if !exists { glog.V(4).Infof("ReplicaSet has been deleted %v", key) rsc.expectations.DeleteExpectations(key) return nil } - if err != nil { - return err - } rs := *obj.(*extensions.ReplicaSet) rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) diff --git a/pkg/controller/replication/BUILD b/pkg/controller/replication/BUILD index a6ee535a234..6940b1e3d50 100644 --- a/pkg/controller/replication/BUILD +++ b/pkg/controller/replication/BUILD @@ -27,7 +27,6 @@ go_library( "//pkg/controller:go_default_library", "//pkg/controller/informers:go_default_library", "//pkg/labels:go_default_library", - "//pkg/runtime:go_default_library", "//pkg/runtime/schema:go_default_library", "//pkg/util:go_default_library", "//pkg/util/errors:go_default_library", @@ -35,7 +34,6 @@ go_library( "//pkg/util/runtime:go_default_library", "//pkg/util/wait:go_default_library", "//pkg/util/workqueue:go_default_library", - "//pkg/watch:go_default_library", "//vendor:github.com/golang/glog", ], ) @@ -57,6 +55,7 @@ go_test( "//pkg/client/restclient:go_default_library", "//pkg/client/testing/core:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/informers:go_default_library", "//pkg/runtime:go_default_library", "//pkg/securitycontext:go_default_library", "//pkg/util/sets:go_default_library", diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 67e1b62a9b7..e50ff792447 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -35,7 +35,6 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/schema" "k8s.io/kubernetes/pkg/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" @@ -43,27 +42,13 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" - "k8s.io/kubernetes/pkg/watch" ) const ( - // We'll attempt to recompute the required replicas of all replication controllers - // that have fulfilled their expectations at least this often. This recomputation - // happens based on contents in local pod storage. - // Full Resync shouldn't be needed at all in a healthy system. This is a protection - // against disappearing objects and watch notification, that we believe should not - // happen at all. - // TODO: We should get rid of it completely in the fullness of time. - FullControllerResyncPeriod = 10 * time.Minute - // Realistic value of the burstReplica field for the replication manager based off // performance requirements for kubernetes 1.0. BurstReplicas = 500 - // We must avoid counting pods until the pod store has synced. If it hasn't synced, to - // avoid a hot loop, we'll wait this long between checks. - PodStoreSyncedPollPeriod = 100 * time.Millisecond - // The number of times we retry updating a replication controller's status. statusUpdateRetries = 1 ) @@ -97,16 +82,14 @@ type ReplicationManager struct { expectations *controller.UIDTrackingControllerExpectations // A store of replication controllers, populated by the rcController - rcStore cache.StoreToReplicationControllerLister - // Watches changes to all replication controllers - rcController *cache.Controller + rcLister cache.StoreToReplicationControllerLister // A store of pods, populated by the podController - podStore cache.StoreToPodLister + podLister cache.StoreToPodLister // Watches changes to all pods podController cache.ControllerInterface - // podStoreSynced returns true if the pod store has been synced at least once. + // podListerSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. - podStoreSynced func() bool + podListerSynced func() bool lookupCache *controller.MatchingCache @@ -118,27 +101,21 @@ type ReplicationManager struct { garbageCollectorEnabled bool } -// NewReplicationManager creates a replication manager -func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) - return newReplicationManager( - eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}), - podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) -} - -// newReplicationManager configures a replication manager with the specified event recorder -func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager { +// NewReplicationManager configures a replication manager with the specified event recorder +func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager { if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + rm := &ReplicationManager{ kubeClient: kubeClient, podControl: controller.RealPodControl{ KubeClient: kubeClient, - Recorder: eventRecorder, + Recorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}), }, burstReplicas: burstReplicas, expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()), @@ -146,29 +123,14 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache garbageCollectorEnabled: garbageCollectorEnabled, } - rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options v1.ListOptions) (runtime.Object, error) { - return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options) - }, - WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { - return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options) - }, - }, - &v1.ReplicationController{}, - // TODO: Can we have much longer period here? - FullControllerResyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: rm.enqueueController, - UpdateFunc: rm.updateRC, - // This will enter the sync loop and no-op, because the controller has been deleted from the store. - // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended - // way of achieving this is by performing a `stop` operation on the controller. - DeleteFunc: rm.enqueueController, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - + rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: rm.enqueueController, + UpdateFunc: rm.updateRC, + // This will enter the sync loop and no-op, because the controller has been deleted from the store. + // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended + // way of achieving this is by performing a `stop` operation on the controller. + DeleteFunc: rm.enqueueController, + }) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rm.addPod, // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill @@ -177,31 +139,21 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache UpdateFunc: rm.updatePod, DeleteFunc: rm.deletePod, }) - rm.podStore.Indexer = podInformer.GetIndexer() - rm.podController = podInformer.GetController() rm.syncHandler = rm.syncReplicationController - rm.podStoreSynced = rm.podController.HasSynced + rm.rcLister.Indexer = rcInformer.GetIndexer() + rm.podLister.Indexer = podInformer.GetIndexer() + rm.podListerSynced = podInformer.HasSynced rm.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rm } -// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests. -func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) - garbageCollectorEnabled := false - rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) - rm.internalPodInformer = podInformer - return rm -} - // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) - garbageCollectorEnabled := false - rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) + rcInformer := informers.NewReplicationControllerInformer(kubeClient, resyncPeriod()) + rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas, lookupCacheSize, false) rm.internalPodInformer = podInformer - return rm } @@ -216,20 +168,23 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { // Run begins watching and syncing. func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - glog.Infof("Starting RC Manager") - go rm.rcController.Run(stopCh) - go rm.podController.Run(stopCh) - for i := 0; i < workers; i++ { - go wait.Until(rm.worker, time.Second, stopCh) - } + defer rm.queue.ShutDown() + glog.Infof("Starting RC Manager") if rm.internalPodInformer != nil { go rm.internalPodInformer.Run(stopCh) } + if !cache.WaitForCacheSync(stopCh, rm.podListerSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(rm.worker, time.Second, stopCh) + } + <-stopCh glog.Infof("Shutting down RC Manager") - rm.queue.ShutDown() } // getPodController returns the controller managing the given pod. @@ -250,7 +205,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr } // if not cached or cached value is invalid, search all the rc to find the matching one, and update cache - controllers, err := rm.rcStore.GetPodControllers(pod) + controllers, err := rm.rcLister.GetPodControllers(pod) if err != nil { glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name) return nil @@ -276,7 +231,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr // isCacheValid check if the cache is valid func (rm *ReplicationManager) isCacheValid(pod *v1.Pod, cachedRC *v1.ReplicationController) bool { - _, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name) + _, err := rm.rcLister.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name) // rc has been deleted or updated, cache is invalid if err != nil || !isControllerMatch(pod, cachedRC) { return false @@ -648,23 +603,15 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime)) }() - if !rm.podStoreSynced() { - // Sleep so we give the pod reflector goroutine a chance to run. - time.Sleep(PodStoreSyncedPollPeriod) - glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key) - rm.queue.Add(key) - return nil + obj, exists, err := rm.rcLister.Indexer.GetByKey(key) + if err != nil { + return err } - - obj, exists, err := rm.rcStore.Indexer.GetByKey(key) if !exists { glog.Infof("Replication Controller has been deleted %v", key) rm.expectations.DeleteExpectations(key) return nil } - if err != nil { - return err - } rc := *obj.(*v1.ReplicationController) trace.Step("ReplicationController restored") @@ -678,7 +625,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { if rm.garbageCollectorEnabled { // list all pods to include the pods that don't match the rc's selector // anymore but has the stale controller ref. - pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything()) + pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything()) if err != nil { glog.Errorf("Error getting pods for rc %q: %v", key, err) rm.queue.Add(key) @@ -719,7 +666,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { return aggregate } } else { - pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated()) + pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated()) if err != nil { glog.Errorf("Error getting pods for rc %q: %v", key, err) rm.queue.Add(key) diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index ee54cb9de7b..05a4a3b6d8e 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/util/sets" @@ -162,12 +163,12 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op controllerSpec := newReplicationController(2) - manager.rcStore.Indexer.Add(controllerSpec) - newPodList(manager.podStore.Indexer, 2, v1.PodRunning, controllerSpec, "pod") + manager.rcLister.Indexer.Add(controllerSpec) + newPodList(manager.podLister.Indexer, 2, v1.PodRunning, controllerSpec, "pod") manager.podControl = &fakePodControl manager.syncReplicationController(getKey(controllerSpec, t)) @@ -178,13 +179,13 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected controllerSpec := newReplicationController(1) - manager.rcStore.Indexer.Add(controllerSpec) - newPodList(manager.podStore.Indexer, 2, v1.PodRunning, controllerSpec, "pod") + manager.rcLister.Indexer.Add(controllerSpec) + newPodList(manager.podLister.Indexer, 2, v1.PodRunning, controllerSpec, "pod") manager.syncReplicationController(getKey(controllerSpec, t)) validateSyncReplication(t, &fakePodControl, 0, 1, 0) @@ -194,7 +195,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl received := make(chan string) @@ -206,7 +207,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { // The DeletedFinalStateUnknown object should cause the rc manager to insert // the controller matching the selectors of the deleted pod into the work queue. controllerSpec := newReplicationController(1) - manager.rcStore.Indexer.Add(controllerSpec) + manager.rcLister.Indexer.Add(controllerSpec) pods := newPodList(nil, 1, v1.PodRunning, controllerSpec, "pod") manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) @@ -226,11 +227,11 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected rc := newReplicationController(2) - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -248,14 +249,14 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // Steady state for the replication controller, no Status.Replicas updates expected activePods := 5 rc := newReplicationController(activePods) - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) rc.Status = v1.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)} - newPodList(manager.podStore.Indexer, activePods, v1.PodRunning, rc, "pod") + newPodList(manager.podLister.Indexer, activePods, v1.PodRunning, rc, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -289,19 +290,19 @@ func TestControllerUpdateReplicas(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(5) - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) rc.Status = v1.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0} rc.Generation = 1 - newPodList(manager.podStore.Indexer, 2, v1.PodRunning, rc, "pod") + newPodList(manager.podLister.Indexer, 2, v1.PodRunning, rc, "pod") rcCopy := *rc extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} rcCopy.Spec.Selector = extraLabelMap - newPodList(manager.podStore.Indexer, 2, v1.PodRunning, &rcCopy, "podWithExtraLabel") + newPodList(manager.podLister.Indexer, 2, v1.PodRunning, &rcCopy, "podWithExtraLabel") // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) @@ -335,12 +336,12 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) - manager.rcStore.Indexer.Add(controllerSpec) - newPodList(manager.podStore.Indexer, 1, v1.PodRunning, controllerSpec, "pod") + manager.rcLister.Indexer.Add(controllerSpec) + newPodList(manager.podLister.Indexer, 1, v1.PodRunning, controllerSpec, "pod") // Creates a replica and sets expectations controllerSpec.Status.Replicas = 1 @@ -388,7 +389,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { func TestPodControllerLookup(t *testing.T) { manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}), controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady testCases := []struct { inRCs []*v1.ReplicationController pod *v1.Pod @@ -434,7 +435,7 @@ func TestPodControllerLookup(t *testing.T) { } for _, c := range testCases { for _, r := range c.inRCs { - manager.rcStore.Indexer.Add(r) + manager.rcLister.Indexer.Add(r) } if rc := manager.getPodController(c.pod); rc != nil { if c.outRCName != rc.Name { @@ -449,9 +450,15 @@ func TestPodControllerLookup(t *testing.T) { func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() c := &fake.Clientset{} - c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + c.AddWatchReactor("replicationcontrollers", core.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) + defer close(stopCh) + informers := informers.NewSharedInformerFactory(c, nil, controller.NoResyncPeriodFunc()) + podInformer := informers.Pods().Informer() + rcInformer := informers.ReplicationControllers().Informer() + manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas, 0, false) + informers.Start(stopCh) + manager.podListerSynced = alwaysReady var testControllerSpec v1.ReplicationController received := make(chan string) @@ -460,8 +467,7 @@ func TestWatchControllers(t *testing.T) { // and eventually into the syncHandler. The handler validates the received controller // and closes the received channel to indicate that the test can finish. manager.syncHandler = func(key string) error { - - obj, exists, err := manager.rcStore.Indexer.GetByKey(key) + obj, exists, err := manager.rcLister.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find controller under key %v", key) } @@ -472,11 +478,9 @@ func TestWatchControllers(t *testing.T) { close(received) return nil } + // Start only the rc watcher and the workqueue, send a watch event, // and make sure it hits the sync method. - stopCh := make(chan struct{}) - defer close(stopCh) - go manager.rcController.Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) testControllerSpec.Name = "foo" @@ -494,17 +498,17 @@ func TestWatchPods(t *testing.T) { c := &fake.Clientset{} c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // Put one rc and one pod into the controller's stores testControllerSpec := newReplicationController(1) - manager.rcStore.Indexer.Add(testControllerSpec) + manager.rcLister.Indexer.Add(testControllerSpec) received := make(chan string) // The pod update sent through the fakeWatcher should figure out the managing rc and // send it into the syncHandler. manager.syncHandler = func(key string) error { - obj, exists, err := manager.rcStore.Indexer.GetByKey(key) + obj, exists, err := manager.rcLister.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find controller under key %v", key) } @@ -519,7 +523,6 @@ func TestWatchPods(t *testing.T) { // and make sure it hits the sync method for the right rc. stopCh := make(chan struct{}) defer close(stopCh) - go manager.podController.Run(stopCh) go manager.internalPodInformer.Run(stopCh) go wait.Until(manager.worker, 10*time.Millisecond, stopCh) @@ -537,12 +540,12 @@ func TestWatchPods(t *testing.T) { func TestUpdatePods(t *testing.T) { manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady received := make(chan string) manager.syncHandler = func(key string) error { - obj, exists, err := manager.rcStore.Indexer.GetByKey(key) + obj, exists, err := manager.rcLister.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find controller under key %v", key) } @@ -556,16 +559,16 @@ func TestUpdatePods(t *testing.T) { // Put 2 rcs and one pod into the controller's stores testControllerSpec1 := newReplicationController(1) - manager.rcStore.Indexer.Add(testControllerSpec1) + manager.rcLister.Indexer.Add(testControllerSpec1) testControllerSpec2 := *testControllerSpec1 testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"} testControllerSpec2.Name = "barfoo" - manager.rcStore.Indexer.Add(&testControllerSpec2) + manager.rcLister.Indexer.Add(&testControllerSpec2) - // case 1: We put in the podStore a pod with labels matching + // case 1: We put in the podLister a pod with labels matching // testControllerSpec1, then update its labels to match testControllerSpec2. // We expect to receive a sync request for both controllers. - pod1 := newPodList(manager.podStore.Indexer, 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1 := newPodList(manager.podLister.Indexer, 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] pod1.ResourceVersion = "1" pod2 := pod1 pod2.Labels = testControllerSpec2.Spec.Selector @@ -584,7 +587,7 @@ func TestUpdatePods(t *testing.T) { } } - // case 2: pod1 in the podStore has labels matching testControllerSpec1. + // case 2: pod1 in the podLister has labels matching testControllerSpec1. // We update its labels to match no replication controller. We expect to // receive a sync request for testControllerSpec1. pod2.Labels = make(map[string]string) @@ -615,12 +618,12 @@ func TestControllerUpdateRequeue(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady rc := newReplicationController(1) - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) rc.Status = v1.ReplicationControllerStatus{Replicas: 2} - newPodList(manager.podStore.Indexer, 1, v1.PodRunning, rc, "pod") + newPodList(manager.podLister.Indexer, 1, v1.PodRunning, rc, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -686,11 +689,11 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl controllerSpec := newReplicationController(numReplicas) - manager.rcStore.Indexer.Add(controllerSpec) + manager.rcLister.Indexer.Add(controllerSpec) expectedPods := 0 pods := newPodList(nil, numReplicas, v1.PodPending, controllerSpec, "pod") @@ -704,14 +707,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) for _, replicas := range []int{numReplicas, 0} { *(controllerSpec.Spec.Replicas) = int32(replicas) - manager.rcStore.Indexer.Add(controllerSpec) + manager.rcLister.Indexer.Add(controllerSpec) for i := 0; i < numReplicas; i += burstReplicas { manager.syncReplicationController(getKey(controllerSpec, t)) // The store accrues active pods. It's also used by the rc to determine how many // replicas to create. - activePods := len(manager.podStore.Indexer.List()) + activePods := len(manager.podLister.Indexer.List()) if replicas != 0 { // This is the number of pods currently "in flight". They were created by the rc manager above, // which then puts the rc to sleep till all of them have been observed. @@ -725,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. for i := 0; i < expectedPods-1; i++ { - manager.podStore.Indexer.Add(&pods.Items[i]) + manager.podLister.Indexer.Add(&pods.Items[i]) manager.addPod(&pods.Items[i]) } @@ -761,7 +764,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // has exactly one expectation at the end, to verify that we // don't double delete. for i := range podsToDelete[1:] { - manager.podStore.Indexer.Delete(podsToDelete[i]) + manager.podLister.Indexer.Delete(podsToDelete[i]) manager.deletePod(podsToDelete[i]) } podExp, exists, err := manager.expectations.GetExpectations(rcKey) @@ -782,7 +785,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // The last add pod will decrease the expectation of the rc to 0, // which will cause it to create/delete the remaining replicas up to burstReplicas. if replicas != 0 { - manager.podStore.Indexer.Add(&pods.Items[expectedPods-1]) + manager.podLister.Indexer.Add(&pods.Items[expectedPods-1]) manager.addPod(&pods.Items[expectedPods-1]) } else { expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t)) @@ -797,14 +800,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Labels: controllerSpec.Spec.Selector, }, } - manager.podStore.Indexer.Delete(lastPod) + manager.podLister.Indexer.Delete(lastPod) manager.deletePod(lastPod) } pods.Items = pods.Items[expectedPods:] } // Confirm that we've created the right number of replicas - activePods := int32(len(manager.podStore.Indexer.List())) + activePods := int32(len(manager.podLister.Indexer.List())) if activePods != *(controllerSpec.Spec.Replicas) { t.Fatalf("Unexpected number of active pods, expected %d, got %d", *(controllerSpec.Spec.Replicas), activePods) } @@ -836,13 +839,13 @@ func TestRCSyncExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) - manager.rcStore.Indexer.Add(controllerSpec) + manager.rcLister.Indexer.Add(controllerSpec) pods := newPodList(nil, 2, v1.PodPending, controllerSpec, "pod") - manager.podStore.Indexer.Add(&pods.Items[0]) + manager.podLister.Indexer.Add(&pods.Items[0]) postExpectationsPod := pods.Items[1] manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{ @@ -850,7 +853,7 @@ func TestRCSyncExpectations(t *testing.T) { // If we check active pods before checking expectataions, the rc // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. - manager.podStore.Indexer.Add(&postExpectationsPod) + manager.podLister.Indexer.Add(&postExpectationsPod) }, }) manager.syncReplicationController(getKey(controllerSpec, t)) @@ -860,10 +863,10 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady rc := newReplicationController(1) - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -885,7 +888,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { if !exists || err != nil { t.Errorf("No expectations found for rc") } - manager.rcStore.Indexer.Delete(rc) + manager.rcLister.Indexer.Delete(rc) manager.syncReplicationController(getKey(rc, t)) if _, exists, err = manager.expectations.GetExpectations(rcKey); exists { @@ -894,37 +897,11 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the rc. podExp.Add(-1, 0) - manager.podStore.Indexer.Replace(make([]interface{}, 0), "0") + manager.podLister.Indexer.Replace(make([]interface{}, 0), "0") manager.syncReplicationController(getKey(rc, t)) validateSyncReplication(t, &fakePodControl, 0, 0, 0) } -func TestRCManagerNotReady(t *testing.T) { - c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) - fakePodControl := controller.FakePodControl{} - manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0) - manager.podControl = &fakePodControl - manager.podStoreSynced = func() bool { return false } - - // Simulates the rc reflector running before the pod reflector. We don't - // want to end up creating replicas in this case until the pod reflector - // has synced, so the rc manager should just requeue the rc. - controllerSpec := newReplicationController(1) - manager.rcStore.Indexer.Add(controllerSpec) - - rcKey := getKey(controllerSpec, t) - manager.syncReplicationController(rcKey) - validateSyncReplication(t, &fakePodControl, 0, 0, 0) - queueRC, _ := manager.queue.Get() - if queueRC != rcKey { - t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC) - } - - manager.podStoreSynced = alwaysReady - manager.syncReplicationController(rcKey) - validateSyncReplication(t, &fakePodControl, 1, 0, 0) -} - // shuffle returns a new shuffled list of container controllers. func shuffle(controllers []*v1.ReplicationController) []*v1.ReplicationController { numControllers := len(controllers) @@ -941,7 +918,7 @@ func TestOverlappingRCs(t *testing.T) { for i := 0; i < 5; i++ { manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store var controllers []*v1.ReplicationController @@ -953,7 +930,7 @@ func TestOverlappingRCs(t *testing.T) { } shuffledControllers := shuffle(controllers) for j := range shuffledControllers { - manager.rcStore.Indexer.Add(shuffledControllers[j]) + manager.rcLister.Indexer.Add(shuffledControllers[j]) } // Add a pod and make sure only the oldest rc is synced pods := newPodList(nil, 1, v1.PodPending, controllers[0], "pod") @@ -970,10 +947,10 @@ func TestOverlappingRCs(t *testing.T) { func TestDeletionTimestamp(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady controllerSpec := newReplicationController(1) - manager.rcStore.Indexer.Add(controllerSpec) + manager.rcLister.Indexer.Add(controllerSpec) rcKey, err := controller.KeyFunc(controllerSpec) if err != nil { t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err) @@ -1085,7 +1062,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) { ns := fmt.Sprintf("ns-%d", i) for j := 0; j < 10; j++ { rcName := fmt.Sprintf("rc-%d", j) - manager.rcStore.Indexer.Add(&v1.ReplicationController{ + manager.rcLister.Indexer.Add(&v1.ReplicationController{ ObjectMeta: v1.ObjectMeta{Name: rcName, Namespace: ns}, Spec: v1.ReplicationControllerSpec{ Selector: map[string]string{"rcName": rcName}, @@ -1127,7 +1104,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { for i := 0; i < rcNum; i++ { rcName := fmt.Sprintf("rc-%d", i) - manager.rcStore.Indexer.Add(&v1.ReplicationController{ + manager.rcLister.Indexer.Add(&v1.ReplicationController{ ObjectMeta: v1.ObjectMeta{Name: rcName, Namespace: "foo"}, Spec: v1.ReplicationControllerSpec{ Selector: map[string]string{"rcName": rcName}, @@ -1149,7 +1126,7 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationMana fakePodControl = &controller.FakePodControl{} manager = NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) manager.garbageCollectorEnabled = true - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady manager.podControl = fakePodControl return manager, fakePodControl } @@ -1157,13 +1134,13 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationMana func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() rc := newReplicationController(2) - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) var trueVar = true otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar} - // add to podStore a matching Pod controlled by another controller. Expect no patch. + // add to podLister a matching Pod controlled by another controller. Expect no patch. pod := newPod("pod", rc, v1.PodRunning, nil) pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference} - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) err := manager.syncReplicationController(getKey(rc, t)) if err != nil { t.Fatal(err) @@ -1175,14 +1152,14 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { func TestPatchPodWithOtherOwnerRef(t *testing.T) { rc := newReplicationController(2) manager, fakePodControl := setupManagerWithGCEnabled(rc) - manager.rcStore.Indexer.Add(rc) - // add to podStore one more matching pod that doesn't have a controller + manager.rcLister.Indexer.Add(rc) + // add to podLister one more matching pod that doesn't have a controller // ref, but has an owner ref pointing to other object. Expect a patch to // take control of it. unrelatedOwnerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"} pod := newPod("pod", rc, v1.PodRunning, nil) pod.OwnerReferences = []metav1.OwnerReference{unrelatedOwnerReference} - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) err := manager.syncReplicationController(getKey(rc, t)) if err != nil { @@ -1195,13 +1172,13 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) { func TestPatchPodWithCorrectOwnerRef(t *testing.T) { rc := newReplicationController(2) manager, fakePodControl := setupManagerWithGCEnabled(rc) - manager.rcStore.Indexer.Add(rc) - // add to podStore a matching pod that has an ownerRef pointing to the rc, + manager.rcLister.Indexer.Add(rc) + // add to podLister a matching pod that has an ownerRef pointing to the rc, // but ownerRef.Controller is false. Expect a patch to take control it. rcOwnerReference := metav1.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name} pod := newPod("pod", rc, v1.PodRunning, nil) pod.OwnerReferences = []metav1.OwnerReference{rcOwnerReference} - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) err := manager.syncReplicationController(getKey(rc, t)) if err != nil { @@ -1214,11 +1191,11 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) { func TestPatchPodFails(t *testing.T) { rc := newReplicationController(2) manager, fakePodControl := setupManagerWithGCEnabled(rc) - manager.rcStore.Indexer.Add(rc) - // add to podStore two matching pods. Expect two patches to take control + manager.rcLister.Indexer.Add(rc) + // add to podLister two matching pods. Expect two patches to take control // them. - manager.podStore.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil)) - manager.podStore.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil)) // let both patches fail. The rc manager will assume it fails to take // control of the pods and create new ones. fakePodControl.Err = fmt.Errorf("Fake Error") @@ -1233,12 +1210,12 @@ func TestPatchPodFails(t *testing.T) { func TestPatchExtraPodsThenDelete(t *testing.T) { rc := newReplicationController(2) manager, fakePodControl := setupManagerWithGCEnabled(rc) - manager.rcStore.Indexer.Add(rc) - // add to podStore three matching pods. Expect three patches to take control + manager.rcLister.Indexer.Add(rc) + // add to podLister three matching pods. Expect three patches to take control // them, and later delete one of them. - manager.podStore.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil)) - manager.podStore.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil)) - manager.podStore.Indexer.Add(newPod("pod3", rc, v1.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil)) + manager.podLister.Indexer.Add(newPod("pod3", rc, v1.PodRunning, nil)) err := manager.syncReplicationController(getKey(rc, t)) if err != nil { t.Fatal(err) @@ -1250,8 +1227,8 @@ func TestPatchExtraPodsThenDelete(t *testing.T) { func TestUpdateLabelsRemoveControllerRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() rc := newReplicationController(2) - manager.rcStore.Indexer.Add(rc) - // put one pod in the podStore + manager.rcLister.Indexer.Add(rc) + // put one pod in the podLister pod := newPod("pod", rc, v1.PodRunning, nil) pod.ResourceVersion = "1" var trueVar = true @@ -1264,7 +1241,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { // add the updatedPod to the store. This is consistent with the behavior of // the Informer: Informer updates the store before call the handler // (updatePod() in this case). - manager.podStore.Indexer.Add(&updatedPod) + manager.podLister.Indexer.Add(&updatedPod) // send a update of the same pod with modified labels manager.updatePod(pod, &updatedPod) // verifies that rc is added to the queue @@ -1288,15 +1265,15 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) { func TestUpdateSelectorControllerRef(t *testing.T) { manager, fakePodControl := setupManagerWithGCEnabled() rc := newReplicationController(2) - // put 2 pods in the podStore - newPodList(manager.podStore.Indexer, 2, v1.PodRunning, rc, "pod") + // put 2 pods in the podLister + newPodList(manager.podLister.Indexer, 2, v1.PodRunning, rc, "pod") // update the RC so that its selector no longer matches the pods updatedRC := *rc updatedRC.Spec.Selector = map[string]string{"foo": "baz"} // put the updatedRC into the store. This is consistent with the behavior of // the Informer: Informer updates the store before call the handler // (updateRC() in this case). - manager.rcStore.Indexer.Add(&updatedRC) + manager.rcLister.Indexer.Add(&updatedRC) manager.updateRC(rc, &updatedRC) // verifies that the rc is added to the queue rcKey := getKey(rc, t) @@ -1323,9 +1300,9 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { rc := newReplicationController(2) now := metav1.Now() rc.DeletionTimestamp = &now - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) pod1 := newPod("pod1", rc, v1.PodRunning, nil) - manager.podStore.Indexer.Add(pod1) + manager.podLister.Indexer.Add(pod1) // no patch, no create err := manager.syncReplicationController(getKey(rc, t)) @@ -1346,16 +1323,16 @@ func TestReadyReplicas(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(2) rc.Status = v1.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1} rc.Generation = 1 - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) - newPodList(manager.podStore.Indexer, 2, v1.PodPending, rc, "pod") - newPodList(manager.podStore.Indexer, 2, v1.PodRunning, rc, "pod") + newPodList(manager.podLister.Indexer, 2, v1.PodPending, rc, "pod") + newPodList(manager.podLister.Indexer, 2, v1.PodRunning, rc, "pod") // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) @@ -1385,7 +1362,7 @@ func TestAvailableReplicas(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: ®istered.GroupOrDie(v1.GroupName).GroupVersion}}) manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0) - manager.podStoreSynced = alwaysReady + manager.podListerSynced = alwaysReady // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(2) @@ -1393,17 +1370,17 @@ func TestAvailableReplicas(t *testing.T) { rc.Generation = 1 // minReadySeconds set to 15s rc.Spec.MinReadySeconds = 15 - manager.rcStore.Indexer.Add(rc) + manager.rcLister.Indexer.Add(rc) // First pod becomes ready 20s ago moment := metav1.Time{Time: time.Now().Add(-2e10)} pod := newPod("pod", rc, v1.PodRunning, &moment) - manager.podStore.Indexer.Add(pod) + manager.podLister.Indexer.Add(pod) // Second pod becomes ready now otherMoment := metav1.Now() otherPod := newPod("otherPod", rc, v1.PodRunning, &otherMoment) - manager.podStore.Indexer.Add(otherPod) + manager.podLister.Indexer.Add(otherPod) // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{}) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 0ce6d29193b..09695234a68 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -250,7 +250,7 @@ func ClusterRoles() []rbac.ClusterRole { rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(), rbac.NewRule("list", "watch").Groups("*").Resources("namespaces", "nodes", "persistentvolumeclaims", - "persistentvolumes", "pods", "secrets", "serviceaccounts").RuleOrDie(), + "persistentvolumes", "pods", "secrets", "serviceaccounts", "replicationcontrollers").RuleOrDie(), rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), }, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index bf47185b72a..c693b81fc7d 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -447,6 +447,7 @@ items: - persistentvolumeclaims - persistentvolumes - pods + - replicationcontrollers - secrets - serviceaccounts verbs: diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index cf049fb3542..2e5c44d5bd1 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -31,8 +31,10 @@ import ( "k8s.io/kubernetes/pkg/apimachinery/registered" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" "k8s.io/kubernetes/pkg/fields" @@ -82,8 +84,13 @@ func TestQuota(t *testing.T) { controllerCh := make(chan struct{}) defer close(controllerCh) - go replicationcontroller.NewReplicationManagerFromClientForIntegration(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). - Run(3, controllerCh) + informers := informers.NewSharedInformerFactory(clientset, nil, controller.NoResyncPeriodFunc()) + podInformer := informers.Pods().Informer() + rcInformer := informers.ReplicationControllers().Informer() + rm := replicationcontroller.NewReplicationManager(podInformer, rcInformer, clientset, replicationcontroller.BurstReplicas, 4096, false) + rm.SetEventRecorder(&record.FakeRecorder{}) + informers.Start(controllerCh) + go rm.Run(3, controllerCh) resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil) groupKindsToReplenish := []schema.GroupKind{ diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 9ace7351588..67e87593e89 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -123,7 +123,7 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa return ret, nil } -func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, cache.SharedIndexInformer, clientset.Interface) { +func rmSetup(t *testing.T, stopCh chan struct{}, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, cache.SharedIndexInformer, clientset.Interface) { masterConfig := framework.NewIntegrationTestMasterConfig() _, s := framework.RunAMaster(masterConfig) @@ -133,22 +133,13 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl t.Fatalf("Error in create clientset: %v", err) } resyncPeriod := 12 * time.Hour - resyncPeriodFunc := func() time.Duration { - return resyncPeriod - } - podInformer := informers.NewPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) - rm := replication.NewReplicationManager( - podInformer, - clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")), - resyncPeriodFunc, - replication.BurstReplicas, - 4096, - enableGarbageCollector, - ) - if err != nil { - t.Fatalf("Failed to create replication manager") - } + informers := informers.NewSharedInformerFactory(clientSet, nil, resyncPeriod) + podInformer := informers.Pods().Informer() + rcInformer := informers.ReplicationControllers().Informer() + rm := replication.NewReplicationManager(podInformer, rcInformer, clientSet, replication.BurstReplicas, 4096, enableGarbageCollector) + informers.Start(stopCh) + return s, rm, podInformer, clientSet } @@ -219,7 +210,8 @@ func TestAdoption(t *testing.T) { }, } for i, tc := range testCases { - s, rm, podInformer, clientSet := rmSetup(t, true) + stopCh := make(chan struct{}) + s, rm, podInformer, clientSet := rmSetup(t, stopCh, true) ns := framework.CreateTestingNamespace(fmt.Sprintf("adoption-%d", i), s, t) defer framework.DeleteTestingNamespace(ns, s, t) @@ -238,7 +230,6 @@ func TestAdoption(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - stopCh := make(chan struct{}) go podInformer.Run(stopCh) waitToObservePods(t, podInformer, 1) go rm.Run(5, stopCh) @@ -296,7 +287,8 @@ func TestUpdateSelectorToAdopt(t *testing.T) { // We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector // matches pod1 only; change the selector to match pod2 as well. Verify // there is only one pod left. - s, rm, podInformer, clientSet := rmSetup(t, true) + stopCh := make(chan struct{}) + s, rm, _, clientSet := rmSetup(t, stopCh, true) ns := framework.CreateTestingNamespace("update-selector-to-adopt", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rc := newRC("rc", ns.Name, 1) @@ -309,8 +301,6 @@ func TestUpdateSelectorToAdopt(t *testing.T) { pod2.Labels["uniqueKey"] = "2" createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name) - stopCh := make(chan struct{}) - go podInformer.Run(stopCh) go rm.Run(5, stopCh) waitRCStable(t, clientSet, rc, ns.Name) @@ -336,7 +326,8 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) { // matches pod1 and pod2; change the selector to match only pod1. Verify // that rc creates one more pod, so there are 3 pods. Also verify that // pod2's controllerRef is cleared. - s, rm, podInformer, clientSet := rmSetup(t, true) + stopCh := make(chan struct{}) + s, rm, podInformer, clientSet := rmSetup(t, stopCh, true) ns := framework.CreateTestingNamespace("update-selector-to-remove-controllerref", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rc := newRC("rc", ns.Name, 2) @@ -346,8 +337,6 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) { pod2.Labels["uniqueKey"] = "2" createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name) - stopCh := make(chan struct{}) - go podInformer.Run(stopCh) waitToObservePods(t, podInformer, 2) go rm.Run(5, stopCh) waitRCStable(t, clientSet, rc, ns.Name) @@ -382,7 +371,8 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) { // matches pod1 and pod2; change pod2's labels to non-matching. Verify // that rc creates one more pod, so there are 3 pods. Also verify that // pod2's controllerRef is cleared. - s, rm, podInformer, clientSet := rmSetup(t, true) + stopCh := make(chan struct{}) + s, rm, _, clientSet := rmSetup(t, stopCh, true) ns := framework.CreateTestingNamespace("update-label-to-remove-controllerref", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rc := newRC("rc", ns.Name, 2) @@ -390,8 +380,6 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) { pod2 := newMatchingPod("pod2", ns.Name) createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name) - stopCh := make(chan struct{}) - go podInformer.Run(stopCh) go rm.Run(5, stopCh) waitRCStable(t, clientSet, rc, ns.Name) @@ -424,7 +412,8 @@ func TestUpdateLabelToBeAdopted(t *testing.T) { // matches pod1 only; change pod2's labels to be matching. Verify the RC // controller adopts pod2 and delete one of them, so there is only 1 pod // left. - s, rm, podInformer, clientSet := rmSetup(t, true) + stopCh := make(chan struct{}) + s, rm, _, clientSet := rmSetup(t, stopCh, true) ns := framework.CreateTestingNamespace("update-label-to-be-adopted", s, t) defer framework.DeleteTestingNamespace(ns, s, t) rc := newRC("rc", ns.Name, 1) @@ -437,8 +426,6 @@ func TestUpdateLabelToBeAdopted(t *testing.T) { pod2.Labels["uniqueKey"] = "2" createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name) - stopCh := make(chan struct{}) - go podInformer.Run(stopCh) go rm.Run(5, stopCh) waitRCStable(t, clientSet, rc, ns.Name)