update replicaset lister

commit c30b2efc46
parent 358a57d74a
Author: deads2k
Date: 2016-10-04 13:23:27 -04:00
22 changed files with 249 additions and 213 deletions
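
The substance of the change, before the per-file hunks: the ReplicaSet controller's informer now fills a namespace-aware Indexer instead of a plain Store, so the rsStore lister can serve namespaced Get/List calls and hands back *extensions.ReplicaSet pointers rather than copies. A minimal sketch of the new wiring, assuming the 2016-era pkg/client/cache package; kubeClient, resyncPeriod, and enqueueReplicaSet are stand-ins rather than the controller's actual fields:

    // Sketch only: mirrors the fields touched in the hunks below, not a verbatim excerpt.
    var rsStore cache.StoreToReplicaSetLister
    var rsController *cache.Controller
    rsStore.Indexer, rsController = cache.NewIndexerInformer(
        &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
            },
        },
        &extensions.ReplicaSet{},
        resyncPeriod,
        cache.ResourceEventHandlerFuncs{AddFunc: enqueueReplicaSet, DeleteFunc: enqueueReplicaSet},
        // The namespace index is what lets the namespaced lister calls below work.
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    )

    // Lookups then go through the namespaced lister instead of the raw store's
    // (obj, exists, err) triple; "default" and "frontend" are example values.
    rs, err := rsStore.ReplicaSets("default").Get("frontend")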

View File

@@ -142,7 +142,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
garbageCollectorEnabled: garbageCollectorEnabled,
}
- rsc.rsStore.Store, rsc.rsController = cache.NewInformer(
+ rsc.rsStore.Indexer, rsc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
@@ -162,6 +162,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
},
+ cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -251,9 +252,9 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
}
// update lookup cache
- rsc.lookupCache.Update(pod, &rss[0])
+ rsc.lookupCache.Update(pod, rss[0])
- return &rss[0]
+ return rss[0]
}
// callback when RS is updated
@@ -296,9 +297,9 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
// isCacheValid check if the cache is valid
func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool {
- _, exists, err := rsc.rsStore.Get(cachedRS)
+ _, err := rsc.rsStore.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
// rs has been deleted or updated, cache is invalid
- if err != nil || !exists || !isReplicaSetMatch(pod, cachedRS) {
+ if err != nil || !isReplicaSetMatch(pod, cachedRS) {
return false
}
return true
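
For context on why err alone suffices above: a namespaced lister backed by the indexer reports a missing object as an error from Get, so there is no separate exists bool to check. Roughly how such a Get can be served from the namespace index; this is an illustration of the mechanism under assumptions (keys shaped by MetaNamespaceKeyFunc, a NotFound error from the api errors package), not the shipped lister code:

    // Illustration: keys in the indexer are "namespace/name", so GetByKey answers a
    // namespaced lookup; the error construction here is an assumption.
    func getReplicaSet(indexer cache.Indexer, namespace, name string) (*extensions.ReplicaSet, error) {
        obj, exists, err := indexer.GetByKey(namespace + "/" + name)
        if err != nil {
            return nil, err
        }
        if !exists {
            return nil, errors.NewNotFound(extensions.Resource("replicaset"), name)
        }
        return obj.(*extensions.ReplicaSet), nil
    }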
@@ -564,7 +565,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
return nil
}
- obj, exists, err := rsc.rsStore.Store.GetByKey(key)
+ obj, exists, err := rsc.rsStore.Indexer.GetByKey(key)
if !exists {
glog.Infof("ReplicaSet has been deleted %v", key)
rsc.expectations.DeleteExpectations(key)

View File

@@ -165,7 +165,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
// 2 running pods, a controller with 2 replicas, sync is a no-op
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.podControl = &fakePodControl
@@ -183,7 +183,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
// 2 running pods and a controller with 1 replica, one pod delete expected
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(1, labelMap)
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.syncReplicaSet(getKey(rsSpec, t))
@@ -207,7 +207,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// the controller matching the selectors of the deleted pod into the work queue.
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(1, labelMap)
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod")
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
@@ -232,7 +232,7 @@ func TestSyncReplicaSetCreates(t *testing.T) {
// A controller with 2 replicas and no pods in the store, 2 creates expected
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@@ -256,7 +256,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
activePods := 5
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(activePods, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
@@ -301,7 +301,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
rs := newReplicaSet(5, labelMap)
rs.Spec.Template.Labels = extraLabelMap
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0}
rs.Generation = 1
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
@@ -344,7 +344,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod")
// Creates a replica and sets expectations
@@ -439,7 +439,7 @@ func TestPodControllerLookup(t *testing.T) {
}
for _, c := range testCases {
for _, r := range c.inRSs {
- manager.rsStore.Add(r)
+ manager.rsStore.Indexer.Add(r)
}
if rs := manager.getPodReplicaSet(c.pod); rs != nil {
if c.outRSName != rs.Name {
@@ -471,7 +471,7 @@ func TestWatchControllers(t *testing.T) {
// and closes the received channel to indicate that the test can finish.
manager.syncHandler = func(key string) error {
- obj, exists, err := manager.rsStore.Store.GetByKey(key)
+ obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find replica set under key %v", key)
}
@@ -509,13 +509,13 @@ func TestWatchPods(t *testing.T) {
// Put one ReplicaSet and one pod into the controller's stores
labelMap := map[string]string{"foo": "bar"}
testRSSpec := newReplicaSet(1, labelMap)
- manager.rsStore.Store.Add(testRSSpec)
+ manager.rsStore.Indexer.Add(testRSSpec)
received := make(chan string)
// The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
- obj, exists, err := manager.rsStore.Store.GetByKey(key)
+ obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find replica set under key %v", key)
}
@@ -553,7 +553,7 @@ func TestUpdatePods(t *testing.T) {
received := make(chan string)
manager.syncHandler = func(key string) error {
- obj, exists, err := manager.rsStore.Store.GetByKey(key)
+ obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find replica set under key %v", key)
}
@@ -568,12 +568,12 @@ func TestUpdatePods(t *testing.T) {
// Put 2 ReplicaSets and one pod into the controller's stores
labelMap1 := map[string]string{"foo": "bar"}
testRSSpec1 := newReplicaSet(1, labelMap1)
- manager.rsStore.Store.Add(testRSSpec1)
+ manager.rsStore.Indexer.Add(testRSSpec1)
testRSSpec2 := *testRSSpec1
labelMap2 := map[string]string{"bar": "foo"}
testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2}
testRSSpec2.Name = "barfoo"
- manager.rsStore.Store.Add(&testRSSpec2)
+ manager.rsStore.Indexer.Add(&testRSSpec2)
// case 1: We put in the podStore a pod with labels matching testRSSpec1,
// then update its labels to match testRSSpec2. We expect to receive a sync
@@ -632,7 +632,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(1, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2}
newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod")
@@ -714,7 +714,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(numReplicas, labelMap)
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
expectedPods := int32(0)
pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod")
@@ -728,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
for _, replicas := range []int32{int32(numReplicas), 0} {
rsSpec.Spec.Replicas = replicas
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
for i := 0; i < numReplicas; i += burstReplicas {
manager.syncReplicaSet(getKey(rsSpec, t))
@@ -866,7 +866,7 @@ func TestRSSyncExpectations(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod")
manager.podStore.Indexer.Add(&pods.Items[0])
postExpectationsPod := pods.Items[1]
@@ -889,7 +889,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
manager.podStoreSynced = alwaysReady
rs := newReplicaSet(1, map[string]string{"foo": "bar"})
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@@ -911,7 +911,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
if !exists || err != nil {
t.Errorf("No expectations found for ReplicaSet")
}
- manager.rsStore.Delete(rs)
+ manager.rsStore.Indexer.Delete(rs)
manager.syncReplicaSet(getKey(rs, t))
if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
@@ -936,7 +936,7 @@ func TestRSManagerNotReady(t *testing.T) {
// want to end up creating replicas in this case until the pod reflector
// has synced, so the ReplicaSet controller should just requeue the ReplicaSet.
rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"})
- manager.rsStore.Store.Add(rsSpec)
+ manager.rsStore.Indexer.Add(rsSpec)
rsKey := getKey(rsSpec, t)
manager.syncReplicaSet(rsKey)
@@ -980,7 +980,7 @@ func TestOverlappingRSs(t *testing.T) {
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
- manager.rsStore.Store.Add(shuffledControllers[j])
+ manager.rsStore.Indexer.Add(shuffledControllers[j])
}
// Add a pod and make sure only the oldest ReplicaSet is synced
pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod")
@@ -1001,7 +1001,7 @@ func TestDeletionTimestamp(t *testing.T) {
manager.podStoreSynced = alwaysReady
rs := newReplicaSet(1, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
rsKey, err := controller.KeyFunc(rs)
if err != nil {
t.Errorf("Couldn't get key for object %#v: %v", rs, err)
@@ -1101,7 +1101,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
var trueVar = true
otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
@@ -1120,7 +1120,7 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
// add to podStore one more matching pod that doesn't have a controller
// ref, but has an owner ref pointing to other object. Expect a patch to
// take control of it.
@@ -1141,7 +1141,7 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
// add to podStore a matching pod that has an ownerRef pointing to the rs,
// but ownerRef.Controller is false. Expect a patch to take control it.
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
@@ -1161,7 +1161,7 @@ func TestPatchPodFails(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
// add to podStore two matching pods. Expect two patches to take control
// them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
@@ -1181,7 +1181,7 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
// add to podStore three matching pods. Expect three patches to take control
// them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
@@ -1199,7 +1199,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
// put one pod in the podStore
pod := newPod("pod", rs, api.PodRunning, nil)
pod.ResourceVersion = "1"
@@ -1246,7 +1246,7 @@ func TestUpdateSelectorControllerRef(t *testing.T) {
// put the updatedRS into the store. This is consistent with the behavior of
// the Informer: Informer updates the store before call the handler
// (updateRS() in this case).
- manager.rsStore.Store.Add(&updatedRS)
+ manager.rsStore.Indexer.Add(&updatedRS)
manager.updateRS(rs, &updatedRS)
// verifies that the rs is added to the queue
rsKey := getKey(rs, t)
@@ -1274,7 +1274,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
rs := newReplicaSet(2, labelMap)
now := unversioned.Now()
rs.DeletionTimestamp = &now
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
pod1 := newPod("pod1", rs, api.PodRunning, nil)
manager.podStore.Indexer.Add(pod1)
@@ -1304,7 +1304,7 @@ func TestReadyReplicas(t *testing.T) {
rs := newReplicaSet(2, labelMap)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
rs.Generation = 1
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
@@ -1346,7 +1346,7 @@ func TestAvailableReplicas(t *testing.T) {
rs.Generation = 1
// minReadySeconds set to 15s
rs.Spec.MinReadySeconds = 15
- manager.rsStore.Store.Add(rs)
+ manager.rsStore.Indexer.Add(rs)
// First pod becomes ready 20s ago
moment := unversioned.Time{Time: time.Now().Add(-2e10)}

View File

@@ -74,7 +74,7 @@ func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.Repli
}
// overlappingReplicaSets sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
- type overlappingReplicaSets []extensions.ReplicaSet
+ type overlappingReplicaSets []*extensions.ReplicaSet
func (o overlappingReplicaSets) Len() int { return len(o) }
func (o overlappingReplicaSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
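
With the lister now returning []*extensions.ReplicaSet, the sort helper switches to a pointer slice so lister output can be ordered in place without copying. A usage sketch, assuming the Less method (not shown in this hunk) implements the creation-timestamp-then-name ordering described in the comment above, and that the lister call is GetPodReplicaSets as used elsewhere in the controller:

    // Sketch: pick the oldest ReplicaSet matching a pod and refresh the lookup cache.
    rss, err := rsc.rsStore.GetPodReplicaSets(pod)
    if err == nil && len(rss) > 0 {
        sort.Sort(overlappingReplicaSets(rss))
        oldest := rss[0] // already a *extensions.ReplicaSet; the old code needed &rss[0]
        rsc.lookupCache.Update(pod, oldest)
    }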