From 85d55b67371bae83aa73a3a65b23520e0f22a74b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20K=C5=99epinsk=C3=BD?= Date: Thu, 18 Apr 2024 10:03:46 +0200 Subject: [PATCH] fix stateful set pod recreation and event spam (#123809) * fix pods tracking and internal error checking in statefulset tests * fix stateful set pod recreation and event spam - do not emit events when pod reaches terminal phase - do not try to recreate pod until the old pod has been removed from etcd storage * fix conflict race in statefulset rest update the statefulset controller now makes fewer requests per sync and can therefore reconcile status faster, resulting in a higher chance of conflicts --- pkg/controller/statefulset/stateful_set.go | 4 +- .../statefulset/stateful_set_control.go | 43 +---- .../statefulset/stateful_set_control_test.go | 180 +++++++++++------- .../statefulset/stateful_set_test.go | 4 +- test/e2e/framework/statefulset/rest.go | 24 +-- 5 files changed, 137 insertions(+), 118 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 8f3a42d23c9..3f67ef15da0 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -98,7 +98,6 @@ func NewStatefulSetController( recorder), NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()), history.NewHistory(kubeClient, revInformer.Lister()), - recorder, ), pvcListerSynced: pvcInformer.Informer().HasSynced, revListerSynced: revInformer.Informer().HasSynced, @@ -235,6 +234,9 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa return } logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta) + if oldPod.Status.Phase != curPod.Status.Phase { + logger.V(4).Info("StatefulSet Pod phase changed", "pod", klog.KObj(curPod), "statefulSet", klog.KObj(set), "podPhase", curPod.Status.Phase) + } ssc.enqueueStatefulSet(set) 
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index ce8c693fc8b..01886d6895e 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -27,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/features" @@ -61,16 +60,14 @@ type StatefulSetControlInterface interface { func NewDefaultStatefulSetControl( podControl *StatefulPodControl, statusUpdater StatefulSetStatusUpdaterInterface, - controllerHistory history.Interface, - recorder record.EventRecorder) StatefulSetControlInterface { - return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder} + controllerHistory history.Interface) StatefulSetControlInterface { + return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory} } type defaultStatefulSetControl struct { podControl *StatefulPodControl statusUpdater StatefulSetStatusUpdaterInterface controllerHistory history.Interface - recorder record.EventRecorder } // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and @@ -367,45 +364,25 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current func (ssc *defaultStatefulSetControl) processReplica( ctx context.Context, set *apps.StatefulSet, - currentRevision *apps.ControllerRevision, - updateRevision *apps.ControllerRevision, - currentSet *apps.StatefulSet, updateSet *apps.StatefulSet, monotonic bool, replicas []*v1.Pod, i int) (bool, error) { logger := klog.FromContext(ctx) - 
// Delete and recreate pods which finished running. - // + // Note that pods with phase Succeeded will also trigger this event. This is // because final pod phase of evicted or otherwise forcibly stopped pods // (e.g. terminated on node reboot) is determined by the exit code of the // container, not by the reason for pod termination. We should restart the pod // regardless of the exit code. if isFailed(replicas[i]) || isSucceeded(replicas[i]) { - if isFailed(replicas[i]) { - ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", - "StatefulSet %s/%s is recreating failed Pod %s", - set.Namespace, - set.Name, - replicas[i].Name) - } else { - ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod", - "StatefulSet %s/%s is recreating terminated Pod %s", - set.Namespace, - set.Name, - replicas[i].Name) + if replicas[i].DeletionTimestamp == nil { + if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { + return true, err + } } - if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { - return true, err - } - replicaOrd := i + getStartOrdinal(set) - replicas[i] = newVersionedStatefulSetPod( - currentSet, - updateSet, - currentRevision.Name, - updateRevision.Name, - replicaOrd) + // New pod should be generated on the next sync after the current pod is removed from etcd. + return true, nil } // If we find a Pod that has not been created we create the Pod if !isCreated(replicas[i]) { @@ -637,7 +614,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. 
processReplicaFn := func(i int) (bool, error) { - return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i) + return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i) } if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index ddaef5324da..a61eed2333e 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -60,8 +60,7 @@ func setupController(client clientset.Interface) (*fakeObjectManager, *fakeState om := newFakeObjectManager(informerFactory) spc := NewStatefulPodControlFromManager(om, &noopRecorder{}) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) - recorder := &noopRecorder{} - ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder) + ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())) // The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting // for client state to sync. 
In fact, because the client is not updated during tests, informer updates will break tests @@ -171,10 +170,11 @@ func TestStatefulSetControl(t *testing.T) { {ReplacesPods, largeSetFn}, {RecreatesFailedPod, simpleSetFn}, {RecreatesSucceededPod, simpleSetFn}, + {RecreatesFailedPodWithDeleteFailure, simpleSetFn}, + {RecreatesSucceededPodWithDeleteFailure, simpleSetFn}, {CreatePodFailure, simpleSetFn}, {UpdatePodFailure, simpleSetFn}, {UpdateSetStatusFailure, simpleSetFn}, - {PodRecreateDeleteFailure, simpleSetFn}, {NewRevisionDeletePodFailure, simpleSetFn}, {RecreatesPVCForPendingPod, simpleSetFn}, } @@ -398,9 +398,10 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) } } -func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) { +func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, terminalPhase v1.PodPhase, testDeletePodFailure bool) { client := fake.NewSimpleClientset() om, _, ssc := setupController(client) + expectedNumOfDeleteRequests := 0 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { t.Error(err) } @@ -415,34 +416,105 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, if err := invariants(set, om); err != nil { t.Error(err) } - pods, err = om.podsLister.Pods(set.Namespace).List(selector) - if err != nil { + if om.deletePodTracker.requests != expectedNumOfDeleteRequests { + t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests) + } + if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil { t.Error(err) } - pods[0].Status.Phase = phase - om.podsIndexer.Update(pods[0]) + + terminalPodOrdinal := -1 + for i, pod := range pods { + // Set at least Pending phase to acknowledge the creation of pods + newPhase := v1.PodPending + if i == 0 { + // Set terminal phase for the first pod + newPhase = terminalPhase + terminalPodOrdinal = 
getOrdinal(pod) + } + pod.Status.Phase = newPhase + if err = om.podsIndexer.Update(pod); err != nil { + t.Error(err) + } + } + if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil { + t.Error(err) + } + if testDeletePodFailure { + // Expect pod deletion failure + om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) + expectedNumOfDeleteRequests++ + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) { + t.Errorf("StatefulSetControl did not return InternalError, found %s", err) + } + if err := invariants(set, om); err != nil { + t.Error(err) + } + if om.deletePodTracker.requests != expectedNumOfDeleteRequests { + t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests) + } + if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil { + t.Error(err) + } + } + + // Expect pod deletion + expectedNumOfDeleteRequests++ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, om); err != nil { t.Error(err) } - pods, err = om.podsLister.Pods(set.Namespace).List(selector) - if err != nil { + if om.deletePodTracker.requests != expectedNumOfDeleteRequests { + t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests) + } + if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil { t.Error(err) } - if isCreated(pods[0]) { + + // Expect no additional delete calls and expect pod creation + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Errorf("Error updating StatefulSet %s", err) + } + if err := invariants(set, om); err != nil { + t.Error(err) + } + if om.deletePodTracker.requests != expectedNumOfDeleteRequests { + t.Errorf("Found unexpected number of delete calls, got %v, 
expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests) + } + if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil { + t.Error(err) + } + recreatedPod := findPodByOrdinal(pods, terminalPodOrdinal) + // new recreated pod should have empty phase + if recreatedPod == nil || isCreated(recreatedPod) { t.Error("StatefulSet did not recreate failed Pod") } + expectedNumberOfCreateRequests := 2 + if monotonic := !allowsBurst(set); !monotonic { + expectedNumberOfCreateRequests = int(*set.Spec.Replicas + 1) + } + if om.createPodTracker.requests != expectedNumberOfCreateRequests { + t.Errorf("Found unexpected number of create calls, got %v, expected %v", om.createPodTracker.requests, expectedNumberOfCreateRequests) + } } func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { - recreatesPod(t, set, invariants, v1.PodFailed) + recreatesPod(t, set, invariants, v1.PodFailed, false) } func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { - recreatesPod(t, set, invariants, v1.PodSucceeded) + recreatesPod(t, set, invariants, v1.PodSucceeded, false) +} + +func RecreatesFailedPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + recreatesPod(t, set, invariants, v1.PodFailed, true) +} + +func RecreatesSucceededPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + recreatesPod(t, set, invariants, v1.PodSucceeded, true) } func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { @@ -450,8 +522,8 @@ func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariantF om, _, ssc := setupController(client) om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) { - t.Errorf("StatefulSetControl did not return InternalError found %s", 
err) + if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) { + t.Errorf("StatefulSetControl did not return InternalError, found %s", err) } // Update so set.Status is set for the next scaleUpStatefulSetControl call. var err error @@ -514,8 +586,8 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF om.podsIndexer.Update(pods[0]) // now it should fail - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) { - t.Errorf("StatefulSetControl did not return InternalError found %s", err) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) { + t.Errorf("StatefulSetControl did not return InternalError, found %s", err) } } @@ -524,8 +596,8 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva om, ssu, ssc := setupController(client) ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) { - t.Errorf("StatefulSetControl did not return InternalError found %s", err) + if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) { + t.Errorf("StatefulSetControl did not return InternalError, found %s", err) } // Update so set.Status is set for the next scaleUpStatefulSetControl call. 
var err error @@ -551,52 +623,6 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva } } -func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { - client := fake.NewSimpleClientset(set) - om, _, ssc := setupController(client) - - selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) - if err != nil { - t.Error(err) - } - pods, err := om.podsLister.Pods(set.Namespace).List(selector) - if err != nil { - t.Error(err) - } - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { - t.Errorf("Error updating StatefulSet %s", err) - } - if err := invariants(set, om); err != nil { - t.Error(err) - } - pods, err = om.podsLister.Pods(set.Namespace).List(selector) - if err != nil { - t.Error(err) - } - pods[0].Status.Phase = v1.PodFailed - om.podsIndexer.Update(pods[0]) - om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) { - t.Errorf("StatefulSet failed to %s", err) - } - if err := invariants(set, om); err != nil { - t.Error(err) - } - if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { - t.Errorf("Error updating StatefulSet %s", err) - } - if err := invariants(set, om); err != nil { - t.Error(err) - } - pods, err = om.podsLister.Pods(set.Namespace).List(selector) - if err != nil { - t.Error(err) - } - if isCreated(pods[0]) { - t.Error("StatefulSet did not recreate failed Pod") - } -} - func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) om, _, ssc := setupController(client) @@ -792,7 +818,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { } *set.Spec.Replicas = 0 om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleDownStatefulSetControl(set, ssc, 
om, invariants); err != nil && isOrHasInternalError(err) { + if err := scaleDownStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) { t.Errorf("StatefulSetControl failed to throw error on delete %s", err) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -834,8 +860,7 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{}) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) - recorder := &noopRecorder{} - ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder} + ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())} stop := make(chan struct{}) defer close(stop) @@ -2501,6 +2526,11 @@ func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) } func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error { + defer om.updatePodTracker.inc() + if om.updatePodTracker.errorReady() { + defer om.updatePodTracker.reset() + return om.updatePodTracker.getErr() + } return om.podsIndexer.Update(pod) } @@ -3356,6 +3386,16 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev } func isOrHasInternalError(err error) bool { - agg, ok := err.(utilerrors.Aggregate) - return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0]) + if err == nil { + return false + } + var agg utilerrors.Aggregate + if errors.As(err, &agg) { + for _, e := range agg.Errors() { + if apierrors.IsInternalError(e) { + return true + } + } + } + return apierrors.IsInternalError(err) } diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 
a3934a3194b..8f54b3598ce 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -37,7 +37,6 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" @@ -946,8 +945,7 @@ func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()) ssc.podListerSynced = alwaysReady ssc.setListerSynced = alwaysReady - recorder := record.NewFakeRecorder(10) - ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder) + ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh) return ssc, spc, om, ssh } diff --git a/test/e2e/framework/statefulset/rest.go b/test/e2e/framework/statefulset/rest.go index 5047d86c6a1..2f18d6e333e 100644 --- a/test/e2e/framework/statefulset/rest.go +++ b/test/e2e/framework/statefulset/rest.go @@ -25,12 +25,12 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" "k8s.io/kubectl/pkg/util/podutils" "k8s.io/kubernetes/test/e2e/framework" e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest" @@ -247,21 +247,23 @@ func ExecInStatefulPods(ctx context.Context, c clientset.Interface, ss *appsv1.S } // update updates a statefulset, and it is only used within rest.go -func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) *appsv1.StatefulSet { - for i := 0; i < 3; i++ { - ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{}) +func update(ctx context.Context, c 
clientset.Interface, ns, name string, replicas int32) (ss *appsv1.StatefulSet) { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var err error + ss, err = c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{}) if err != nil { framework.Failf("failed to get statefulset %q: %v", name, err) } + if *(ss.Spec.Replicas) == replicas { + return nil + } *(ss.Spec.Replicas) = replicas ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{}) - if err == nil { - return ss - } - if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) { - framework.Failf("failed to update statefulset %q: %v", name, err) - } + return err + }) + if err == nil { + return ss } - framework.Failf("too many retries draining statefulset %q", name) + framework.Failf("failed to update statefulset %q: %v", name, err) return nil }