Merge pull request #82572 from tnozicka/fix-rs-expectations

Fix RS expectations for recreate case
This commit is contained in:
Kubernetes Prow Robot 2019-11-11 05:49:42 -08:00 committed by GitHub
commit b2fb0f77ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 336 additions and 59 deletions

View File

@ -46,6 +46,7 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"init_test.go",
"replica_set_test.go", "replica_set_test.go",
"replica_set_utils_test.go", "replica_set_utils_test.go",
], ],
@ -72,6 +73,7 @@ go_test(
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
], ],
) )

View File

@ -0,0 +1,25 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replicaset
import (
"k8s.io/klog"
)
func init() {
klog.InitFlags(nil)
}

View File

@ -140,12 +140,9 @@ func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer
} }
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet, AddFunc: rsc.addRS,
UpdateFunc: rsc.updateRS, UpdateFunc: rsc.updateRS,
// This will enter the sync loop and no-op, because the replica set has been deleted from the store. DeleteFunc: rsc.deleteRS,
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
}) })
rsc.rsLister = rsInformer.Lister() rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced rsc.rsListerSynced = rsInformer.Informer().HasSynced
@ -266,11 +263,50 @@ func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controll
return rs return rs
} }
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
key, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
return
}
rsc.queue.Add(key)
}
func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
key, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
return
}
rsc.queue.AddAfter(key, duration)
}
func (rsc *ReplicaSetController) addRS(obj interface{}) {
rs := obj.(*apps.ReplicaSet)
klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
rsc.enqueueRS(rs)
}
// callback when RS is updated // callback when RS is updated
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
oldRS := old.(*apps.ReplicaSet) oldRS := old.(*apps.ReplicaSet)
curRS := cur.(*apps.ReplicaSet) curRS := cur.(*apps.ReplicaSet)
// TODO: make a KEP and fix informers to always call the delete event handler on re-create
if curRS.UID != oldRS.UID {
key, err := controller.KeyFunc(oldRS)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
return
}
rsc.deleteRS(cache.DeletedFinalStateUnknown{
Key: key,
Obj: oldRS,
})
}
// You might imagine that we only really need to enqueue the // You might imagine that we only really need to enqueue the
// replica set when Spec changes, but it is safer to sync any // replica set when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer // time this function is triggered. That way a full informer
@ -286,7 +322,36 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) { if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas)) klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
} }
rsc.enqueueReplicaSet(cur) rsc.enqueueRS(curRS)
}
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
rs, ok := obj.(*apps.ReplicaSet)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
rs, ok = tombstone.Obj.(*apps.ReplicaSet)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
return
}
}
key, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
return
}
klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)
// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
rsc.expectations.DeleteExpectations(key)
rsc.queue.Add(key)
} }
// When a pod is created, enqueue the replica set that manages it and update its expectations. // When a pod is created, enqueue the replica set that manages it and update its expectations.
@ -312,7 +377,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
} }
klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
rsc.expectations.CreationObserved(rsKey) rsc.expectations.CreationObserved(rsKey)
rsc.enqueueReplicaSet(rs) rsc.queue.Add(rsKey)
return return
} }
@ -326,7 +391,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
} }
klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
for _, rs := range rss { for _, rs := range rss {
rsc.enqueueReplicaSet(rs) rsc.enqueueRS(rs)
} }
} }
@ -363,7 +428,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
if controllerRefChanged && oldControllerRef != nil { if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any. // The ControllerRef was changed. Sync the old controller, if any.
if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil { if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
rsc.enqueueReplicaSet(rs) rsc.enqueueRS(rs)
} }
} }
@ -374,7 +439,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
return return
} }
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
rsc.enqueueReplicaSet(rs) rsc.enqueueRS(rs)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus // the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica. For now, we can fake the // having its status updated with the newly available replica. For now, we can fake the
@ -386,7 +451,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds) klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter. // Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second) rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
} }
return return
} }
@ -400,7 +465,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
} }
klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
for _, rs := range rss { for _, rs := range rss {
rsc.enqueueReplicaSet(rs) rsc.enqueueRS(rs)
} }
} }
} }
@ -438,31 +503,12 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
} }
rsKey, err := controller.KeyFunc(rs) rsKey, err := controller.KeyFunc(rs)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
return return
} }
klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
rsc.enqueueReplicaSet(rs) rsc.queue.Add(rsKey)
}
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
rsc.queue.Add(key)
}
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
rsc.queue.AddAfter(key, after)
} }
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
@ -485,7 +531,7 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool {
return true return true
} }
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err)) utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key) rsc.queue.AddRateLimited(key)
return true return true
@ -498,7 +544,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
diff := len(filteredPods) - int(*(rs.Spec.Replicas)) diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs) rsKey, err := controller.KeyFunc(rs)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil return nil
} }
if diff < 0 { if diff < 0 {
@ -608,7 +654,6 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key. // invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error { func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
@ -631,7 +676,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err)) utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil return nil
} }
@ -670,7 +715,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
} }
return manageReplicasErr return manageReplicasErr
} }

View File

@ -47,11 +47,16 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing" utiltesting "k8s.io/client-go/util/testing"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
. "k8s.io/kubernetes/pkg/controller/testutil" . "k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
) )
var (
informerSyncTimeout = 30 * time.Second
)
func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) { func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) {
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
@ -188,16 +193,20 @@ func processSync(rsc *ReplicaSetController, key string) error {
return syncErr return syncErr
} }
func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { func validateSyncReplicaSet(fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) error {
if e, a := expectedCreates, len(fakePodControl.Templates); e != a { if e, a := expectedCreates, len(fakePodControl.Templates); e != a {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) return fmt.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a)
} }
if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a { if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) return fmt.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a)
} }
if e, a := expectedPatches, len(fakePodControl.Patches); e != a { if e, a := expectedPatches, len(fakePodControl.Patches); e != a {
t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) return fmt.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a)
} }
return nil
} }
func TestSyncReplicaSetDoesNothing(t *testing.T) { func TestSyncReplicaSetDoesNothing(t *testing.T) {
@ -215,7 +224,10 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
} }
func TestDeleteFinalStateUnknown(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) {
@ -270,7 +282,10 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) {
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.syncReplicaSet(GetKey(rs, t)) manager.syncReplicaSet(GetKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0) err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0)
if err != nil {
t.Fatal(err)
}
expectedLimit := 0 expectedLimit := 0
for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ { for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ {
expectedLimit += controller.SlowStartInitialBatchSize << pass expectedLimit += controller.SlowStartInitialBatchSize << pass
@ -309,7 +324,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
rsSpec.Status.ReadyReplicas = 1 rsSpec.Status.ReadyReplicas = 1
rsSpec.Status.AvailableReplicas = 1 rsSpec.Status.AvailableReplicas = 1
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
if err != nil {
t.Fatal(err)
}
// Expectations prevents replicas but not an update on status // Expectations prevents replicas but not an update on status
rsSpec.Status.Replicas = 0 rsSpec.Status.Replicas = 0
@ -317,7 +335,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
rsSpec.Status.AvailableReplicas = 0 rsSpec.Status.AvailableReplicas = 0
fakePodControl.Clear() fakePodControl.Clear()
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
// Get the key for the controller // Get the key for the controller
rsKey, err := controller.KeyFunc(rsSpec) rsKey, err := controller.KeyFunc(rsSpec)
@ -335,13 +356,19 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
fakePodControl.Err = fmt.Errorf("fake Error") fakePodControl.Err = fmt.Errorf("fake Error")
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
if err != nil {
t.Fatal(err)
}
// This replica should not need a Lowering of expectations, since the previous create failed // This replica should not need a Lowering of expectations, since the previous create failed
fakePodControl.Clear() fakePodControl.Clear()
fakePodControl.Err = nil fakePodControl.Err = nil
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
if err != nil {
t.Fatal(err)
}
// 2 PUT for the ReplicaSet status during dormancy window. // 2 PUT for the ReplicaSet status during dormancy window.
// Note that the pod creates go through pod control so they're not recorded. // Note that the pod creates go through pod control so they're not recorded.
@ -806,7 +833,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
// Enqueue once. Then process it. Disable rate-limiting for this. // Enqueue once. Then process it. Disable rate-limiting for this.
manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())
manager.enqueueReplicaSet(rs) manager.enqueueRS(rs)
manager.processNextWorkItem() manager.processNextWorkItem()
// It should have been requeued. // It should have been requeued.
if got, want := manager.queue.Len(), 1; got != want { if got, want := manager.queue.Len(), 1; got != want {
@ -901,7 +928,10 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
expectedPods = int32(burstReplicas) expectedPods = int32(burstReplicas)
} }
// This validates the ReplicaSet manager sync actually created pods // This validates the ReplicaSet manager sync actually created pods
validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0, 0) err := validateSyncReplicaSet(&fakePodControl, int(expectedPods), 0, 0)
if err != nil {
t.Fatal(err)
}
// This simulates the watch events for all but 1 of the expected pods. // This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas. // None of these should wake the controller because it has expectations==BurstReplicas.
@ -922,7 +952,10 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
if expectedPods > int32(burstReplicas) { if expectedPods > int32(burstReplicas) {
expectedPods = int32(burstReplicas) expectedPods = int32(burstReplicas)
} }
validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods), 0) err := validateSyncReplicaSet(&fakePodControl, 0, int(expectedPods), 0)
if err != nil {
t.Fatal(err)
}
// To accurately simulate a watch we must delete the exact pods // To accurately simulate a watch we must delete the exact pods
// the rs is waiting for. // the rs is waiting for.
@ -961,7 +994,10 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// Check that the ReplicaSet didn't take any action for all the above pods // Check that the ReplicaSet didn't take any action for all the above pods
fakePodControl.Clear() fakePodControl.Clear()
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
// Create/Delete the last pod // Create/Delete the last pod
// The last add pod will decrease the expectation of the ReplicaSet to 0, // The last add pod will decrease the expectation of the ReplicaSet to 0,
@ -1045,7 +1081,10 @@ func TestRSSyncExpectations(t *testing.T) {
}, },
}) })
manager.syncReplicaSet(GetKey(rsSpec, t)) manager.syncReplicaSet(GetKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
} }
func TestDeleteControllerAndExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) {
@ -1062,7 +1101,10 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should set expectations for the ReplicaSet // This should set expectations for the ReplicaSet
manager.syncReplicaSet(GetKey(rs, t)) manager.syncReplicaSet(GetKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0) err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
if err != nil {
t.Fatal(err)
}
fakePodControl.Clear() fakePodControl.Clear()
// Get the ReplicaSet key // Get the ReplicaSet key
@ -1078,6 +1120,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
t.Errorf("No expectations found for ReplicaSet") t.Errorf("No expectations found for ReplicaSet")
} }
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs) informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs)
manager.deleteRS(rs)
manager.syncReplicaSet(GetKey(rs, t)) manager.syncReplicaSet(GetKey(rs, t))
if _, exists, err = manager.expectations.GetExpectations(rsKey); exists { if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
@ -1088,7 +1131,157 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
podExp.Add(-1, 0) podExp.Add(-1, 0)
informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0") informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0")
manager.syncReplicaSet(GetKey(rs, t)) manager.syncReplicaSet(GetKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0) err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
}
func TestExpectationsOnRecreate(t *testing.T) {
client := fake.NewSimpleClientset()
stopCh := make(chan struct{})
defer close(stopCh)
f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
manager := NewReplicaSetController(
f.Apps().V1().ReplicaSets(),
f.Core().V1().Pods(),
client,
100,
)
f.Start(stopCh)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
if manager.queue.Len() != 0 {
t.Fatal("Unexpected item in the queue")
}
oldRS := newReplicaSet(1, map[string]string{"foo": "bar"})
oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(oldRS)
if err != nil {
t.Fatal(err)
}
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
return manager.queue.Len() == 1, nil
})
if err != nil {
t.Fatalf("initial RS didn't result in new item in the queue: %v", err)
}
ok := manager.processNextWorkItem()
if !ok {
t.Fatal("queue is shutting down")
}
err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
if err != nil {
t.Fatal(err)
}
fakePodControl.Clear()
oldRSKey, err := controller.KeyFunc(oldRS)
if err != nil {
t.Fatal(err)
}
rsExp, exists, err := manager.expectations.GetExpectations(oldRSKey)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Errorf("No expectations found for ReplicaSet %q", oldRSKey)
}
if rsExp.Fulfilled() {
t.Errorf("There should be unfulfiled expectation for creating new pods for ReplicaSet %q", oldRSKey)
}
if manager.queue.Len() != 0 {
t.Fatal("Unexpected item in the queue")
}
err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(oldRS.Name, &metav1.DeleteOptions{})
if err != nil {
t.Fatal(err)
}
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
return manager.queue.Len() == 1, nil
})
if err != nil {
t.Fatalf("Deleting RS didn't result in new item in the queue: %v", err)
}
rsExp, exists, err = manager.expectations.GetExpectations(oldRSKey)
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("There should be no expectations for ReplicaSet %q after it was deleted", oldRSKey)
}
// skip sync for the delete event so we only see the new RS in sync
key, quit := manager.queue.Get()
if quit {
t.Fatal("Queue is shutting down!")
}
manager.queue.Done(key)
if key != oldRSKey {
t.Fatal("Keys should be equal!")
}
if manager.queue.Len() != 0 {
t.Fatal("Unexpected item in the queue")
}
newRS := oldRS.DeepCopy()
newRS.UID = uuid.NewUUID()
newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(newRS)
if err != nil {
t.Fatal(err)
}
// Sanity check
if newRS.UID == oldRS.UID {
t.Fatal("New RS has the same UID as the old one!")
}
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len())
return manager.queue.Len() == 1, nil
})
if err != nil {
t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err)
}
ok = manager.processNextWorkItem()
if !ok {
t.Fatal("Queue is shutting down!")
}
newRSKey, err := controller.KeyFunc(newRS)
if err != nil {
t.Fatal(err)
}
rsExp, exists, err = manager.expectations.GetExpectations(newRSKey)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Errorf("No expectations found for ReplicaSet %q", oldRSKey)
}
if rsExp.Fulfilled() {
t.Errorf("There should be unfulfiled expectation for creating new pods for ReplicaSet %q", oldRSKey)
}
err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
if err != nil {
t.Fatal(err)
}
fakePodControl.Clear()
} }
// shuffle returns a new shuffled list of container controllers. // shuffle returns a new shuffled list of container controllers.
@ -1269,7 +1462,10 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// because the matching pod already has a controller, so 2 pods should be created. // because the matching pod already has a controller, so 2 pods should be created.
validateSyncReplicaSet(t, fakePodControl, 2, 0, 0) err = validateSyncReplicaSet(fakePodControl, 2, 0, 0)
if err != nil {
t.Fatal(err)
}
} }
func TestPatchPodFails(t *testing.T) { func TestPatchPodFails(t *testing.T) {
@ -1292,7 +1488,10 @@ func TestPatchPodFails(t *testing.T) {
t.Errorf("expected fake Error, got %+v", err) t.Errorf("expected fake Error, got %+v", err)
} }
// 2 patches to take control of pod1 and pod2 (both fail). // 2 patches to take control of pod1 and pod2 (both fail).
validateSyncReplicaSet(t, fakePodControl, 0, 0, 2) err = validateSyncReplicaSet(fakePodControl, 0, 0, 2)
if err != nil {
t.Fatal(err)
}
// RS should requeue itself. // RS should requeue itself.
queueRS, _ := manager.queue.Get() queueRS, _ := manager.queue.Get()
if queueRS != rsKey { if queueRS != rsKey {
@ -1319,7 +1518,10 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) err = validateSyncReplicaSet(fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
} }
func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
@ -1346,7 +1548,10 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
t.Error("syncReplicaSet() err = nil, expected non-nil") t.Error("syncReplicaSet() err = nil, expected non-nil")
} }
// no patch, no create. // no patch, no create.
validateSyncReplicaSet(t, fakePodControl, 0, 0, 0) err = validateSyncReplicaSet(fakePodControl, 0, 0, 0)
if err != nil {
t.Fatal(err)
}
} }
var ( var (