From 89bb7d845415206a25b6676eba99a95b6d94af94 Mon Sep 17 00:00:00 2001 From: Nan Yu Date: Mon, 9 Dec 2019 16:54:28 -0800 Subject: [PATCH] Honor the RevisionHistoryLimit in StatefulSetSpec The StatefulSet controller cleans up ControllerRevisions at the end of the reconcile loop. If something goes wrong during reconcile, it bails out without actually performing this step. This commit moves the cleanup to a deferred function call to guarantee it will be executed. Fixes issue: https://github.com/kubernetes/kubernetes/issues/85690 --- pkg/controller/statefulset/BUILD | 1 + .../statefulset/stateful_set_control.go | 35 ++++++++---- .../statefulset/stateful_set_control_test.go | 54 ++++++++++++++++--- 3 files changed, 75 insertions(+), 15 deletions(-) diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 52810172979..e0871ccb039 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -69,6 +69,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index fdcda9175b7..49ad241e149 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -20,12 +20,12 @@ import ( "math" "sort" - "k8s.io/klog" - apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" + "k8s.io/klog" "k8s.io/kubernetes/pkg/controller/history" ) @@ -81,22 +81,34 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p } history.SortControllerRevisions(revisions) + currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions) + if err != nil { + return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) + } + + // maintain the set's revision history limit + return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) +} + +func (ssc *defaultStatefulSetControl) performUpdate( + set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) { + // get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) if err != nil { - return err + return currentRevision, updateRevision, err } // perform the main update function and get the status status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) if err != nil { - return err + return currentRevision, updateRevision, err } // update the set's status err = ssc.updateStatefulSetStatus(set, status) if err != nil { - return err + return currentRevision, updateRevision, err } klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d", @@ -113,8 +125,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p status.CurrentRevision, status.UpdateRevision) - // maintain the set's revision history limit - return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) + return currentRevision, updateRevision, nil } func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) { @@ -151,7 +162,13 @@ func (ssc *defaultStatefulSetControl) truncateHistory( update *apps.ControllerRevision) error { history := make([]*apps.ControllerRevision, 0, len(revisions)) // mark all live revisions - live := map[string]bool{current.Name: true, update.Name: true} + live := map[string]bool{} + if current != nil { + live[current.Name] = true + } + if update != nil { + live[update.Name] = true + } for i := range pods { live[getPodRevision(pods[i])] = true } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 36a11a04cfe..9b222ee4375 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -29,10 +29,11 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/informers" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -302,7 +303,7 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF defer close(stop) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { + if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { @@ -362,7 +363,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF spc.podsIndexer.Update(pods[0]) // now it should fail - if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { + if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } } @@ -373,7 +374,7 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva defer close(stop) ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { + if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { @@ -421,7 +422,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in pods[0].Status.Phase = v1.PodFailed spc.podsIndexer.Update(pods[0]) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { + if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSet failed to %s", err) } if err := invariants(set, spc); err != nil { @@ -459,7 +460,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { } *set.Spec.Replicas = 0 spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { + if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl failed to throw error on delete %s", err) } if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil { @@ -1201,6 +1202,42 @@ func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) { } } +func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) { + invariants := assertMonotonicInvariants + set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + spc, ssu, ssc, stop := setupController(client) + defer close(stop) + + if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { + t.Errorf("Failed to turn up StatefulSet : %s", err) + } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + + for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ { + set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i) + ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) + updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants) + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + revisions, err := ssc.ListRevisions(set) + if err != nil { + t.Fatalf("Error listing revisions: %v", err) + } + // the extra 2 revisions are `currentRevision` and `updateRevision` + // They're considered as `live`, and truncateHistory only cleans up non-live revisions + if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 { + t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit) + } + } +} + func TestStatefulSetControlLimitsHistory(t *testing.T) { type testcase struct { name string @@ -2135,3 +2172,8 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev } return rev } + +func isOrHasInternalError(err error) bool { + agg, ok := err.(utilerrors.Aggregate) + return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0]) +}