diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index f2be3ee7f53..1bed30f7336 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -46,6 +46,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "init_test.go", "replica_set_test.go", "replica_set_utils_test.go", ], @@ -72,6 +73,7 @@ go_test( "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/controller/replicaset/init_test.go b/pkg/controller/replicaset/init_test.go new file mode 100644 index 00000000000..72f29bd2613 --- /dev/null +++ b/pkg/controller/replicaset/init_test.go @@ -0,0 +1,25 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "k8s.io/klog" +) + +func init() { + klog.InitFlags(nil) +} diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 37b51df9158..d1a5b3c0345 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -140,12 +140,9 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer } rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: rsc.enqueueReplicaSet, + AddFunc: rsc.addRS, UpdateFunc: rsc.updateRS, - // This will enter the sync loop and no-op, because the replica set has been deleted from the store. - // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended - // way of achieving this is by performing a `stop` operation on the replica set. - DeleteFunc: rsc.enqueueReplicaSet, + DeleteFunc: rsc.deleteRS, }) rsc.rsLister = rsInformer.Lister() rsc.rsListerSynced = rsInformer.Informer().HasSynced @@ -266,11 +263,50 @@ func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controll return rs } +func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) { + key, err := controller.KeyFunc(rs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) + return + } + + rsc.queue.Add(key) +} + +func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) { + key, err := controller.KeyFunc(rs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) + return + } + + rsc.queue.AddAfter(key, duration) +} + +func (rsc *ReplicaSetController) addRS(obj interface{}) { + rs := obj.(*apps.ReplicaSet) + klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name) + rsc.enqueueRS(rs) +} + // callback when RS is updated func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { oldRS := old.(*apps.ReplicaSet) curRS := cur.(*apps.ReplicaSet) + // TODO: make a KEP and fix informers to always call the delete event handler on re-create + if curRS.UID != oldRS.UID { + key, err := controller.KeyFunc(oldRS) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err)) + return + } + rsc.deleteRS(cache.DeletedFinalStateUnknown{ + Key: key, + Obj: oldRS, + }) + } + // You might imagine that we only really need to enqueue the // replica set when Spec changes, but it is safer to sync any // time this function is triggered. That way a full informer @@ -286,7 +322,36 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) { klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas)) } - rsc.enqueueReplicaSet(cur) + rsc.enqueueRS(curRS) +} + +func (rsc *ReplicaSetController) deleteRS(obj interface{}) { + rs, ok := obj.(*apps.ReplicaSet) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) + return + } + rs, ok = tombstone.Obj.(*apps.ReplicaSet) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj)) + return + } + } + + key, err := controller.KeyFunc(rs) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) + return + } + + klog.V(4).Infof("Deleting %s %q", rsc.Kind, key) + + // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean + rsc.expectations.DeleteExpectations(key) + + rsc.queue.Add(key) } // When a pod is created, enqueue the replica set that manages it and update its expectations. @@ -312,7 +377,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { } klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) rsc.expectations.CreationObserved(rsKey) - rsc.enqueueReplicaSet(rs) + rsc.queue.Add(rsKey) return } @@ -326,7 +391,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { } klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) for _, rs := range rss { - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) } } @@ -363,7 +428,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil { - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) } } @@ -374,7 +439,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus // having its status updated with the newly available replica. For now, we can fake the @@ -386,7 +451,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds) // Add a second to avoid milliseconds skew in AddAfter. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. - rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) + rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) } return } @@ -400,7 +465,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { } klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) for _, rs := range rss { - rsc.enqueueReplicaSet(rs) + rsc.enqueueRS(rs) } } } @@ -438,31 +503,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { } rsKey, err := controller.KeyFunc(rs) if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err)) return } klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) - rsc.enqueueReplicaSet(rs) -} - -// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item. -func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - rsc.queue.Add(key) -} - -// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item. -func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) { - key, err := controller.KeyFunc(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) - return - } - rsc.queue.AddAfter(key, after) + rsc.queue.Add(rsKey) } // worker runs a worker thread that just dequeues items, processes them, and marks them done. @@ -485,7 +531,7 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool { return true } - utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err)) + utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err)) rsc.queue.AddRateLimited(key) return true @@ -498,7 +544,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } if diff < 0 { @@ -608,7 +654,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. func (rsc *ReplicaSetController) syncReplicaSet(key string) error { - startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) @@ -631,7 +676,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err)) + utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err)) return nil } @@ -670,7 +715,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { - rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) + rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) } return manageReplicasErr } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 12aef209480..bbbdb514f56 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -47,11 +47,16 @@ import ( "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" "k8s.io/client-go/util/workqueue" + "k8s.io/klog" "k8s.io/kubernetes/pkg/controller" . "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/securitycontext" ) +var ( + informerSyncTimeout = 30 * time.Second +) + func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) { informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) @@ -188,16 +193,20 @@ func processSync(rsc *ReplicaSetController, key string) error { return syncErr } -func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { +func validateSyncReplicaSet(fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) error { if e, a := expectedCreates, len(fakePodControl.Templates); e != a { - t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) + return fmt.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) } + if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a { - t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) + return fmt.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) } + if e, a := expectedPatches, len(fakePodControl.Patches); e != a { - t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) + return fmt.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) } + + return nil } func TestSyncReplicaSetDoesNothing(t *testing.T) { @@ -215,7 +224,10 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { manager.podControl = &fakePodControl manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) + err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } } func TestDeleteFinalStateUnknown(t *testing.T) { @@ -270,7 +282,10 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) { manager.podControl = &fakePodControl manager.syncReplicaSet(GetKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0) + err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0) + if err != nil { + t.Fatal(err) + } expectedLimit := 0 for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ { expectedLimit += controller.SlowStartInitialBatchSize << pass @@ -309,7 +324,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) { rsSpec.Status.ReadyReplicas = 1 rsSpec.Status.AvailableReplicas = 1 manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) + err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) + if err != nil { + t.Fatal(err) + } // Expectations prevents replicas but not an update on status rsSpec.Status.Replicas = 0 @@ -317,7 +335,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) { rsSpec.Status.AvailableReplicas = 0 fakePodControl.Clear() manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) + err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } // Get the key for the controller rsKey, err := controller.KeyFunc(rsSpec) @@ -335,13 +356,19 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl.Err = fmt.Errorf("fake Error") manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) + err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) + if err != nil { + t.Fatal(err) + } // This replica should not need a Lowering of expectations, since the previous create failed fakePodControl.Clear() fakePodControl.Err = nil manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) + err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) + if err != nil { + t.Fatal(err) + } // 2 PUT for the ReplicaSet status during dormancy window. // Note that the pod creates go through pod control so they're not recorded. @@ -806,7 +833,7 @@ func TestControllerUpdateRequeue(t *testing.T) { // Enqueue once. Then process it. Disable rate-limiting for this. manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) - manager.enqueueReplicaSet(rs) + manager.enqueueRS(rs) manager.processNextWorkItem() // It should have been requeued. if got, want := manager.queue.Len(), 1; got != want { @@ -901,7 +928,10 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) expectedPods = int32(burstReplicas) } // This validates the ReplicaSet manager sync actually created pods - validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0, 0) + err := validateSyncReplicaSet(&fakePodControl, int(expectedPods), 0, 0) + if err != nil { + t.Fatal(err) + } // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. @@ -922,7 +952,10 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) if expectedPods > int32(burstReplicas) { expectedPods = int32(burstReplicas) } - validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods), 0) + err := validateSyncReplicaSet(&fakePodControl, 0, int(expectedPods), 0) + if err != nil { + t.Fatal(err) + } // To accurately simulate a watch we must delete the exact pods // the rs is waiting for. @@ -961,7 +994,10 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // Check that the ReplicaSet didn't take any action for all the above pods fakePodControl.Clear() manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) + err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } // Create/Delete the last pod // The last add pod will decrease the expectation of the ReplicaSet to 0, @@ -1045,7 +1081,10 @@ func TestRSSyncExpectations(t *testing.T) { }, }) manager.syncReplicaSet(GetKey(rsSpec, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) + err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } } func TestDeleteControllerAndExpectations(t *testing.T) { @@ -1062,7 +1101,10 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should set expectations for the ReplicaSet manager.syncReplicaSet(GetKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) + err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) + if err != nil { + t.Fatal(err) + } fakePodControl.Clear() // Get the ReplicaSet key @@ -1078,6 +1120,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { t.Errorf("No expectations found for ReplicaSet") } informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs) + manager.deleteRS(rs) manager.syncReplicaSet(GetKey(rs, t)) if _, exists, err = manager.expectations.GetExpectations(rsKey); exists { @@ -1088,7 +1131,157 @@ func TestDeleteControllerAndExpectations(t *testing.T) { podExp.Add(-1, 0) informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0") manager.syncReplicaSet(GetKey(rs, t)) - validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) + err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } +} + +func TestExpectationsOnRecreate(t *testing.T) { + client := fake.NewSimpleClientset() + stopCh := make(chan struct{}) + defer close(stopCh) + + f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + manager := NewReplicaSetController( + f.Apps().V1().ReplicaSets(), + f.Core().V1().Pods(), + client, + 100, + ) + f.Start(stopCh) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + + if manager.queue.Len() != 0 { + t.Fatal("Unexpected item in the queue") + } + + oldRS := newReplicaSet(1, map[string]string{"foo": "bar"}) + oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(oldRS) + if err != nil { + t.Fatal(err) + } + + err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { + klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len()) + return manager.queue.Len() == 1, nil + }) + if err != nil { + t.Fatalf("initial RS didn't result in new item in the queue: %v", err) + } + + ok := manager.processNextWorkItem() + if !ok { + t.Fatal("queue is shutting down") + } + + err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) + if err != nil { + t.Fatal(err) + } + fakePodControl.Clear() + + oldRSKey, err := controller.KeyFunc(oldRS) + if err != nil { + t.Fatal(err) + } + + rsExp, exists, err := manager.expectations.GetExpectations(oldRSKey) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("No expectations found for ReplicaSet %q", oldRSKey) + } + if rsExp.Fulfilled() { + t.Errorf("There should be unfulfiled expectation for creating new pods for ReplicaSet %q", oldRSKey) + } + + if manager.queue.Len() != 0 { + t.Fatal("Unexpected item in the queue") + } + + err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(oldRS.Name, &metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } + + err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { + klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len()) + return manager.queue.Len() == 1, nil + }) + if err != nil { + t.Fatalf("Deleting RS didn't result in new item in the queue: %v", err) + } + + rsExp, exists, err = manager.expectations.GetExpectations(oldRSKey) + if err != nil { + t.Fatal(err) + } + if exists { + t.Errorf("There should be no expectations for ReplicaSet %q after it was deleted", oldRSKey) + } + + // skip sync for the delete event so we only see the new RS in sync + key, quit := manager.queue.Get() + if quit { + t.Fatal("Queue is shutting down!") + } + manager.queue.Done(key) + if key != oldRSKey { + t.Fatal("Keys should be equal!") + } + + if manager.queue.Len() != 0 { + t.Fatal("Unexpected item in the queue") + } + + newRS := oldRS.DeepCopy() + newRS.UID = uuid.NewUUID() + newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(newRS) + if err != nil { + t.Fatal(err) + } + + // Sanity check + if newRS.UID == oldRS.UID { + t.Fatal("New RS has the same UID as the old one!") + } + + err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { + klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len()) + return manager.queue.Len() == 1, nil + }) + if err != nil { + t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err) + } + + ok = manager.processNextWorkItem() + if !ok { + t.Fatal("Queue is shutting down!") + } + + newRSKey, err := controller.KeyFunc(newRS) + if err != nil { + t.Fatal(err) + } + rsExp, exists, err = manager.expectations.GetExpectations(newRSKey) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("No expectations found for ReplicaSet %q", oldRSKey) + } + if rsExp.Fulfilled() { + t.Errorf("There should be unfulfiled expectation for creating new pods for ReplicaSet %q", oldRSKey) + } + + err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) + if err != nil { + t.Fatal(err) + } + fakePodControl.Clear() } // shuffle returns a new shuffled list of container controllers. @@ -1269,7 +1462,10 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { t.Fatal(err) } // because the matching pod already has a controller, so 2 pods should be created. - validateSyncReplicaSet(t, fakePodControl, 2, 0, 0) + err = validateSyncReplicaSet(fakePodControl, 2, 0, 0) + if err != nil { + t.Fatal(err) + } } func TestPatchPodFails(t *testing.T) { @@ -1292,7 +1488,10 @@ func TestPatchPodFails(t *testing.T) { t.Errorf("expected fake Error, got %+v", err) } // 2 patches to take control of pod1 and pod2 (both fail). - validateSyncReplicaSet(t, fakePodControl, 0, 0, 2) + err = validateSyncReplicaSet(fakePodControl, 0, 0, 2) + if err != nil { + t.Fatal(err) + } // RS should requeue itself. queueRS, _ := manager.queue.Get() if queueRS != rsKey { @@ -1319,7 +1518,10 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { if err != nil { t.Fatal(err) } - validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) + err = validateSyncReplicaSet(fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } } func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { @@ -1346,7 +1548,10 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { t.Error("syncReplicaSet() err = nil, expected non-nil") } // no patch, no create. - validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) + err = validateSyncReplicaSet(fakePodControl, 0, 0, 0) + if err != nil { + t.Fatal(err) + } } var (