mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 18:54:06 +00:00
Merge pull request #86097 from nan-yu/statefulset_fix
Honor the RevisionHistoryLimit in StatefulSetSpec
This commit is contained in:
commit
65879f9495
@ -69,6 +69,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
||||||
|
@ -20,12 +20,12 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"k8s.io/klog"
|
|
||||||
|
|
||||||
apps "k8s.io/api/apps/v1"
|
apps "k8s.io/api/apps/v1"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/controller/history"
|
"k8s.io/kubernetes/pkg/controller/history"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -81,22 +81,34 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
|
|||||||
}
|
}
|
||||||
history.SortControllerRevisions(revisions)
|
history.SortControllerRevisions(revisions)
|
||||||
|
|
||||||
|
currentRevision, updateRevision, err := ssc.performUpdate(set, pods, revisions)
|
||||||
|
if err != nil {
|
||||||
|
return utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// maintain the set's revision history limit
|
||||||
|
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ssc *defaultStatefulSetControl) performUpdate(
|
||||||
|
set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, error) {
|
||||||
|
|
||||||
// get the current, and update revisions
|
// get the current, and update revisions
|
||||||
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
|
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return currentRevision, updateRevision, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform the main update function and get the status
|
// perform the main update function and get the status
|
||||||
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
|
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return currentRevision, updateRevision, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the set's status
|
// update the set's status
|
||||||
err = ssc.updateStatefulSetStatus(set, status)
|
err = ssc.updateStatefulSetStatus(set, status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return currentRevision, updateRevision, err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
|
klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
|
||||||
@ -113,8 +125,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
|
|||||||
status.CurrentRevision,
|
status.CurrentRevision,
|
||||||
status.UpdateRevision)
|
status.UpdateRevision)
|
||||||
|
|
||||||
// maintain the set's revision history limit
|
return currentRevision, updateRevision, nil
|
||||||
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
|
func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
|
||||||
@ -151,7 +162,13 @@ func (ssc *defaultStatefulSetControl) truncateHistory(
|
|||||||
update *apps.ControllerRevision) error {
|
update *apps.ControllerRevision) error {
|
||||||
history := make([]*apps.ControllerRevision, 0, len(revisions))
|
history := make([]*apps.ControllerRevision, 0, len(revisions))
|
||||||
// mark all live revisions
|
// mark all live revisions
|
||||||
live := map[string]bool{current.Name: true, update.Name: true}
|
live := map[string]bool{}
|
||||||
|
if current != nil {
|
||||||
|
live[current.Name] = true
|
||||||
|
}
|
||||||
|
if update != nil {
|
||||||
|
live[update.Name] = true
|
||||||
|
}
|
||||||
for i := range pods {
|
for i := range pods {
|
||||||
live[getPodRevision(pods[i])] = true
|
live[getPodRevision(pods[i])] = true
|
||||||
}
|
}
|
||||||
|
@ -29,10 +29,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
apps "k8s.io/api/apps/v1"
|
apps "k8s.io/api/apps/v1"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
appsinformers "k8s.io/client-go/informers/apps/v1"
|
appsinformers "k8s.io/client-go/informers/apps/v1"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
@ -302,7 +303,7 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
|
|||||||
defer close(stop)
|
defer close(stop)
|
||||||
spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
||||||
|
|
||||||
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
|
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
|
||||||
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
|
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
|
||||||
}
|
}
|
||||||
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
||||||
@ -362,7 +363,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
|
|||||||
spc.podsIndexer.Update(pods[0])
|
spc.podsIndexer.Update(pods[0])
|
||||||
|
|
||||||
// now it should fail
|
// now it should fail
|
||||||
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
|
if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
|
||||||
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
|
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -373,7 +374,7 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
|
|||||||
defer close(stop)
|
defer close(stop)
|
||||||
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
||||||
|
|
||||||
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
|
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
|
||||||
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
|
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
|
||||||
}
|
}
|
||||||
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
||||||
@ -421,7 +422,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
|
|||||||
pods[0].Status.Phase = v1.PodFailed
|
pods[0].Status.Phase = v1.PodFailed
|
||||||
spc.podsIndexer.Update(pods[0])
|
spc.podsIndexer.Update(pods[0])
|
||||||
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
|
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
|
||||||
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
|
if err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) {
|
||||||
t.Errorf("StatefulSet failed to %s", err)
|
t.Errorf("StatefulSet failed to %s", err)
|
||||||
}
|
}
|
||||||
if err := invariants(set, spc); err != nil {
|
if err := invariants(set, spc); err != nil {
|
||||||
@ -459,7 +460,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
*set.Spec.Replicas = 0
|
*set.Spec.Replicas = 0
|
||||||
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
||||||
if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
|
if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) {
|
||||||
t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
|
t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
|
||||||
}
|
}
|
||||||
if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
||||||
@ -1201,6 +1202,42 @@ func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) {
|
||||||
|
invariants := assertMonotonicInvariants
|
||||||
|
set := newStatefulSet(3)
|
||||||
|
client := fake.NewSimpleClientset(set)
|
||||||
|
spc, ssu, ssc, stop := setupController(client)
|
||||||
|
defer close(stop)
|
||||||
|
|
||||||
|
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
||||||
|
t.Errorf("Failed to turn up StatefulSet : %s", err)
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ {
|
||||||
|
set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
|
||||||
|
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
|
||||||
|
updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants)
|
||||||
|
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||||
|
}
|
||||||
|
revisions, err := ssc.ListRevisions(set)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error listing revisions: %v", err)
|
||||||
|
}
|
||||||
|
// the extra 2 revisions are `currentRevision` and `updateRevision`
|
||||||
|
// They're considered as `live`, and truncateHistory only cleans up non-live revisions
|
||||||
|
if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
|
||||||
|
t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStatefulSetControlLimitsHistory(t *testing.T) {
|
func TestStatefulSetControlLimitsHistory(t *testing.T) {
|
||||||
type testcase struct {
|
type testcase struct {
|
||||||
name string
|
name string
|
||||||
@ -2135,3 +2172,8 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev
|
|||||||
}
|
}
|
||||||
return rev
|
return rev
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isOrHasInternalError(err error) bool {
|
||||||
|
agg, ok := err.(utilerrors.Aggregate)
|
||||||
|
return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user