diff --git a/hack/make-rules/test-cmd-util.sh b/hack/make-rules/test-cmd-util.sh index 54b5757bdda..ef0a924cbac 100644 --- a/hack/make-rules/test-cmd-util.sh +++ b/hack/make-rules/test-cmd-util.sh @@ -2747,6 +2747,7 @@ runTests() { hpa_max_field=".spec.maxReplicas" hpa_cpu_field=".spec.targetCPUUtilizationPercentage" statefulset_replicas_field=".spec.replicas" + statefulset_observed_generation=".status.observedGeneration" job_parallelism_field=".spec.parallelism" deployment_replicas=".spec.replicas" secret_data=".data" @@ -3180,10 +3181,12 @@ runTests() { ### Scale statefulset test with current-replicas and replicas # Pre-condition: 0 replicas kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '0' + kube::test::wait_object_assert 'statefulset nginx' "{{$statefulset_observed_generation}}" '1' # Command: Scale up kubectl scale --current-replicas=0 --replicas=1 statefulset nginx "${kube_flags[@]}" # Post-condition: 1 replica, named nginx-0 kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '1' + kube::test::wait_object_assert 'statefulset nginx' "{{$statefulset_observed_generation}}" '2' # Typically we'd wait and confirm that N>1 replicas are up, but this framework # doesn't start the scheduler, so pet-0 will block all others. # TODO: test robust scaling in an e2e. diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 94b8954bfb1..9c75d99869f 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -49,10 +49,10 @@ type StatefulPodControlInterface interface { // DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful, // the returned error is nil. DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error - // UpdateStatefulSetStatus Updates the Status.Replicas of a StatefulSet. set is an in-out parameter, and any + // UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any // updates made to the set are made visible as mutations to the parameter. If the method is successful, the - // returned error is nil, and set has its Status.Replicas field set to replicas. - UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error + // returned error is nil, and set has its status updated. + UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error } func NewRealStatefulPodControl( @@ -150,9 +150,10 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod return err } -func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error { +func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error { return retry.RetryOnConflict(retry.DefaultBackoff, func() error { set.Status.Replicas = replicas + set.Status.ObservedGeneration = &generation _, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set) if err == nil { return nil diff --git a/pkg/controller/statefulset/stateful_pod_control_test.go b/pkg/controller/statefulset/stateful_pod_control_test.go index 163947ccaac..2ba4fd20550 100644 --- a/pkg/controller/statefulset/stateful_pod_control_test.go +++ b/pkg/controller/statefulset/stateful_pod_control_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" podapi "k8s.io/kubernetes/pkg/api/v1/pod" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" @@ -470,7 +471,7 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) { update := action.(core.UpdateAction) return true, update.GetObject(), nil }) - if err := control.UpdateStatefulSetReplicas(set, 2); err != nil { + if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil { t.Errorf("Error returned on successful status update: %s", err) } if set.Status.Replicas != 2 { @@ -482,6 +483,24 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) { } } +func TestStatefulPodControlUpdatesObservedGeneration(t *testing.T) { + recorder := record.NewFakeRecorder(10) + set := newStatefulSet(3) + fakeClient := &fake.Clientset{} + control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder) + fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + sts := update.GetObject().(*apps.StatefulSet) + if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) { + t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name) + } + return true, sts, nil + }) + if err := control.UpdateStatefulSetStatus(set, 2, 3); err != nil { + t.Errorf("Error returned on successful status update: %s", err) + } +} + func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) { recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) @@ -493,7 +512,7 @@ func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) { fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) - if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { + if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil { t.Error("Failed update did not return error") } events := collectEvents(recorder.Events) @@ -520,7 +539,7 @@ func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) { return true, update.GetObject(), nil } }) - if err := control.UpdateStatefulSetReplicas(set, 2); err != nil { + if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil { t.Errorf("UpdateStatefulSetStatus returned an error: %s", err) } if set.Status.Replicas != 2 { @@ -544,7 +563,7 @@ func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) { update := action.(core.UpdateAction) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists")) }) - if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { + if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil { t.Error("UpdateStatefulSetStatus failed to return an error on get failure") } events := collectEvents(recorder.Events) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index dcac5e3ecbf..d3ce799b2ef 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -99,14 +99,14 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p sort.Sort(ascendingOrdinal(condemned)) // if the current number of replicas has changed update the statefulSets replicas - if set.Status.Replicas != int32(ready) { + if set.Status.Replicas != int32(ready) || set.Status.ObservedGeneration == nil || set.Generation > *set.Status.ObservedGeneration { obj, err := api.Scheme.Copy(set) if err != nil { return fmt.Errorf("unable to copy set: %v", err) } set = obj.(*apps.StatefulSet) - if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil { + if err := ssc.podControl.UpdateStatefulSetStatus(set, int32(ready), set.Generation); err != nil { return err } } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 3e6277e6674..2a74eeea89f 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -769,13 +769,14 @@ func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod return nil } -func (spc *fakeStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error { +func (spc *fakeStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error { defer spc.updateStatusTracker.inc() if spc.updateStatusTracker.errorReady() { defer spc.updateStatusTracker.reset() return spc.updateStatusTracker.err } set.Status.Replicas = replicas + set.Status.ObservedGeneration = &generation spc.setsIndexer.Update(set) return nil }