fix stateful set pod recreation and event spam (#123809)

* fix pods tracking and internal error checking in statefulset tests

* fix stateful set pod recreation and event spam

- do not emit events when pod reaches terminal phase
- do not try to recreate pod until the old pod has been removed from
  etcd storage

* fix conflict race in statefulset rest update

statefulset controller does less requests per sync now and thus can
reconcile status faster, thus resulting in a higher chance for conflicts
This commit is contained in:
Filip Křepinský 2024-04-18 10:03:46 +02:00 committed by GitHub
parent 99735ccba8
commit 85d55b6737
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 137 additions and 118 deletions

View File

@ -98,7 +98,6 @@ func NewStatefulSetController(
recorder), recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()), NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()), history.NewHistory(kubeClient, revInformer.Lister()),
recorder,
), ),
pvcListerSynced: pvcInformer.Informer().HasSynced, pvcListerSynced: pvcInformer.Informer().HasSynced,
revListerSynced: revInformer.Informer().HasSynced, revListerSynced: revInformer.Informer().HasSynced,
@ -235,6 +234,9 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa
return return
} }
logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta) logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
if oldPod.Status.Phase != curPod.Status.Phase {
logger.V(4).Info("StatefulSet Pod phase changed", "pod", klog.KObj(curPod), "statefulSet", klog.KObj(set), "podPhase", curPod.Status.Phase)
}
ssc.enqueueStatefulSet(set) ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus // the Pod status which in turn will trigger a requeue of the owning replica set thus

View File

@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
@ -61,16 +60,14 @@ type StatefulSetControlInterface interface {
func NewDefaultStatefulSetControl( func NewDefaultStatefulSetControl(
podControl *StatefulPodControl, podControl *StatefulPodControl,
statusUpdater StatefulSetStatusUpdaterInterface, statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface, controllerHistory history.Interface) StatefulSetControlInterface {
recorder record.EventRecorder) StatefulSetControlInterface { return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
} }
type defaultStatefulSetControl struct { type defaultStatefulSetControl struct {
podControl *StatefulPodControl podControl *StatefulPodControl
statusUpdater StatefulSetStatusUpdaterInterface statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface controllerHistory history.Interface
recorder record.EventRecorder
} }
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
@ -367,45 +364,25 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
func (ssc *defaultStatefulSetControl) processReplica( func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context, ctx context.Context,
set *apps.StatefulSet, set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
currentSet *apps.StatefulSet,
updateSet *apps.StatefulSet, updateSet *apps.StatefulSet,
monotonic bool, monotonic bool,
replicas []*v1.Pod, replicas []*v1.Pod,
i int) (bool, error) { i int) (bool, error) {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
// Delete and recreate pods which finished running.
//
// Note that pods with phase Succeeded will also trigger this event. This is // Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods // because final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the // (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod // container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code. // regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) { if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) { if replicas[i].DeletionTimestamp == nil {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
"StatefulSet %s/%s is recreating failed Pod %s", return true, err
set.Namespace, }
set.Name,
replicas[i].Name)
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
} }
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { // New pod should be generated on the next sync after the current pod is removed from etcd.
return true, err return true, nil
}
replicaOrd := i + getStartOrdinal(set)
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
replicaOrd)
} }
// If we find a Pod that has not been created we create the Pod // If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) { if !isCreated(replicas[i]) {
@ -637,7 +614,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) { processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i) return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
} }
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)

View File

@ -60,8 +60,7 @@ func setupController(client clientset.Interface) (*fakeObjectManager, *fakeState
om := newFakeObjectManager(informerFactory) om := newFakeObjectManager(informerFactory)
spc := NewStatefulPodControlFromManager(om, &noopRecorder{}) spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{} ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()))
ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
// The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting // The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting
// for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests // for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests
@ -171,10 +170,11 @@ func TestStatefulSetControl(t *testing.T) {
{ReplacesPods, largeSetFn}, {ReplacesPods, largeSetFn},
{RecreatesFailedPod, simpleSetFn}, {RecreatesFailedPod, simpleSetFn},
{RecreatesSucceededPod, simpleSetFn}, {RecreatesSucceededPod, simpleSetFn},
{RecreatesFailedPodWithDeleteFailure, simpleSetFn},
{RecreatesSucceededPodWithDeleteFailure, simpleSetFn},
{CreatePodFailure, simpleSetFn}, {CreatePodFailure, simpleSetFn},
{UpdatePodFailure, simpleSetFn}, {UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn}, {UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
{NewRevisionDeletePodFailure, simpleSetFn}, {NewRevisionDeletePodFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn}, {RecreatesPVCForPendingPod, simpleSetFn},
} }
@ -398,9 +398,10 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
} }
} }
func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) { func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, terminalPhase v1.PodPhase, testDeletePodFailure bool) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
om, _, ssc := setupController(client) om, _, ssc := setupController(client)
expectedNumOfDeleteRequests := 0
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -415,34 +416,105 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
if err := invariants(set, om); err != nil { if err := invariants(set, om); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = om.podsLister.Pods(set.Namespace).List(selector) if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
if err != nil { t.Errorf("Found unexpected number of delete calls, got %v, expected 1", om.deletePodTracker.requests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err) t.Error(err)
} }
pods[0].Status.Phase = phase
om.podsIndexer.Update(pods[0]) terminalPodOrdinal := -1
for i, pod := range pods {
// Set at least Pending phase to acknowledge the creation of pods
newPhase := v1.PodPending
if i == 0 {
// Set terminal phase for the first pod
newPhase = terminalPhase
terminalPodOrdinal = getOrdinal(pod)
}
pod.Status.Phase = newPhase
if err = om.podsIndexer.Update(pod); err != nil {
t.Error(err)
}
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
if testDeletePodFailure {
// Expect pod deletion failure
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
expectedNumOfDeleteRequests++
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
}
// Expect pod deletion
expectedNumOfDeleteRequests++
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := invariants(set, om); err != nil { if err := invariants(set, om); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = om.podsLister.Pods(set.Namespace).List(selector) if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
if err != nil { t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err) t.Error(err)
} }
if isCreated(pods[0]) {
// Expect no additional delete calls and expect pod creation
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
recreatedPod := findPodByOrdinal(pods, terminalPodOrdinal)
// new recreated pod should have empty phase
if recreatedPod == nil || isCreated(recreatedPod) {
t.Error("StatefulSet did not recreate failed Pod") t.Error("StatefulSet did not recreate failed Pod")
} }
expectedNumberOfCreateRequests := 2
if monotonic := !allowsBurst(set); !monotonic {
expectedNumberOfCreateRequests = int(*set.Spec.Replicas + 1)
}
if om.createPodTracker.requests != expectedNumberOfCreateRequests {
t.Errorf("Found unexpected number of create calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumberOfCreateRequests)
}
} }
func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed) recreatesPod(t, set, invariants, v1.PodFailed, false)
} }
func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded) recreatesPod(t, set, invariants, v1.PodSucceeded, false)
}
func RecreatesFailedPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed, true)
}
func RecreatesSucceededPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded, true)
} }
func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
@ -450,8 +522,8 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
om, _, ssc := setupController(client) om, _, ssc := setupController(client)
om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) { if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
} }
// Update so set.Status is set for the next scaleUpStatefulSetControl call. // Update so set.Status is set for the next scaleUpStatefulSetControl call.
var err error var err error
@ -514,8 +586,8 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
om.podsIndexer.Update(pods[0]) om.podsIndexer.Update(pods[0])
// now it should fail // now it should fail
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) { if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
} }
} }
@ -524,8 +596,8 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
om, ssu, ssc := setupController(client) om, ssu, ssc := setupController(client)
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) { if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
} }
// Update so set.Status is set for the next scaleUpStatefulSetControl call. // Update so set.Status is set for the next scaleUpStatefulSetControl call.
var err error var err error
@ -551,52 +623,6 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
} }
} }
func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
pods[0].Status.Phase = v1.PodFailed
om.podsIndexer.Update(pods[0])
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSet failed to %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if isCreated(pods[0]) {
t.Error("StatefulSet did not recreate failed Pod")
}
}
func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client) om, _, ssc := setupController(client)
@ -792,7 +818,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
} }
*set.Spec.Replicas = 0 *set.Spec.Replicas = 0
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) { if err := scaleDownStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delete %s", err) t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
} }
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -834,8 +860,7 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{}) spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{} ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())}
ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop) defer close(stop)
@ -2501,6 +2526,11 @@ func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error)
} }
func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error { func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error {
defer om.updatePodTracker.inc()
if om.updatePodTracker.errorReady() {
defer om.updatePodTracker.reset()
return om.updatePodTracker.getErr()
}
return om.podsIndexer.Update(pod) return om.podsIndexer.Update(pod)
} }
@ -3356,6 +3386,16 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev
} }
func isOrHasInternalError(err error) bool { func isOrHasInternalError(err error) bool {
agg, ok := err.(utilerrors.Aggregate) if err == nil {
return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0]) return false
}
var agg utilerrors.Aggregate
if errors.As(err, &agg) {
for _, e := range agg.Errors() {
if apierrors.IsInternalError(e) {
return true
}
}
}
return apierrors.IsInternalError(err)
} }

View File

@ -37,7 +37,6 @@ import (
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
@ -946,8 +945,7 @@ func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime
ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()) ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())
ssc.podListerSynced = alwaysReady ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady ssc.setListerSynced = alwaysReady
recorder := record.NewFakeRecorder(10) ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh)
ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder)
return ssc, spc, om, ssh return ssc, spc, om, ssh
} }

View File

@ -25,12 +25,12 @@ import (
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/util/podutils" "k8s.io/kubectl/pkg/util/podutils"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest" e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
@ -247,21 +247,23 @@ func ExecInStatefulPods(ctx context.Context, c clientset.Interface, ss *appsv1.S
} }
// update updates a statefulset, and it is only used within rest.go // update updates a statefulset, and it is only used within rest.go
func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) *appsv1.StatefulSet { func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) (ss *appsv1.StatefulSet) {
for i := 0; i < 3; i++ { err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{}) var err error
ss, err = c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil { if err != nil {
framework.Failf("failed to get statefulset %q: %v", name, err) framework.Failf("failed to get statefulset %q: %v", name, err)
} }
if *(ss.Spec.Replicas) == replicas {
return nil
}
*(ss.Spec.Replicas) = replicas *(ss.Spec.Replicas) = replicas
ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{}) ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
if err == nil { return err
return ss })
} if err == nil {
if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) { return ss
framework.Failf("failed to update statefulset %q: %v", name, err)
}
} }
framework.Failf("too many retries draining statefulset %q", name) framework.Failf("failed to update statefulset %q: %v", name, err)
return nil return nil
} }