Honor the RevisionHistoryLimit in StatefulSetSpec

The StatefulSet controller cleans up ControllerRevisions at the end of
the reconcile loop. If something goes wrong during reconcile, it bails
out without actually performing this step. This commit moves the update
logic into a separate performUpdate function so that the cleanup always
runs afterwards, even when the update fails; errors from the update and
from the cleanup are returned together as an aggregate.

Fixes issue: https://github.com/kubernetes/kubernetes/issues/85690
This commit is contained in:
Nan Yu 2019-12-09 16:54:28 -08:00
parent 1e12d92a51
commit 89bb7d8454
3 changed files with 75 additions and 15 deletions

View File

@ -69,6 +69,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",

View File

@ -20,12 +20,12 @@ import (
"math" "math"
"sort" "sort"
"k8s.io/klog"
apps "k8s.io/api/apps/v1" apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/controller/history"
) )
@ -81,22 +81,34 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
} }
history.SortControllerRevisions(revisions) history.SortControllerRevisions(revisions)
currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions)
if err != nil {
return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
}
// maintain the set's revision history limit
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
func (ssc *defaultStatefulSetControl) performUpdate(
set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) {
// get the current, and update revisions // get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil { if err != nil {
return err return currentRevision, updateRevision, err
} }
// perform the main update function and get the status // perform the main update function and get the status
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
if err != nil { if err != nil {
return err return currentRevision, updateRevision, err
} }
// update the set's status // update the set's status
err = ssc.updateStatefulSetStatus(set, status) err = ssc.updateStatefulSetStatus(set, status)
if err != nil { if err != nil {
return err return currentRevision, updateRevision, err
} }
klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d", klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
@ -113,8 +125,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
status.CurrentRevision, status.CurrentRevision,
status.UpdateRevision) status.UpdateRevision)
// maintain the set's revision history limit return currentRevision, updateRevision, nil
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
} }
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) { func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
@ -151,7 +162,13 @@ func (ssc *defaultStatefulSetControl) truncateHistory(
update *apps.ControllerRevision) error { update *apps.ControllerRevision) error {
history := make([]*apps.ControllerRevision, 0, len(revisions)) history := make([]*apps.ControllerRevision, 0, len(revisions))
// mark all live revisions // mark all live revisions
live := map[string]bool{current.Name: true, update.Name: true} live := map[string]bool{}
if current != nil {
live[current.Name] = true
}
if update != nil {
live[update.Name] = true
}
for i := range pods { for i := range pods {
live[getPodRevision(pods[i])] = true live[getPodRevision(pods[i])] = true
} }

View File

@ -29,10 +29,11 @@ import (
"time" "time"
apps "k8s.io/api/apps/v1" apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
appsinformers "k8s.io/client-go/informers/apps/v1" appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
@ -302,7 +303,7 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
defer close(stop) defer close(stop)
spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
@ -362,7 +363,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
spc.podsIndexer.Update(pods[0]) spc.podsIndexer.Update(pods[0])
// now it should fail // now it should fail
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
} }
@ -373,7 +374,7 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
defer close(stop) defer close(stop)
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
@ -421,7 +422,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
pods[0].Status.Phase = v1.PodFailed pods[0].Status.Phase = v1.PodFailed
spc.podsIndexer.Update(pods[0]) spc.podsIndexer.Update(pods[0])
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSet failed to %s", err) t.Errorf("StatefulSet failed to %s", err)
} }
if err := invariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
@ -459,7 +460,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
} }
*set.Spec.Replicas = 0 *set.Spec.Replicas = 0
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delete %s", err) t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
} }
if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil { if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
@ -1201,6 +1202,42 @@ func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
} }
} }
// TestStatefulSetHonorRevisionHistoryLimit checks that the number of
// ControllerRevisions listed for a StatefulSet stays within
// RevisionHistoryLimit (plus the live revisions) while status updates are
// being made to fail with an injected InternalError.
func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) {
	invariants := assertMonotonicInvariants
	set := newStatefulSet(3)
	client := fake.NewSimpleClientset(set)
	spc, ssu, ssc, stop := setupController(client)
	defer close(stop)

	if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
		t.Errorf("Failed to turn up StatefulSet : %s", err)
	}
	var err error
	set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
	if err != nil {
		t.Fatalf("Error getting updated StatefulSet: %v", err)
	}

	// Run more template changes than the history limit allows, so that
	// truncation has to happen for the assertion below to hold.
	for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ {
		// A distinct image per iteration changes the pod template
		// (presumably producing a new revision each time; confirm against
		// the revision helpers).
		set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
		// Arm the fake status updater with an InternalError before reconciling.
		ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
		// The outcome of this call is intentionally not checked: failures are
		// expected because of the injected error, and the test only cares
		// about the revision count afterwards.
		updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants)
		set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
		if err != nil {
			t.Fatalf("Error getting updated StatefulSet: %v", err)
		}
		revisions, err := ssc.ListRevisions(set)
		if err != nil {
			t.Fatalf("Error listing revisions: %v", err)
		}
		// the extra 2 revisions are `currentRevision` and `updateRevision`
		// They're considered as `live`, and truncateHistory only cleans up non-live revisions
		limit := int(*set.Spec.RevisionHistoryLimit)
		if len(revisions) > limit+2 {
			t.Fatalf("iteration %d: found %d revisions, want at most %d (RevisionHistoryLimit %d plus 2 live revisions)",
				i, len(revisions), limit+2, limit)
		}
	}
}
func TestStatefulSetControlLimitsHistory(t *testing.T) { func TestStatefulSetControlLimitsHistory(t *testing.T) {
type testcase struct { type testcase struct {
name string name string
@ -2135,3 +2172,8 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev
} }
return rev return rev
} }
// isOrHasInternalError reports whether err is NOT an API InternalError.
//
// NOTE: the result is inverted relative to what the name suggests, and the
// callers in this file rely on that polarity:
//   - for a plain (non-aggregate) error it returns true when
//     apierrors.IsInternalError(err) is false;
//   - for a utilerrors.Aggregate it inspects only the first contained error
//     and returns true when that one is not an InternalError;
//   - for an aggregate with no contained errors it returns false.
func isOrHasInternalError(err error) bool {
	aggregate, isAggregate := err.(utilerrors.Aggregate)
	if !isAggregate {
		return !apierrors.IsInternalError(err)
	}
	contained := aggregate.Errors()
	if len(contained) == 0 {
		return false
	}
	return !apierrors.IsInternalError(contained[0])
}