From c1cc18ccd5f7652d5a8225eb747ee1f8aa13f6bd Mon Sep 17 00:00:00 2001 From: Rahul Rangith Date: Tue, 18 Oct 2022 17:05:16 -0400 Subject: [PATCH 1/6] Automatically recreate pvc when sts pod is stuck in pending --- .../statefulset/stateful_pod_control.go | 8 +- .../statefulset/stateful_set_control.go | 12 +++ .../statefulset/stateful_set_control_test.go | 47 ++++++++++ .../statefulset/stateful_set_utils.go | 5 ++ test/e2e/apps/statefulset.go | 87 +++++++++++++++++++ 5 files changed, 155 insertions(+), 4 deletions(-) diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 2b0546b4632..f762f87a743 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -114,7 +114,7 @@ func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentV func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { // Create the Pod's PVCs prior to creating the Pod - if err := spc.createPersistentVolumeClaims(set, pod); err != nil { + if err := spc.CreatePersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("create", set, pod, err) return err } @@ -150,7 +150,7 @@ func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1. if !storageMatches(set, pod) { updateStorage(set, pod) consistent = false - if err := spc.createPersistentVolumeClaims(set, pod); err != nil { + if err := spc.CreatePersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } @@ -315,11 +315,11 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS } } -// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of +// CreatePersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with // set's Spec. -func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) CreatePersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { var errs []error for _, claim := range getPersistentVolumeClaims(set, pod) { pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 47722c3524f..64f30e2f7a5 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -445,6 +445,18 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // pod created, no more work possible for this round continue } + + // If the Pod is in pending state then trigger PVC creation to create missing PVCs + if isPending(replicas[i]) { + klog.V(4).Infof( + "StatefulSet %s/%s is triggering PVC creation for pending Pod %s", + set.Namespace, + set.Name, + replicas[i].Name) + if err := ssc.podControl.CreatePersistentVolumeClaims(set, replicas[i]); err != nil { + return &status, err + } + } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. 
if isTerminating(replicas[i]) && monotonic { diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index e3a9c61666d..a97b5210f8c 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -174,6 +174,7 @@ func TestStatefulSetControl(t *testing.T) { {UpdateSetStatusFailure, simpleSetFn}, {PodRecreateDeleteFailure, simpleSetFn}, {NewRevisionDeletePodFailure, simpleSetFn}, + {RecreatesPVCForPendingPod, simpleSetFn}, } for _, testCase := range testCases { @@ -697,6 +698,45 @@ func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants } } +func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + client := fake.NewSimpleClientset() + om, _, ssc := setupController(client) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Error(err) + } + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Errorf("Error updating StatefulSet %s", err) + } + if err := invariants(set, om); err != nil { + t.Error(err) + } + pods, err = om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + for _, claim := range getPersistentVolumeClaims(set, pods[0]) { + om.claimsIndexer.Delete(&claim) + } + pods[0].Status.Phase = v1.PodPending + om.podsIndexer.Update(pods[0]) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Errorf("Error updating StatefulSet %s", err) + } + // invariants check if there any missing PVCs for the Pods + if err := invariants(set, om); err != nil { + t.Error(err) + } + _, err = om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } +} + func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { runTestOverPVCRetentionPolicies( t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) { @@ -2401,6 +2441,13 @@ func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error { return nil // Not found, no error in deleting. 
} +func (om *fakeObjectManager) CreatePersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { + for _, claim := range getPersistentVolumeClaims(set, pod) { + om.claimsIndexer.Update(&claim) + } + return nil +} + func (om *fakeObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error { om.claimsIndexer.Update(claim) return nil diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index f3aa894a585..ac6b78d4a6f 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -413,6 +413,11 @@ func isCreated(pod *v1.Pod) bool { return pod.Status.Phase != "" } +// isPending returns true if pod has a Phase of PodPending +func isPending(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodPending +} + // isFailed returns true if pod has a Phase of PodFailed func isFailed(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index 55ca49af3ce..e27929635a7 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" @@ -1347,6 +1348,92 @@ var _ = SIGDescribe("StatefulSet", func() { framework.ExpectNoError(err) }) }) + + ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Feature:AutomaticPVCRecreation]", func() { + // This test must be run in an environment that will allow the test pod to be scheduled on a node with local volumes + ssName := "test-ss" + headlessSvcName := "test" + // Define StatefulSet Labels + ssPodLabels := map[string]string{ + "name": "sample-pod", + "pod": WebserverImageName, + } + + statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + ss := e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, ssPodLabels) + e2epv.SkipIfNoDefaultStorageClass(c) + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + _, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming PVC exists") + err = verifyStatefulSetPVCsExist(c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready") + e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) + podName := getStatefulSetPodNameAtIndex(0, ss) + pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + nodeName := pod.Spec.NodeName + node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + oldData, err := json.Marshal(node) + framework.ExpectNoError(err) + + node.Spec.Unschedulable = true + + newData, err := json.Marshal(node) + framework.ExpectNoError(err) + + // cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + framework.ExpectNoError(err) + ginkgo.By("Cordoning Node") + _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) + + // wait for the node to be patched + time.Sleep(5 * 
time.Second) + node, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + framework.ExpectEqual(node.Spec.Unschedulable, true) + + ginkgo.By("Deleting Pod") + err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // wait for the pod to be recreated + time.Sleep(10 * time.Second) + _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()}) + framework.ExpectNoError(err) + pvcName := pvcList.Items[0].Name + + ginkgo.By("Deleting PVC") + err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Uncordoning Node") + // uncordon node, by reverting patch + revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{}) + framework.ExpectNoError(err) + _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming PVC recreated") + err = verifyStatefulSetPVCsExist(c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready after being recreated") + e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) + _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + }) }) func kubectlExecWithRetries(ns string, args ...string) (out string) { From e6a90aa48a1e29e1ece9fa14d7324796ffa78cc6 Mon Sep 17 00:00:00 2001 From: Rahul Rangith Date: Mon, 14 Nov 2022 15:04:16 -0500 Subject: [PATCH 2/6] PR feedback --- test/e2e/apps/statefulset.go | 36 ++++++++++++++++---------- test/e2e/framework/node/wait.go | 17 ++++++++++++ test/e2e/framework/statefulset/wait.go | 25 ++++++++++++++++++ 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index e27929635a7..db209cf9cba 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -1349,7 +1349,7 @@ var _ = SIGDescribe("StatefulSet", func() { }) }) - ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Feature:AutomaticPVCRecreation]", func() { + ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Feature:AutomaticPVCRecreation][Disruptive][Serial]", func() { // This test must be run in an environment that will allow the test pod to be scheduled on a node with local volumes ssName := "test-ss" headlessSvcName := "test" @@ -1394,36 +1394,37 @@ var _ = SIGDescribe("StatefulSet", func() { ginkgo.By("Cordoning Node") _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) framework.ExpectNoError(err) + cordoned := true - // wait for the node to be patched - time.Sleep(5 * time.Second) - node, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - framework.ExpectNoError(err) - framework.ExpectEqual(node.Spec.Unschedulable, true) + defer func() { + if cordoned { + uncordonNode(c, oldData, newData, nodeName) + } + }() + + // wait for the node to be unschedulable + e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false) ginkgo.By("Deleting Pod") err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, 
metav1.DeleteOptions{}) framework.ExpectNoError(err) // wait for the pod to be recreated - time.Sleep(10 * time.Second) + e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1) _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) framework.ExpectNoError(err) pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()}) framework.ExpectNoError(err) + framework.ExpectEqual(len(pvcList.Items), 1) pvcName := pvcList.Items[0].Name ginkgo.By("Deleting PVC") err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{}) framework.ExpectNoError(err) - ginkgo.By("Uncordoning Node") - // uncordon node, by reverting patch - revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{}) - framework.ExpectNoError(err) - _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{}) - framework.ExpectNoError(err) + uncordonNode(c, oldData, newData, nodeName) + cordoned = false ginkgo.By("Confirming PVC recreated") err = verifyStatefulSetPVCsExist(c, ss, []int{0}) @@ -1436,6 +1437,15 @@ var _ = SIGDescribe("StatefulSet", func() { }) }) +func uncordonNode(c clientset.Interface, oldData, newData []byte, nodeName string) { + ginkgo.By("Uncordoning Node") + // uncordon node, by reverting patch + revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{}) + framework.ExpectNoError(err) + _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) +} + func kubectlExecWithRetries(ns string, args ...string) (out string) { var err error for i := 0; i < 3; i++ { diff --git a/test/e2e/framework/node/wait.go b/test/e2e/framework/node/wait.go index 60b79cd6477..ffda416b693 100644 --- a/test/e2e/framework/node/wait.go +++ b/test/e2e/framework/node/wait.go @@ -143,6 +143,23 @@ func WaitForNodeToBeReady(ctx context.Context, c clientset.Interface, name strin return WaitConditionToBe(ctx, c, name, v1.NodeReady, true, timeout) } +func WaitForNodeSchedulable(c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool { + framework.Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + node, err := c.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + framework.Logf("Couldn't get node %s", name) + continue + } + + if IsNodeSchedulable(node) == wantSchedulable { + return true + } + } + framework.Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout) + return false +} + // CheckReady waits up to timeout for cluster to has desired size and // there is no not-ready nodes in it. By cluster size we mean number of schedulable Nodes. 
func CheckReady(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) { diff --git a/test/e2e/framework/statefulset/wait.go b/test/e2e/framework/statefulset/wait.go index f05c50dc353..7e4d7053788 100644 --- a/test/e2e/framework/statefulset/wait.go +++ b/test/e2e/framework/statefulset/wait.go @@ -171,6 +171,31 @@ func WaitForStatusReplicas(ctx context.Context, c clientset.Interface, ss *appsv } } +// WaitForStatusCurrentReplicas waits for the ss.Status.CurrentReplicas to be equal to expectedReplicas +func WaitForStatusCurrentReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) { + framework.Logf("Waiting for statefulset status.currentReplicas updated to %d", expectedReplicas) + + ns, name := ss.Namespace, ss.Name + pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, + func() (bool, error) { + ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if ssGet.Status.ObservedGeneration < ss.Generation { + return false, nil + } + if ssGet.Status.CurrentReplicas != expectedReplicas { + framework.Logf("Waiting for stateful set status.currentReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.CurrentReplicas) + return false, nil + } + return true, nil + }) + if pollErr != nil { + framework.Failf("Failed waiting for stateful set status.currentReplicas updated to %d: %v", expectedReplicas, pollErr) + } +} + // Saturate waits for all Pods in ss to become Running and Ready. func Saturate(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) { var i int32 From 3cf636b22e0ac41e5a7be0a4bb1469885557ea16 Mon Sep 17 00:00:00 2001 From: Rahul Rangith Date: Thu, 1 Dec 2022 15:26:36 -0500 Subject: [PATCH 3/6] PR feedback --- pkg/controller/statefulset/stateful_pod_control.go | 10 +++++----- pkg/controller/statefulset/stateful_set_control.go | 9 ++++++++- .../statefulset/stateful_set_control_test.go | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index f762f87a743..d0c051e2aba 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -22,7 +22,7 @@ import ( "strings" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" errorutils "k8s.io/apimachinery/pkg/util/errors" @@ -114,7 +114,7 @@ func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentV func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { // Create the Pod's PVCs prior to creating the Pod - if err := spc.CreatePersistentVolumeClaims(set, pod); err != nil { + if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("create", set, pod, err) return err } @@ -150,7 +150,7 @@ func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1. 
if !storageMatches(set, pod) { updateStorage(set, pod) consistent = false - if err := spc.CreatePersistentVolumeClaims(set, pod); err != nil { + if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } @@ -315,11 +315,11 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS } } -// CreatePersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of +// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with // set's Spec. -func (spc *StatefulPodControl) CreatePersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { var errs []error for _, claim := range getPersistentVolumeClaims(set, pod) { pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 64f30e2f7a5..ea14c300796 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -453,9 +453,16 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( set.Namespace, set.Name, replicas[i].Name) - if err := ssc.podControl.CreatePersistentVolumeClaims(set, replicas[i]); err != nil { + if err := ssc.podControl.createPersistentVolumeClaims(set, replicas[i]); err != nil { return &status, err } + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // Set PVC policy as much as is possible at this point. + if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(set, replicas[i]); err != nil { + ssc.podControl.recordPodEvent("update", set, replicas[i], err) + return &status, err + } + } } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index a97b5210f8c..32cc07a3bbf 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -2441,7 +2441,7 @@ func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error { return nil // Not found, no error in deleting. 
} -func (om *fakeObjectManager) CreatePersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { +func (om *fakeObjectManager) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { for _, claim := range getPersistentVolumeClaims(set, pod) { om.claimsIndexer.Update(&claim) } From 392cd5ce8c629709d33c0af3a09a49b64f3d70f5 Mon Sep 17 00:00:00 2001 From: Rahul Rangith Date: Fri, 2 Dec 2022 10:04:34 -0500 Subject: [PATCH 4/6] Make e2e test not rely on local volumes --- .../statefulset/stateful_pod_control.go | 2 +- .../statefulset/stateful_set_control_test.go | 7 ------- test/e2e/apps/statefulset.go | 16 ++++++++++++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index d0c051e2aba..2b0546b4632 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -22,7 +22,7 @@ import ( "strings" apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" errorutils "k8s.io/apimachinery/pkg/util/errors" diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 32cc07a3bbf..ef1c38f09f1 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -2441,13 +2441,6 @@ func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error { return nil // Not found, no error in deleting. } -func (om *fakeObjectManager) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { - for _, claim := range getPersistentVolumeClaims(set, pod) { - om.claimsIndexer.Update(&claim) - } - return nil -} - func (om *fakeObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error { om.claimsIndexer.Update(claim) return nil diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index db209cf9cba..814e1969c33 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -1349,8 +1349,7 @@ var _ = SIGDescribe("StatefulSet", func() { }) }) - ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Feature:AutomaticPVCRecreation][Disruptive][Serial]", func() { - // This test must be run in an environment that will allow the test pod to be scheduled on a node with local volumes + ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func() { ssName := "test-ss" headlessSvcName := "test" // Define StatefulSet Labels @@ -1359,11 +1358,18 @@ var _ = SIGDescribe("StatefulSet", func() { "pod": WebserverImageName, } + readyNode, err := e2enode.GetReadySchedulableWorkerNode(c) + framework.ExpectNoError(err) + hostLabel := "kubernetes.io/hostname" + hostLabelVal := readyNode.Labels[hostLabel] + statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} ss := e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, ssPodLabels) + ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node + e2epv.SkipIfNoDefaultStorageClass(c) ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) - _, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) + _, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) 
framework.ExpectNoError(err) ginkgo.By("Confirming PVC exists") @@ -1377,6 +1383,7 @@ var _ = SIGDescribe("StatefulSet", func() { framework.ExpectNoError(err) nodeName := pod.Spec.NodeName + framework.ExpectEqual(nodeName, readyNode.Name) node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) framework.ExpectNoError(err) @@ -1432,8 +1439,9 @@ var _ = SIGDescribe("StatefulSet", func() { ginkgo.By("Confirming Pod is ready after being recreated") e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) - _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) framework.ExpectNoError(err) + framework.ExpectEqual(pod.Spec.NodeName, readyNode.Name) // confirm the pod was scheduled back to the original node }) }) From 8924b58e2c69770f54afca24284c2dbc65bee9ad Mon Sep 17 00:00:00 2001 From: Rahul Rangith Date: Mon, 19 Dec 2022 09:00:06 -0500 Subject: [PATCH 5/6] Cleanup statefulset after e2e test --- test/e2e/apps/statefulset.go | 176 +++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 81 deletions(-) diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index 814e1969c33..4a0f11621d8 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -1349,99 +1349,113 @@ var _ = SIGDescribe("StatefulSet", func() { }) }) - ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func() { - ssName := "test-ss" - headlessSvcName := "test" - // Define StatefulSet Labels - ssPodLabels := map[string]string{ - "name": "sample-pod", - "pod": WebserverImageName, + ginkgo.Describe("Automatically recreate PVC for pending pod when PVC is missing", func() { + ssName := "ss" + labels := map[string]string{ + "foo": "bar", + "baz": "blah", } + headlessSvcName := "test" + var statefulPodMounts []v1.VolumeMount + var ss *appsv1.StatefulSet - readyNode, err := e2enode.GetReadySchedulableWorkerNode(c) - framework.ExpectNoError(err) - hostLabel := "kubernetes.io/hostname" - hostLabelVal := readyNode.Labels[hostLabel] + ginkgo.BeforeEach(func(ctx context.Context) { + statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + ss = e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, labels) + }) - statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} - ss := e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, ssPodLabels) - ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node - - e2epv.SkipIfNoDefaultStorageClass(c) - ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) - _, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) - framework.ExpectNoError(err) - - ginkgo.By("Confirming PVC exists") - err = verifyStatefulSetPVCsExist(c, ss, []int{0}) - framework.ExpectNoError(err) - - ginkgo.By("Confirming Pod is ready") - e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) - podName := getStatefulSetPodNameAtIndex(0, ss) - pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) - framework.ExpectNoError(err) - - nodeName := pod.Spec.NodeName - framework.ExpectEqual(nodeName, readyNode.Name) - node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - framework.ExpectNoError(err) - - oldData, err := json.Marshal(node) - 
framework.ExpectNoError(err) - - node.Spec.Unschedulable = true - - newData, err := json.Marshal(node) - framework.ExpectNoError(err) - - // cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) - framework.ExpectNoError(err) - ginkgo.By("Cordoning Node") - _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) - framework.ExpectNoError(err) - cordoned := true - - defer func() { - if cordoned { - uncordonNode(c, oldData, newData, nodeName) + ginkgo.AfterEach(func(ctx context.Context) { + if ginkgo.CurrentSpecReport().Failed() { + e2eoutput.DumpDebugInfo(ctx, c, ns) } - }() + framework.Logf("Deleting all statefulset in ns %v", ns) + e2estatefulset.DeleteAllStatefulSets(ctx, c, ns) + }) - // wait for the node to be unschedulable - e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false) + ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func(ctx context.Context) { + e2epv.SkipIfNoDefaultStorageClass(ctx, c) - ginkgo.By("Deleting Pod") - err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}) - framework.ExpectNoError(err) + readyNode, err := e2enode.GetRandomReadySchedulableNode(ctx, c) + framework.ExpectNoError(err) + hostLabel := "kubernetes.io/hostname" + hostLabelVal := readyNode.Labels[hostLabel] - // wait for the pod to be recreated - e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1) - _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) - framework.ExpectNoError(err) + ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + _, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) - pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()}) - framework.ExpectNoError(err) - framework.ExpectEqual(len(pvcList.Items), 1) - pvcName := pvcList.Items[0].Name + ginkgo.By("Confirming PVC exists") + err = verifyStatefulSetPVCsExist(ctx, c, ss, []int{0}) + framework.ExpectNoError(err) - ginkgo.By("Deleting PVC") - err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{}) - framework.ExpectNoError(err) + ginkgo.By("Confirming Pod is ready") + e2estatefulset.WaitForStatusReadyReplicas(ctx, c, ss, 1) + podName := getStatefulSetPodNameAtIndex(0, ss) + pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) - uncordonNode(c, oldData, newData, nodeName) - cordoned = false + nodeName := pod.Spec.NodeName + framework.ExpectEqual(nodeName, readyNode.Name) + node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) - ginkgo.By("Confirming PVC recreated") - err = verifyStatefulSetPVCsExist(c, ss, []int{0}) - framework.ExpectNoError(err) + oldData, err := json.Marshal(node) + framework.ExpectNoError(err) - ginkgo.By("Confirming Pod is ready after being recreated") - e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) - pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) - framework.ExpectNoError(err) - framework.ExpectEqual(pod.Spec.NodeName, 
readyNode.Name) // confirm the pod was scheduled back to the original node + node.Spec.Unschedulable = true + + newData, err := json.Marshal(node) + framework.ExpectNoError(err) + + // cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + framework.ExpectNoError(err) + ginkgo.By("Cordoning Node") + _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) + cordoned := true + + defer func() { + if cordoned { + uncordonNode(c, oldData, newData, nodeName) + } + }() + + // wait for the node to be unschedulable + e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false) + + ginkgo.By("Deleting Pod") + err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // wait for the pod to be recreated + e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1) + _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()}) + framework.ExpectNoError(err) + framework.ExpectEqual(len(pvcList.Items), 1) + pvcName := pvcList.Items[0].Name + + ginkgo.By("Deleting PVC") + err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + uncordonNode(c, oldData, newData, nodeName) + cordoned = false + + ginkgo.By("Confirming PVC recreated") + err = verifyStatefulSetPVCsExist(ctx, c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready after being recreated") + e2estatefulset.WaitForStatusReadyReplicas(ctx, c, ss, 1) + pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + framework.ExpectEqual(pod.Spec.NodeName, readyNode.Name) // confirm the pod was scheduled back to the original node + }) }) }) From 2f0eb543c74e9716ef11f23de118880035ce0906 Mon Sep 17 00:00:00 2001 From: Rahul Rangith Date: Fri, 6 Jan 2023 17:22:26 -0500 Subject: [PATCH 6/6] New function for creating missing pvcs --- .../statefulset/stateful_pod_control.go | 16 ++++++++++++++++ .../statefulset/stateful_set_control.go | 9 +-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 2b0546b4632..8e2dcf8d795 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -315,6 +315,22 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS } } +// createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy +func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { + if err := spc.createPersistentVolumeClaims(set, pod); err != nil { + return err + } + + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // Set PVC policy as much as is possible at this point. 
+ if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + spc.recordPodEvent("update", set, pod, err) + return err + } + } + return nil +} + // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index ea14c300796..276e41ef09f 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -453,16 +453,9 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( set.Namespace, set.Name, replicas[i].Name) - if err := ssc.podControl.createPersistentVolumeClaims(set, replicas[i]); err != nil { + if err := ssc.podControl.createMissingPersistentVolumeClaims(set, replicas[i]); err != nil { return &status, err } - if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { - // Set PVC policy as much as is possible at this point. - if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(set, replicas[i]); err != nil { - ssc.podControl.recordPodEvent("update", set, replicas[i], err) - return &status, err - } - } } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress.
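
The end state this series converges on is easier to see outside the diff context. The sketch below is a simplified, self-contained illustration of the new control flow (a Pending replica triggers recreation of its missing PVCs), not the actual kubernetes/kubernetes controller code: the types, the stub bodies, and main are invented for illustration, and only the names isPending and createMissingPersistentVolumeClaims mirror helpers introduced in the patches.

package main

import "fmt"

// Illustrative stand-ins for the real controller types; the real code uses
// k8s.io/api types and the StatefulPodControl object manager.
type Pod struct {
	Name  string
	Phase string // "Pending", "Running", ...
}

type StatefulSet struct{ Namespace, Name string }

// isPending mirrors the helper added to stateful_set_utils.go in patch 1/6.
func isPending(pod *Pod) bool { return pod.Phase == "Pending" }

// createMissingPersistentVolumeClaims stands in for the helper added in patch 6/6:
// it (re)creates any PVCs the pod needs and, when the StatefulSetAutoDeletePVC
// feature gate is enabled, re-applies the claim retention policy. The body here
// is a stub for illustration only.
func createMissingPersistentVolumeClaims(set *StatefulSet, pod *Pod) error {
	fmt.Printf("ensuring PVCs for %s/%s (pod %s)\n", set.Namespace, set.Name, pod.Name)
	return nil
}

// updateReplicas shows where the new check sits in the reconcile loop: a Pending
// pod triggers PVC creation, so a pod whose claim was deleted out of band can be
// scheduled again instead of staying stuck in Pending.
func updateReplicas(set *StatefulSet, replicas []*Pod) error {
	for _, pod := range replicas {
		if isPending(pod) {
			if err := createMissingPersistentVolumeClaims(set, pod); err != nil {
				return err
			}
		}
		// ... the rest of the existing reconcile logic (terminating pods, updates, etc.)
	}
	return nil
}

func main() {
	set := &StatefulSet{Namespace: "default", Name: "web"}
	_ = updateReplicas(set, []*Pod{{Name: "web-0", Phase: "Pending"}})
}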