diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index 52810172979..e0871ccb039 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -69,6 +69,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index fdcda9175b7..49ad241e149 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -20,12 +20,12 @@ import ( "math" "sort" - "k8s.io/klog" - apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" + "k8s.io/klog" "k8s.io/kubernetes/pkg/controller/history" ) @@ -81,22 +81,34 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p } history.SortControllerRevisions(revisions) + currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions) + if err != nil { + return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) + } + + // maintain the set's revision history limit + return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) +} + +func (ssc *defaultStatefulSetControl) performUpdate( + set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) { + // get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) if err != nil { - return err + return currentRevision, updateRevision, err } // perform the main update function and get the status status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) if err != nil { - return err + return currentRevision, updateRevision, err } // update the set's status err = ssc.updateStatefulSetStatus(set, status) if err != nil { - return err + return currentRevision, updateRevision, err } klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d", @@ -113,8 +125,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p status.CurrentRevision, status.UpdateRevision) - // maintain the set's revision history limit - return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) + return currentRevision, updateRevision, nil } func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) { @@ -151,7 +162,13 @@ func (ssc *defaultStatefulSetControl) truncateHistory( update *apps.ControllerRevision) error { history := make([]*apps.ControllerRevision, 0, len(revisions)) // mark all live revisions - live := map[string]bool{current.Name: true, update.Name: true} + live := map[string]bool{} + if current != nil { + live[current.Name] = true + } + if update != nil { + live[update.Name] = true + } for i := range pods { live[getPodRevision(pods[i])] = true } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 36a11a04cfe..9b222ee4375 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -29,10 +29,11 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/informers" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -302,7 +303,7 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF defer close(stop) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { + if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { @@ -362,7 +363,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF spc.podsIndexer.Update(pods[0]) // now it should fail - if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { + if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } } @@ -373,7 +374,7 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva defer close(stop) ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { + if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { @@ -421,7 +422,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in pods[0].Status.Phase = v1.PodFailed spc.podsIndexer.Update(pods[0]) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { + if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSet failed to %s", err) } if err := invariants(set, spc); err != nil { @@ -459,7 +460,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { } *set.Spec.Replicas = 0 spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) { + if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl failed to throw error on delete %s", err) } if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil { @@ -1201,6 +1202,42 @@ func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) { } } +func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) { + invariants := assertMonotonicInvariants + set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + spc, ssu, ssc, stop := setupController(client) + defer close(stop) + + if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { + t.Errorf("Failed to turn up StatefulSet : %s", err) + } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + + for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ { + set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i) + ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) + updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants) + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + revisions, err := ssc.ListRevisions(set) + if err != nil { + t.Fatalf("Error listing revisions: %v", err) + } + // the extra 2 revisions are `currentRevision` and `updateRevision` + // They're considered as `live`, and truncateHistory only cleans up non-live revisions + if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 { + t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit) + } + } +} + func TestStatefulSetControlLimitsHistory(t *testing.T) { type testcase struct { name string @@ -2135,3 +2172,8 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev } return rev } + +func isOrHasInternalError(err error) bool { + agg, ok := err.(utilerrors.Aggregate) + return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0]) +}