Merge pull request #42429 from kargakis/sts-observed-generation-fix

Automatic merge from submit-queue (batch tested with PRs 42365, 42429, 41770, 42018, 35055)

controller: statefulsets respect observed generation

StatefulSets do not update ObservedGeneration even though the API field is in place. This means that clients can never be sure whether the StatefulSet controller has observed the latest spec of a StatefulSet.

@kubernetes/sig-apps-bugs
This commit is contained in:
Kubernetes Submit Queue 2017-03-03 09:24:42 -08:00 committed by GitHub
commit 66a0311fd3
5 changed files with 35 additions and 11 deletions

View File

@ -2747,6 +2747,7 @@ runTests() {
hpa_max_field=".spec.maxReplicas"
hpa_cpu_field=".spec.targetCPUUtilizationPercentage"
statefulset_replicas_field=".spec.replicas"
statefulset_observed_generation=".status.observedGeneration"
job_parallelism_field=".spec.parallelism"
deployment_replicas=".spec.replicas"
secret_data=".data"
@ -3180,10 +3181,12 @@ runTests() {
### Scale statefulset test with current-replicas and replicas
# Pre-condition: 0 replicas
kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '0'
kube::test::wait_object_assert 'statefulset nginx' "{{$statefulset_observed_generation}}" '1'
# Command: Scale up
kubectl scale --current-replicas=0 --replicas=1 statefulset nginx "${kube_flags[@]}"
# Post-condition: 1 replica, named nginx-0
kube::test::get_object_assert 'statefulset nginx' "{{$statefulset_replicas_field}}" '1'
kube::test::wait_object_assert 'statefulset nginx' "{{$statefulset_observed_generation}}" '2'
# Typically we'd wait and confirm that N>1 replicas are up, but this framework
# doesn't start the scheduler, so pet-0 will block all others.
# TODO: test robust scaling in an e2e.

View File

@ -49,10 +49,10 @@ type StatefulPodControlInterface interface {
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
// the returned error is nil.
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// UpdateStatefulSetStatus Updates the Status.Replicas of a StatefulSet. set is an in-out parameter, and any
// UpdateStatefulSetStatus updates the status of a StatefulSet. set is an in-out parameter, and any
// updates made to the set are made visible as mutations to the parameter. If the method is successful, the
// returned error is nil, and set has its Status.Replicas field set to replicas.
UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error
// returned error is nil, and set has its status updated.
UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error
}
func NewRealStatefulPodControl(
@ -150,9 +150,10 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
return err
}
func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error {
func (spc *realStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
set.Status.Replicas = replicas
set.Status.ObservedGeneration = &generation
_, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
if err == nil {
return nil

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
@ -470,7 +471,7 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
if err := control.UpdateStatefulSetReplicas(set, 2); err != nil {
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
if set.Status.Replicas != 2 {
@ -482,6 +483,24 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
}
}
func TestStatefulPodControlUpdatesObservedGeneration(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
sts := update.GetObject().(*apps.StatefulSet)
if sts.Status.ObservedGeneration == nil || *sts.Status.ObservedGeneration != int64(3) {
t.Errorf("expected observedGeneration to be synced with generation for statefulset %q", sts.Name)
}
return true, sts, nil
})
if err := control.UpdateStatefulSetStatus(set, 2, 3); err != nil {
t.Errorf("Error returned on successful status update: %s", err)
}
}
func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
@ -493,7 +512,7 @@ func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
t.Error("Failed update did not return error")
}
events := collectEvents(recorder.Events)
@ -520,7 +539,7 @@ func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
return true, update.GetObject(), nil
}
})
if err := control.UpdateStatefulSetReplicas(set, 2); err != nil {
if err := control.UpdateStatefulSetStatus(set, 2, 1); err != nil {
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
}
if set.Status.Replicas != 2 {
@ -544,7 +563,7 @@ func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
})
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
if err := control.UpdateStatefulSetStatus(set, 2, 1); err == nil {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
}
events := collectEvents(recorder.Events)

View File

@ -99,14 +99,14 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
sort.Sort(ascendingOrdinal(condemned))
// if the current number of replicas has changed update the statefulSets replicas
if set.Status.Replicas != int32(ready) {
if set.Status.Replicas != int32(ready) || set.Status.ObservedGeneration == nil || set.Generation > *set.Status.ObservedGeneration {
obj, err := api.Scheme.Copy(set)
if err != nil {
return fmt.Errorf("unable to copy set: %v", err)
}
set = obj.(*apps.StatefulSet)
if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil {
if err := ssc.podControl.UpdateStatefulSetStatus(set, int32(ready), set.Generation); err != nil {
return err
}
}

View File

@ -769,13 +769,14 @@ func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
return nil
}
func (spc *fakeStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error {
func (spc *fakeStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet, replicas int32, generation int64) error {
defer spc.updateStatusTracker.inc()
if spc.updateStatusTracker.errorReady() {
defer spc.updateStatusTracker.reset()
return spc.updateStatusTracker.err
}
set.Status.Replicas = replicas
set.Status.ObservedGeneration = &generation
spc.setsIndexer.Update(set)
return nil
}