From e07d898cfd203502792f9c2d1f30b0cdc3be773a Mon Sep 17 00:00:00 2001 From: Aleksandra Malinowska Date: Mon, 4 Sep 2023 10:46:34 +0200 Subject: [PATCH] Make StatefulSet restart pods with phase Succeeded --- .../statefulset/stateful_set_control.go | 28 +++++++--- .../statefulset/stateful_set_control_test.go | 14 ++++- .../statefulset/stateful_set_utils.go | 5 ++ .../statefulset/statefulset_test.go | 54 ++++++++++++++----- 4 files changed, 79 insertions(+), 22 deletions(-) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index f55b7d1d67c..0d01dc0844e 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -375,13 +375,27 @@ func (ssc *defaultStatefulSetControl) processReplica( replicas []*v1.Pod, i int) (bool, error) { logger := klog.FromContext(ctx) - // delete and recreate failed pods - if isFailed(replicas[i]) { - ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", - "StatefulSet %s/%s is recreating failed Pod %s", - set.Namespace, - set.Name, - replicas[i].Name) + // Delete and recreate pods which finished running. + // + // Note that pods with phase Succeeded are also deleted and recreated. This is + // because the final pod phase of evicted or otherwise forcibly stopped pods + // (e.g. terminated on node reboot) is determined by the exit code of the + // container, not by the reason for pod termination. We should restart the pod + // regardless of the exit code. 
+ if isFailed(replicas[i]) || isSucceeded(replicas[i]) { + if isFailed(replicas[i]) { + ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", + "StatefulSet %s/%s is recreating failed Pod %s", + set.Namespace, + set.Name, + replicas[i].Name) + } else { + ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod", + "StatefulSet %s/%s is recreating terminated Pod %s", + set.Namespace, + set.Name, + replicas[i].Name) + } if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { return true, err } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 08af86bedc4..ddaef5324da 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -170,6 +170,7 @@ func TestStatefulSetControl(t *testing.T) { {ScalesDown, simpleSetFn}, {ReplacesPods, largeSetFn}, {RecreatesFailedPod, simpleSetFn}, + {RecreatesSucceededPod, simpleSetFn}, {CreatePodFailure, simpleSetFn}, {UpdatePodFailure, simpleSetFn}, {UpdateSetStatusFailure, simpleSetFn}, @@ -397,7 +398,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) } } -func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { +func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) { client := fake.NewSimpleClientset() om, _, ssc := setupController(client) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) @@ -418,7 +419,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian if err != nil { t.Error(err) } - pods[0].Status.Phase = v1.PodFailed + pods[0].Status.Phase = phase om.podsIndexer.Update(pods[0]) if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) @@ -433,6 +434,15 @@ func RecreatesFailedPod(t *testing.T, set 
*apps.StatefulSet, invariants invarian if isCreated(pods[0]) { t.Error("StatefulSet did not recreate failed Pod") } + +} + +func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + recreatesPod(t, set, invariants, v1.PodFailed) +} + +func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + recreatesPod(t, set, invariants, v1.PodSucceeded) } func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index f46cbed61ba..f222cf65ef5 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -426,6 +426,11 @@ func isFailed(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed } +// isSucceeded returns true if pod has a Phase of PodSucceeded +func isSucceeded(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodSucceeded +} + // isTerminating returns true if pod's DeletionTimestamp has been set func isTerminating(pod *v1.Pod) bool { return pod.DeletionTimestamp != nil diff --git a/test/integration/statefulset/statefulset_test.go b/test/integration/statefulset/statefulset_test.go index 4bd42ff0f12..ae8e0fa31a5 100644 --- a/test/integration/statefulset/statefulset_test.go +++ b/test/integration/statefulset/statefulset_test.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -168,7 +169,7 @@ func TestSpecReplicasChange(t *testing.T) { } } -func TestDeletingAndFailedPods(t *testing.T) { +func TestDeletingAndTerminatingPods(t *testing.T) { _, ctx := ktesting.NewTestContext(t) closeFn, rm, informers, c := scSetup(ctx, t) defer closeFn() @@ -177,17 
+178,19 @@ func TestDeletingAndFailedPods(t *testing.T) { cancel := runControllerAndInformers(rm, informers) defer cancel() + podCount := 3 + labelMap := labelMap() - sts := newSTS("sts", ns.Name, 2) + sts := newSTS("sts", ns.Name, podCount) stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) sts = stss[0] waitSTSStable(t, c, sts) - // Verify STS creates 2 pods + // Verify STS creates 3 pods podClient := c.CoreV1().Pods(ns.Name) pods := getPods(t, podClient, labelMap) - if len(pods.Items) != 2 { - t.Fatalf("len(pods) = %d, want 2", len(pods.Items)) + if len(pods.Items) != podCount { + t.Fatalf("len(pods) = %d, want %d", len(pods.Items), podCount) } // Set first pod as deleting pod @@ -206,23 +209,48 @@ func TestDeletingAndFailedPods(t *testing.T) { pod.Status.Phase = v1.PodFailed }) + // Set third pod as succeeded pod + succeededPod := &pods.Items[2] + updatePodStatus(t, podClient, succeededPod.Name, func(pod *v1.Pod) { + pod.Status.Phase = v1.PodSucceeded + }) + + exists := func(pods []v1.Pod, uid types.UID) bool { + for _, pod := range pods { + if pod.UID == uid { + return true + } + } + return false + } + if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { - // Verify only 2 pods exist: deleting pod and new pod replacing failed pod + // Verify only 3 pods exist: deleting pod and new pods replacing failed and succeeded pods pods = getPods(t, podClient, labelMap) - if len(pods.Items) != 2 { + if len(pods.Items) != podCount { return false, nil } + // Verify deleting pod still exists // Immediately return false with an error if it does not exist - if pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID { + if !exists(pods.Items, deletingPod.UID) { return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name) } // Verify failed pod does not exist anymore - if pods.Items[0].UID == failedPod.UID || pods.Items[1].UID == failedPod.UID { + if exists(pods.Items, 
failedPod.UID) { return false, nil } - // Verify both pods have non-failed status - return pods.Items[0].Status.Phase != v1.PodFailed && pods.Items[1].Status.Phase != v1.PodFailed, nil + // Verify succeeded pod does not exist anymore + if exists(pods.Items, succeededPod.UID) { + return false, nil + } + // Verify all pods have non-terminated status + for _, pod := range pods.Items { + if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { + return false, nil + } + } + return true, nil }); err != nil { t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err) } @@ -235,11 +263,11 @@ func TestDeletingAndFailedPods(t *testing.T) { if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { // Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod pods = getPods(t, podClient, labelMap) - if len(pods.Items) != 2 { + if len(pods.Items) != podCount { return false, nil } // Verify deleting pod does not exist anymore - return pods.Items[0].UID != deletingPod.UID && pods.Items[1].UID != deletingPod.UID, nil + return !exists(pods.Items, deletingPod.UID), nil }); err != nil { t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err) }