Merge pull request #39355 from kargakis/update-rc-manager

Automatic merge from submit-queue

Share rc cache from the rc manager

@kubernetes/sig-apps-misc @hodovska
This commit is contained in:
Kubernetes Submit Queue 2017-01-04 05:18:29 -08:00 committed by GitHub
commit 38d57e5a71
11 changed files with 241 additions and 272 deletions

View File

@ -55,8 +55,8 @@ func startEndpointController(ctx ControllerContext) (bool, error) {
func startReplicationController(ctx ControllerContext) (bool, error) {
go replicationcontroller.NewReplicationManager(
ctx.InformerFactory.Pods().Informer(),
ctx.InformerFactory.ReplicationControllers().Informer(),
ctx.ClientBuilder.ClientOrDie("replication-controller"),
ResyncPeriod(&ctx.Options),
replicationcontroller.BurstReplicas,
int(ctx.Options.LookupCacheSizeForRC),
ctx.Options.EnableGarbageCollector,

View File

@ -318,6 +318,42 @@ func (f *internalLimitRangeInformer) Lister() coreinternallisters.LimitRangeList
//*****************************************************************************
// ReplicationControllerInformer is type of SharedIndexInformer which watches and lists all replication controllers.
// Interface provides constructor for informer and lister for replication controllers.
type ReplicationControllerInformer interface {
Informer() cache.SharedIndexInformer
Lister() *cache.StoreToReplicationControllerLister
}
type replicationControllerInformer struct {
*sharedInformerFactory
}
// Informer checks whether replicationControllerInformer exists in sharedInformerFactory and if not, it creates new informer of type
// replicationControllerInformer and connects it to sharedInformerFactory
func (f *replicationControllerInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&v1.ReplicationController{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = NewReplicationControllerInformer(f.client, f.defaultResync)
f.informers[informerType] = informer
return informer
}
// Lister returns lister for replicationControllerInformer
func (f *replicationControllerInformer) Lister() *cache.StoreToReplicationControllerLister {
informer := f.Informer()
return &cache.StoreToReplicationControllerLister{Indexer: informer.GetIndexer()}
}
//*****************************************************************************
// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
@ -350,7 +386,7 @@ func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cac
},
&v1.Node{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
cache.Indexers{})
return sharedIndexInformer
}
@ -445,7 +481,7 @@ func NewLimitRangeInformer(client clientset.Interface, resyncPeriod time.Duratio
},
&v1.LimitRange{},
resyncPeriod,
cache.Indexers{})
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
return sharedIndexInformer
}
@ -472,6 +508,25 @@ func NewInternalLimitRangeInformer(internalclient internalclientset.Interface, r
return sharedIndexInformer
}
// NewReplicationControllerInformer returns a SharedIndexInformer that lists and watches all replication controllers.
func NewReplicationControllerInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return client.Core().ReplicationControllers(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return client.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
},
},
&v1.ReplicationController{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}
/*****************************************************************************/
// ServiceAccountInformer is type of SharedIndexInformer which watches and lists all ServiceAccounts.

View File

@ -52,6 +52,7 @@ type SharedInformerFactory interface {
DaemonSets() DaemonSetInformer
Deployments() DeploymentInformer
ReplicaSets() ReplicaSetInformer
ReplicationControllers() ReplicationControllerInformer
ClusterRoleBindings() ClusterRoleBindingInformer
ClusterRoles() ClusterRoleInformer
@ -151,6 +152,10 @@ func (f *sharedInformerFactory) ReplicaSets() ReplicaSetInformer {
return &replicaSetInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
return &replicationControllerInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) ClusterRoles() ClusterRoleInformer {
return &clusterRoleInformer{sharedInformerFactory: f}
}

View File

@ -47,19 +47,10 @@ import (
)
const (
// We'll attempt to recompute the required replicas of all ReplicaSets
// that have fulfilled their expectations at least this often. This recomputation
// happens based on contents in local pod storage.
FullControllerResyncPeriod = 30 * time.Second
// Realistic value of the burstReplica field for the replica set manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// The number of times we retry updating a ReplicaSet's status.
statusUpdateRetries = 1
)
@ -568,14 +559,14 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
}()
obj, exists, err := rsc.rsLister.Indexer.GetByKey(key)
if err != nil {
return err
}
if !exists {
glog.V(4).Infof("ReplicaSet has been deleted %v", key)
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
rs := *obj.(*extensions.ReplicaSet)
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)

View File

@ -27,7 +27,6 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/util:go_default_library",
"//pkg/util/errors:go_default_library",
@ -35,7 +34,6 @@ go_library(
"//pkg/util/runtime:go_default_library",
"//pkg/util/wait:go_default_library",
"//pkg/util/workqueue:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
)
@ -57,6 +55,7 @@ go_test(
"//pkg/client/restclient:go_default_library",
"//pkg/client/testing/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/securitycontext:go_default_library",
"//pkg/util/sets:go_default_library",

View File

@ -35,7 +35,6 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
@ -43,27 +42,13 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
const (
// We'll attempt to recompute the required replicas of all replication controllers
// that have fulfilled their expectations at least this often. This recomputation
// happens based on contents in local pod storage.
// Full Resync shouldn't be needed at all in a healthy system. This is a protection
// against disappearing objects and watch notification, that we believe should not
// happen at all.
// TODO: We should get rid of it completely in the fullness of time.
FullControllerResyncPeriod = 10 * time.Minute
// Realistic value of the burstReplica field for the replication manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// The number of times we retry updating a replication controller's status.
statusUpdateRetries = 1
)
@ -97,16 +82,14 @@ type ReplicationManager struct {
expectations *controller.UIDTrackingControllerExpectations
// A store of replication controllers, populated by the rcController
rcStore cache.StoreToReplicationControllerLister
// Watches changes to all replication controllers
rcController *cache.Controller
rcLister cache.StoreToReplicationControllerLister
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
podLister cache.StoreToPodLister
// Watches changes to all pods
podController cache.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
podListerSynced func() bool
lookupCache *controller.MatchingCache
@ -118,27 +101,21 @@ type ReplicationManager struct {
garbageCollectorEnabled bool
}
// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
return newReplicationManager(
eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
// NewReplicationManager configures a replication manager with the specified event recorder
func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
rm := &ReplicationManager{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventRecorder,
Recorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
@ -146,29 +123,14 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache
garbageCollectorEnabled: garbageCollectorEnabled,
}
rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
},
},
&v1.ReplicationController{},
// TODO: Can we have much longer period here?
FullControllerResyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
})
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
@ -177,31 +139,21 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podStore.Indexer = podInformer.GetIndexer()
rm.podController = podInformer.GetController()
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
rm.rcLister.Indexer = rcInformer.GetIndexer()
rm.podLister.Indexer = podInformer.GetIndexer()
rm.podListerSynced = podInformer.HasSynced
rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rm
}
// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false
rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rm.internalPodInformer = podInformer
return rm
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rcInformer := informers.NewReplicationControllerInformer(kubeClient, resyncPeriod())
rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas, lookupCacheSize, false)
rm.internalPodInformer = podInformer
return rm
}
@ -216,20 +168,23 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting RC Manager")
go rm.rcController.Run(stopCh)
go rm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
defer rm.queue.ShutDown()
glog.Infof("Starting RC Manager")
if rm.internalPodInformer != nil {
go rm.internalPodInformer.Run(stopCh)
}
if !cache.WaitForCacheSync(stopCh, rm.podListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")
rm.queue.ShutDown()
}
// getPodController returns the controller managing the given pod.
@ -250,7 +205,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
}
// if not cached or cached value is invalid, search all the rc to find the matching one, and update cache
controllers, err := rm.rcStore.GetPodControllers(pod)
controllers, err := rm.rcLister.GetPodControllers(pod)
if err != nil {
glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
return nil
@ -276,7 +231,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
// isCacheValid check if the cache is valid
func (rm *ReplicationManager) isCacheValid(pod *v1.Pod, cachedRC *v1.ReplicationController) bool {
_, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
_, err := rm.rcLister.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
// rc has been deleted or updated, cache is invalid
if err != nil || !isControllerMatch(pod, cachedRC) {
return false
@ -648,23 +603,15 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
}()
if !rm.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
rm.queue.Add(key)
return nil
obj, exists, err := rm.rcLister.Indexer.GetByKey(key)
if err != nil {
return err
}
obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
if !exists {
glog.Infof("Replication Controller has been deleted %v", key)
rm.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
rc := *obj.(*v1.ReplicationController)
trace.Step("ReplicationController restored")
@ -678,7 +625,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
if rm.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rc's selector
// anymore but has the stale controller ref.
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
@ -719,7 +666,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
return aggregate
}
} else {
pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/util/sets"
@ -162,12 +163,12 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
controllerSpec := newReplicationController(2)
manager.rcStore.Indexer.Add(controllerSpec)
newPodList(manager.podStore.Indexer, 2, v1.PodRunning, controllerSpec, "pod")
manager.rcLister.Indexer.Add(controllerSpec)
newPodList(manager.podLister.Indexer, 2, v1.PodRunning, controllerSpec, "pod")
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(controllerSpec, t))
@ -178,13 +179,13 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
manager.podControl = &fakePodControl
// 2 running pods and a controller with 1 replica, one pod delete expected
controllerSpec := newReplicationController(1)
manager.rcStore.Indexer.Add(controllerSpec)
newPodList(manager.podStore.Indexer, 2, v1.PodRunning, controllerSpec, "pod")
manager.rcLister.Indexer.Add(controllerSpec)
newPodList(manager.podLister.Indexer, 2, v1.PodRunning, controllerSpec, "pod")
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 1, 0)
@ -194,7 +195,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
manager.podControl = &fakePodControl
received := make(chan string)
@ -206,7 +207,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// The DeletedFinalStateUnknown object should cause the rc manager to insert
// the controller matching the selectors of the deleted pod into the work queue.
controllerSpec := newReplicationController(1)
manager.rcStore.Indexer.Add(controllerSpec)
manager.rcLister.Indexer.Add(controllerSpec)
pods := newPodList(nil, 1, v1.PodRunning, controllerSpec, "pod")
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
@ -226,11 +227,11 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
func TestSyncReplicationControllerCreates(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -248,14 +249,14 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
activePods := 5
rc := newReplicationController(activePods)
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
rc.Status = v1.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, v1.PodRunning, rc, "pod")
newPodList(manager.podLister.Indexer, activePods, v1.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -289,19 +290,19 @@ func TestControllerUpdateReplicas(t *testing.T) {
defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(5)
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
rc.Status = v1.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0}
rc.Generation = 1
newPodList(manager.podStore.Indexer, 2, v1.PodRunning, rc, "pod")
newPodList(manager.podLister.Indexer, 2, v1.PodRunning, rc, "pod")
rcCopy := *rc
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
rcCopy.Spec.Selector = extraLabelMap
newPodList(manager.podStore.Indexer, 2, v1.PodRunning, &rcCopy, "podWithExtraLabel")
newPodList(manager.podLister.Indexer, 2, v1.PodRunning, &rcCopy, "podWithExtraLabel")
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{})
@ -335,12 +336,12 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.rcStore.Indexer.Add(controllerSpec)
newPodList(manager.podStore.Indexer, 1, v1.PodRunning, controllerSpec, "pod")
manager.rcLister.Indexer.Add(controllerSpec)
newPodList(manager.podLister.Indexer, 1, v1.PodRunning, controllerSpec, "pod")
// Creates a replica and sets expectations
controllerSpec.Status.Replicas = 1
@ -388,7 +389,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
testCases := []struct {
inRCs []*v1.ReplicationController
pod *v1.Pod
@ -434,7 +435,7 @@ func TestPodControllerLookup(t *testing.T) {
}
for _, c := range testCases {
for _, r := range c.inRCs {
manager.rcStore.Indexer.Add(r)
manager.rcLister.Indexer.Add(r)
}
if rc := manager.getPodController(c.pod); rc != nil {
if c.outRCName != rc.Name {
@ -449,9 +450,15 @@ func TestPodControllerLookup(t *testing.T) {
func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
c.AddWatchReactor("replicationcontrollers", core.DefaultWatchReactor(fakeWatch, nil))
stopCh := make(chan struct{})
defer close(stopCh)
informers := informers.NewSharedInformerFactory(c, nil, controller.NoResyncPeriodFunc())
podInformer := informers.Pods().Informer()
rcInformer := informers.ReplicationControllers().Informer()
manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas, 0, false)
informers.Start(stopCh)
manager.podListerSynced = alwaysReady
var testControllerSpec v1.ReplicationController
received := make(chan string)
@ -460,8 +467,7 @@ func TestWatchControllers(t *testing.T) {
// and eventually into the syncHandler. The handler validates the received controller
// and closes the received channel to indicate that the test can finish.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
obj, exists, err := manager.rcLister.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
@ -472,11 +478,9 @@ func TestWatchControllers(t *testing.T) {
close(received)
return nil
}
// Start only the rc watcher and the workqueue, send a watch event,
// and make sure it hits the sync method.
stopCh := make(chan struct{})
defer close(stopCh)
go manager.rcController.Run(stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
testControllerSpec.Name = "foo"
@ -494,17 +498,17 @@ func TestWatchPods(t *testing.T) {
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// Put one rc and one pod into the controller's stores
testControllerSpec := newReplicationController(1)
manager.rcStore.Indexer.Add(testControllerSpec)
manager.rcLister.Indexer.Add(testControllerSpec)
received := make(chan string)
// The pod update sent through the fakeWatcher should figure out the managing rc and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
obj, exists, err := manager.rcLister.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
@ -519,7 +523,6 @@ func TestWatchPods(t *testing.T) {
// and make sure it hits the sync method for the right rc.
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go manager.internalPodInformer.Run(stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
@ -537,12 +540,12 @@ func TestWatchPods(t *testing.T) {
func TestUpdatePods(t *testing.T) {
manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
received := make(chan string)
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
obj, exists, err := manager.rcLister.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
@ -556,16 +559,16 @@ func TestUpdatePods(t *testing.T) {
// Put 2 rcs and one pod into the controller's stores
testControllerSpec1 := newReplicationController(1)
manager.rcStore.Indexer.Add(testControllerSpec1)
manager.rcLister.Indexer.Add(testControllerSpec1)
testControllerSpec2 := *testControllerSpec1
testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"}
testControllerSpec2.Name = "barfoo"
manager.rcStore.Indexer.Add(&testControllerSpec2)
manager.rcLister.Indexer.Add(&testControllerSpec2)
// case 1: We put in the podStore a pod with labels matching
// case 1: We put in the podLister a pod with labels matching
// testControllerSpec1, then update its labels to match testControllerSpec2.
// We expect to receive a sync request for both controllers.
pod1 := newPodList(manager.podStore.Indexer, 1, v1.PodRunning, testControllerSpec1, "pod").Items[0]
pod1 := newPodList(manager.podLister.Indexer, 1, v1.PodRunning, testControllerSpec1, "pod").Items[0]
pod1.ResourceVersion = "1"
pod2 := pod1
pod2.Labels = testControllerSpec2.Spec.Selector
@ -584,7 +587,7 @@ func TestUpdatePods(t *testing.T) {
}
}
// case 2: pod1 in the podStore has labels matching testControllerSpec1.
// case 2: pod1 in the podLister has labels matching testControllerSpec1.
// We update its labels to match no replication controller. We expect to
// receive a sync request for testControllerSpec1.
pod2.Labels = make(map[string]string)
@ -615,12 +618,12 @@ func TestControllerUpdateRequeue(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
rc := newReplicationController(1)
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
rc.Status = v1.ReplicationControllerStatus{Replicas: 2}
newPodList(manager.podStore.Indexer, 1, v1.PodRunning, rc, "pod")
newPodList(manager.podLister.Indexer, 1, v1.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -686,11 +689,11 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(numReplicas)
manager.rcStore.Indexer.Add(controllerSpec)
manager.rcLister.Indexer.Add(controllerSpec)
expectedPods := 0
pods := newPodList(nil, numReplicas, v1.PodPending, controllerSpec, "pod")
@ -704,14 +707,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
for _, replicas := range []int{numReplicas, 0} {
*(controllerSpec.Spec.Replicas) = int32(replicas)
manager.rcStore.Indexer.Add(controllerSpec)
manager.rcLister.Indexer.Add(controllerSpec)
for i := 0; i < numReplicas; i += burstReplicas {
manager.syncReplicationController(getKey(controllerSpec, t))
// The store accrues active pods. It's also used by the rc to determine how many
// replicas to create.
activePods := len(manager.podStore.Indexer.List())
activePods := len(manager.podLister.Indexer.List())
if replicas != 0 {
// This is the number of pods currently "in flight". They were created by the rc manager above,
// which then puts the rc to sleep till all of them have been observed.
@ -725,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas.
for i := 0; i < expectedPods-1; i++ {
manager.podStore.Indexer.Add(&pods.Items[i])
manager.podLister.Indexer.Add(&pods.Items[i])
manager.addPod(&pods.Items[i])
}
@ -761,7 +764,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// has exactly one expectation at the end, to verify that we
// don't double delete.
for i := range podsToDelete[1:] {
manager.podStore.Indexer.Delete(podsToDelete[i])
manager.podLister.Indexer.Delete(podsToDelete[i])
manager.deletePod(podsToDelete[i])
}
podExp, exists, err := manager.expectations.GetExpectations(rcKey)
@ -782,7 +785,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// The last add pod will decrease the expectation of the rc to 0,
// which will cause it to create/delete the remaining replicas up to burstReplicas.
if replicas != 0 {
manager.podStore.Indexer.Add(&pods.Items[expectedPods-1])
manager.podLister.Indexer.Add(&pods.Items[expectedPods-1])
manager.addPod(&pods.Items[expectedPods-1])
} else {
expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t))
@ -797,14 +800,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
Labels: controllerSpec.Spec.Selector,
},
}
manager.podStore.Indexer.Delete(lastPod)
manager.podLister.Indexer.Delete(lastPod)
manager.deletePod(lastPod)
}
pods.Items = pods.Items[expectedPods:]
}
// Confirm that we've created the right number of replicas
activePods := int32(len(manager.podStore.Indexer.List()))
activePods := int32(len(manager.podLister.Indexer.List()))
if activePods != *(controllerSpec.Spec.Replicas) {
t.Fatalf("Unexpected number of active pods, expected %d, got %d", *(controllerSpec.Spec.Replicas), activePods)
}
@ -836,13 +839,13 @@ func TestRCSyncExpectations(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.rcStore.Indexer.Add(controllerSpec)
manager.rcLister.Indexer.Add(controllerSpec)
pods := newPodList(nil, 2, v1.PodPending, controllerSpec, "pod")
manager.podStore.Indexer.Add(&pods.Items[0])
manager.podLister.Indexer.Add(&pods.Items[0])
postExpectationsPod := pods.Items[1]
manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{
@ -850,7 +853,7 @@ func TestRCSyncExpectations(t *testing.T) {
// If we check active pods before checking expectataions, the rc
// will create a new replica because it doesn't see this pod, but
// has fulfilled its expectations.
manager.podStore.Indexer.Add(&postExpectationsPod)
manager.podLister.Indexer.Add(&postExpectationsPod)
},
})
manager.syncReplicationController(getKey(controllerSpec, t))
@ -860,10 +863,10 @@ func TestRCSyncExpectations(t *testing.T) {
func TestDeleteControllerAndExpectations(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
rc := newReplicationController(1)
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -885,7 +888,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
if !exists || err != nil {
t.Errorf("No expectations found for rc")
}
manager.rcStore.Indexer.Delete(rc)
manager.rcLister.Indexer.Delete(rc)
manager.syncReplicationController(getKey(rc, t))
if _, exists, err = manager.expectations.GetExpectations(rcKey); exists {
@ -894,37 +897,11 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should have no effect, since we've deleted the rc.
podExp.Add(-1, 0)
manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
manager.podLister.Indexer.Replace(make([]interface{}, 0), "0")
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
}
func TestRCManagerNotReady(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
// Simulates the rc reflector running before the pod reflector. We don't
// want to end up creating replicas in this case until the pod reflector
// has synced, so the rc manager should just requeue the rc.
controllerSpec := newReplicationController(1)
manager.rcStore.Indexer.Add(controllerSpec)
rcKey := getKey(controllerSpec, t)
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
manager.podStoreSynced = alwaysReady
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
}
// shuffle returns a new shuffled list of container controllers.
func shuffle(controllers []*v1.ReplicationController) []*v1.ReplicationController {
numControllers := len(controllers)
@ -941,7 +918,7 @@ func TestOverlappingRCs(t *testing.T) {
for i := 0; i < 5; i++ {
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// Create 10 rcs, shuffled them randomly and insert them into the rc manager's store
var controllers []*v1.ReplicationController
@ -953,7 +930,7 @@ func TestOverlappingRCs(t *testing.T) {
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
manager.rcStore.Indexer.Add(shuffledControllers[j])
manager.rcLister.Indexer.Add(shuffledControllers[j])
}
// Add a pod and make sure only the oldest rc is synced
pods := newPodList(nil, 1, v1.PodPending, controllers[0], "pod")
@ -970,10 +947,10 @@ func TestOverlappingRCs(t *testing.T) {
func TestDeletionTimestamp(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
controllerSpec := newReplicationController(1)
manager.rcStore.Indexer.Add(controllerSpec)
manager.rcLister.Indexer.Add(controllerSpec)
rcKey, err := controller.KeyFunc(controllerSpec)
if err != nil {
t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err)
@ -1085,7 +1062,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) {
ns := fmt.Sprintf("ns-%d", i)
for j := 0; j < 10; j++ {
rcName := fmt.Sprintf("rc-%d", j)
manager.rcStore.Indexer.Add(&v1.ReplicationController{
manager.rcLister.Indexer.Add(&v1.ReplicationController{
ObjectMeta: v1.ObjectMeta{Name: rcName, Namespace: ns},
Spec: v1.ReplicationControllerSpec{
Selector: map[string]string{"rcName": rcName},
@ -1127,7 +1104,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
for i := 0; i < rcNum; i++ {
rcName := fmt.Sprintf("rc-%d", i)
manager.rcStore.Indexer.Add(&v1.ReplicationController{
manager.rcLister.Indexer.Add(&v1.ReplicationController{
ObjectMeta: v1.ObjectMeta{Name: rcName, Namespace: "foo"},
Spec: v1.ReplicationControllerSpec{
Selector: map[string]string{"rcName": rcName},
@ -1149,7 +1126,7 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationMana
fakePodControl = &controller.FakePodControl{}
manager = NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.garbageCollectorEnabled = true
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
manager.podControl = fakePodControl
return manager, fakePodControl
}
@ -1157,13 +1134,13 @@ func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationMana
func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
var trueVar = true
otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
// add to podLister a matching Pod controlled by another controller. Expect no patch.
pod := newPod("pod", rc, v1.PodRunning, nil)
pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference}
manager.podStore.Indexer.Add(pod)
manager.podLister.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
@ -1175,14 +1152,14 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
func TestPatchPodWithOtherOwnerRef(t *testing.T) {
rc := newReplicationController(2)
manager, fakePodControl := setupManagerWithGCEnabled(rc)
manager.rcStore.Indexer.Add(rc)
// add to podStore one more matching pod that doesn't have a controller
manager.rcLister.Indexer.Add(rc)
// add to podLister one more matching pod that doesn't have a controller
// ref, but has an owner ref pointing to other object. Expect a patch to
// take control of it.
unrelatedOwnerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
pod := newPod("pod", rc, v1.PodRunning, nil)
pod.OwnerReferences = []metav1.OwnerReference{unrelatedOwnerReference}
manager.podStore.Indexer.Add(pod)
manager.podLister.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
@ -1195,13 +1172,13 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
rc := newReplicationController(2)
manager, fakePodControl := setupManagerWithGCEnabled(rc)
manager.rcStore.Indexer.Add(rc)
// add to podStore a matching pod that has an ownerRef pointing to the rc,
manager.rcLister.Indexer.Add(rc)
// add to podLister a matching pod that has an ownerRef pointing to the rc,
// but ownerRef.Controller is false. Expect a patch to take control it.
rcOwnerReference := metav1.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name}
pod := newPod("pod", rc, v1.PodRunning, nil)
pod.OwnerReferences = []metav1.OwnerReference{rcOwnerReference}
manager.podStore.Indexer.Add(pod)
manager.podLister.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
@ -1214,11 +1191,11 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
func TestPatchPodFails(t *testing.T) {
rc := newReplicationController(2)
manager, fakePodControl := setupManagerWithGCEnabled(rc)
manager.rcStore.Indexer.Add(rc)
// add to podStore two matching pods. Expect two patches to take control
manager.rcLister.Indexer.Add(rc)
// add to podLister two matching pods. Expect two patches to take control
// them.
manager.podStore.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil))
manager.podLister.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil))
manager.podLister.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil))
// let both patches fail. The rc manager will assume it fails to take
// control of the pods and create new ones.
fakePodControl.Err = fmt.Errorf("Fake Error")
@ -1233,12 +1210,12 @@ func TestPatchPodFails(t *testing.T) {
func TestPatchExtraPodsThenDelete(t *testing.T) {
rc := newReplicationController(2)
manager, fakePodControl := setupManagerWithGCEnabled(rc)
manager.rcStore.Indexer.Add(rc)
// add to podStore three matching pods. Expect three patches to take control
manager.rcLister.Indexer.Add(rc)
// add to podLister three matching pods. Expect three patches to take control
// them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod3", rc, v1.PodRunning, nil))
manager.podLister.Indexer.Add(newPod("pod1", rc, v1.PodRunning, nil))
manager.podLister.Indexer.Add(newPod("pod2", rc, v1.PodRunning, nil))
manager.podLister.Indexer.Add(newPod("pod3", rc, v1.PodRunning, nil))
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
@ -1250,8 +1227,8 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// put one pod in the podStore
manager.rcLister.Indexer.Add(rc)
// put one pod in the podLister
pod := newPod("pod", rc, v1.PodRunning, nil)
pod.ResourceVersion = "1"
var trueVar = true
@ -1264,7 +1241,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
// add the updatedPod to the store. This is consistent with the behavior of
// the Informer: Informer updates the store before call the handler
// (updatePod() in this case).
manager.podStore.Indexer.Add(&updatedPod)
manager.podLister.Indexer.Add(&updatedPod)
// send a update of the same pod with modified labels
manager.updatePod(pod, &updatedPod)
// verifies that rc is added to the queue
@ -1288,15 +1265,15 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
func TestUpdateSelectorControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
// put 2 pods in the podStore
newPodList(manager.podStore.Indexer, 2, v1.PodRunning, rc, "pod")
// put 2 pods in the podLister
newPodList(manager.podLister.Indexer, 2, v1.PodRunning, rc, "pod")
// update the RC so that its selector no longer matches the pods
updatedRC := *rc
updatedRC.Spec.Selector = map[string]string{"foo": "baz"}
// put the updatedRC into the store. This is consistent with the behavior of
// the Informer: Informer updates the store before call the handler
// (updateRC() in this case).
manager.rcStore.Indexer.Add(&updatedRC)
manager.rcLister.Indexer.Add(&updatedRC)
manager.updateRC(rc, &updatedRC)
// verifies that the rc is added to the queue
rcKey := getKey(rc, t)
@ -1323,9 +1300,9 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
rc := newReplicationController(2)
now := metav1.Now()
rc.DeletionTimestamp = &now
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
pod1 := newPod("pod1", rc, v1.PodRunning, nil)
manager.podStore.Indexer.Add(pod1)
manager.podLister.Indexer.Add(pod1)
// no patch, no create
err := manager.syncReplicationController(getKey(rc, t))
@ -1346,16 +1323,16 @@ func TestReadyReplicas(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(2)
rc.Status = v1.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
rc.Generation = 1
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
newPodList(manager.podStore.Indexer, 2, v1.PodPending, rc, "pod")
newPodList(manager.podStore.Indexer, 2, v1.PodRunning, rc, "pod")
newPodList(manager.podLister.Indexer, 2, v1.PodPending, rc, "pod")
newPodList(manager.podLister.Indexer, 2, v1.PodRunning, rc, "pod")
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{})
@ -1385,7 +1362,7 @@ func TestAvailableReplicas(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(v1.GroupName).GroupVersion}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podListerSynced = alwaysReady
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(2)
@ -1393,17 +1370,17 @@ func TestAvailableReplicas(t *testing.T) {
rc.Generation = 1
// minReadySeconds set to 15s
rc.Spec.MinReadySeconds = 15
manager.rcStore.Indexer.Add(rc)
manager.rcLister.Indexer.Add(rc)
// First pod becomes ready 20s ago
moment := metav1.Time{Time: time.Now().Add(-2e10)}
pod := newPod("pod", rc, v1.PodRunning, &moment)
manager.podStore.Indexer.Add(pod)
manager.podLister.Indexer.Add(pod)
// Second pod becomes ready now
otherMoment := metav1.Now()
otherPod := newPod("otherPod", rc, v1.PodRunning, &otherMoment)
manager.podStore.Indexer.Add(otherPod)
manager.podLister.Indexer.Add(otherPod)
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.ReplicationController{})

View File

@ -250,7 +250,7 @@ func ClusterRoles() []rbac.ClusterRole {
rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(),
rbac.NewRule("list", "watch").Groups("*").Resources("namespaces", "nodes", "persistentvolumeclaims",
"persistentvolumes", "pods", "secrets", "serviceaccounts").RuleOrDie(),
"persistentvolumes", "pods", "secrets", "serviceaccounts", "replicationcontrollers").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
},

View File

@ -447,6 +447,7 @@ items:
- persistentvolumeclaims
- persistentvolumes
- pods
- replicationcontrollers
- secrets
- serviceaccounts
verbs:

View File

@ -31,8 +31,10 @@ import (
"k8s.io/kubernetes/pkg/apimachinery/registered"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
"k8s.io/kubernetes/pkg/fields"
@ -82,8 +84,13 @@ func TestQuota(t *testing.T) {
controllerCh := make(chan struct{})
defer close(controllerCh)
go replicationcontroller.NewReplicationManagerFromClientForIntegration(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, controllerCh)
informers := informers.NewSharedInformerFactory(clientset, nil, controller.NoResyncPeriodFunc())
podInformer := informers.Pods().Informer()
rcInformer := informers.ReplicationControllers().Informer()
rm := replicationcontroller.NewReplicationManager(podInformer, rcInformer, clientset, replicationcontroller.BurstReplicas, 4096, false)
rm.SetEventRecorder(&record.FakeRecorder{})
informers.Start(controllerCh)
go rm.Run(3, controllerCh)
resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil)
groupKindsToReplenish := []schema.GroupKind{

View File

@ -123,7 +123,7 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
return ret, nil
}
func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, cache.SharedIndexInformer, clientset.Interface) {
func rmSetup(t *testing.T, stopCh chan struct{}, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, cache.SharedIndexInformer, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s := framework.RunAMaster(masterConfig)
@ -133,22 +133,13 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
t.Fatalf("Error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
resyncPeriodFunc := func() time.Duration {
return resyncPeriod
}
podInformer := informers.NewPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
rm := replication.NewReplicationManager(
podInformer,
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),
resyncPeriodFunc,
replication.BurstReplicas,
4096,
enableGarbageCollector,
)
if err != nil {
t.Fatalf("Failed to create replication manager")
}
informers := informers.NewSharedInformerFactory(clientSet, nil, resyncPeriod)
podInformer := informers.Pods().Informer()
rcInformer := informers.ReplicationControllers().Informer()
rm := replication.NewReplicationManager(podInformer, rcInformer, clientSet, replication.BurstReplicas, 4096, enableGarbageCollector)
informers.Start(stopCh)
return s, rm, podInformer, clientSet
}
@ -219,7 +210,8 @@ func TestAdoption(t *testing.T) {
},
}
for i, tc := range testCases {
s, rm, podInformer, clientSet := rmSetup(t, true)
stopCh := make(chan struct{})
s, rm, podInformer, clientSet := rmSetup(t, stopCh, true)
ns := framework.CreateTestingNamespace(fmt.Sprintf("adoption-%d", i), s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
@ -238,7 +230,6 @@ func TestAdoption(t *testing.T) {
t.Fatalf("Failed to create Pod: %v", err)
}
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
waitToObservePods(t, podInformer, 1)
go rm.Run(5, stopCh)
@ -296,7 +287,8 @@ func TestUpdateSelectorToAdopt(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
// matches pod1 only; change the selector to match pod2 as well. Verify
// there is only one pod left.
s, rm, podInformer, clientSet := rmSetup(t, true)
stopCh := make(chan struct{})
s, rm, _, clientSet := rmSetup(t, stopCh, true)
ns := framework.CreateTestingNamespace("update-selector-to-adopt", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 1)
@ -309,8 +301,6 @@ func TestUpdateSelectorToAdopt(t *testing.T) {
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
@ -336,7 +326,8 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
// matches pod1 and pod2; change the selector to match only pod1. Verify
// that rc creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
s, rm, podInformer, clientSet := rmSetup(t, true)
stopCh := make(chan struct{})
s, rm, podInformer, clientSet := rmSetup(t, stopCh, true)
ns := framework.CreateTestingNamespace("update-selector-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
@ -346,8 +337,6 @@ func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
waitToObservePods(t, podInformer, 2)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
@ -382,7 +371,8 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
// matches pod1 and pod2; change pod2's labels to non-matching. Verify
// that rc creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
s, rm, podInformer, clientSet := rmSetup(t, true)
stopCh := make(chan struct{})
s, rm, _, clientSet := rmSetup(t, stopCh, true)
ns := framework.CreateTestingNamespace("update-label-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
@ -390,8 +380,6 @@ func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
pod2 := newMatchingPod("pod2", ns.Name)
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
@ -424,7 +412,8 @@ func TestUpdateLabelToBeAdopted(t *testing.T) {
// matches pod1 only; change pod2's labels to be matching. Verify the RC
// controller adopts pod2 and delete one of them, so there is only 1 pod
// left.
s, rm, podInformer, clientSet := rmSetup(t, true)
stopCh := make(chan struct{})
s, rm, _, clientSet := rmSetup(t, stopCh, true)
ns := framework.CreateTestingNamespace("update-label-to-be-adopted", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 1)
@ -437,8 +426,6 @@ func TestUpdateLabelToBeAdopted(t *testing.T) {
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)