Automatically recreate PVC when StatefulSet pod is stuck in Pending

This commit is contained in:
Rahul Rangith 2022-10-18 17:05:16 -04:00
parent 7f8be71148
commit c1cc18ccd5
5 changed files with 155 additions and 4 deletions

View File

@ -114,7 +114,7 @@ func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentV
func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
// Create the Pod's PVCs prior to creating the Pod // Create the Pod's PVCs prior to creating the Pod
if err := spc.createPersistentVolumeClaims(set, pod); err != nil { if err := spc.CreatePersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("create", set, pod, err) spc.recordPodEvent("create", set, pod, err)
return err return err
} }
@ -150,7 +150,7 @@ func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.
if !storageMatches(set, pod) { if !storageMatches(set, pod) {
updateStorage(set, pod) updateStorage(set, pod)
consistent = false consistent = false
if err := spc.createPersistentVolumeClaims(set, pod); err != nil { if err := spc.CreatePersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err) spc.recordPodEvent("update", set, pod, err)
return err return err
} }
@ -315,11 +315,11 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS
} }
} }
// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // CreatePersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
// set's Spec. // set's Spec.
func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { func (spc *StatefulPodControl) CreatePersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
var errs []error var errs []error
for _, claim := range getPersistentVolumeClaims(set, pod) { for _, claim := range getPersistentVolumeClaims(set, pod) {
pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)

View File

@ -445,6 +445,18 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// pod created, no more work possible for this round // pod created, no more work possible for this round
continue continue
} }
// If the Pod is in pending state then trigger PVC creation to create missing PVCs
if isPending(replicas[i]) {
klog.V(4).Infof(
"StatefulSet %s/%s is triggering PVC creation for pending Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.CreatePersistentVolumeClaims(set, replicas[i]); err != nil {
return &status, err
}
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion // If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress. // completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic { if isTerminating(replicas[i]) && monotonic {

View File

@ -174,6 +174,7 @@ func TestStatefulSetControl(t *testing.T) {
{UpdateSetStatusFailure, simpleSetFn}, {UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn}, {PodRecreateDeleteFailure, simpleSetFn},
{NewRevisionDeletePodFailure, simpleSetFn}, {NewRevisionDeletePodFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn},
} }
for _, testCase := range testCases { for _, testCase := range testCases {
@ -697,6 +698,45 @@ func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants
} }
} }
// RecreatesPVCForPendingPod verifies that the StatefulSet controller recreates
// missing PVCs for a Pod stuck in Pending: it reconciles once to create the Pod
// and its claims, deletes the claims from the fake indexer, marks the Pod
// Pending, reconciles again, and relies on the invariants check to confirm the
// claims were recreated.
func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
	client := fake.NewSimpleClientset()
	om, _, ssc := setupController(client)
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		t.Error(err)
	}
	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		t.Error(err)
	}
	// First reconcile: creates the first Pod and its PVCs.
	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
		t.Errorf("Error updating StatefulSet %s", err)
	}
	if err := invariants(set, om); err != nil {
		t.Error(err)
	}
	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		t.Error(err)
	}
	// Simulate the PVCs for the first Pod going missing.
	for _, claim := range getPersistentVolumeClaims(set, pods[0]) {
		claim := claim // copy range variable (required before Go 1.22)
		if err := om.claimsIndexer.Delete(&claim); err != nil {
			t.Error(err)
		}
	}
	pods[0].Status.Phase = v1.PodPending
	if err := om.podsIndexer.Update(pods[0]); err != nil {
		t.Error(err)
	}
	// Second reconcile: the Pending Pod should trigger PVC recreation.
	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
		t.Errorf("Error updating StatefulSet %s", err)
	}
	// invariants checks whether any PVCs are missing for the Pods, so this
	// passes only if the deleted claims were recreated.
	if err := invariants(set, om); err != nil {
		t.Error(err)
	}
	_, err = om.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		t.Error(err)
	}
}
func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
runTestOverPVCRetentionPolicies( runTestOverPVCRetentionPolicies(
t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) { t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
@ -2401,6 +2441,13 @@ func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error {
return nil // Not found, no error in deleting. return nil // Not found, no error in deleting.
} }
// CreatePersistentVolumeClaims records every PVC required by pod as existing
// in the fake claims indexer, mimicking successful claim creation. It never
// fails, mirroring the other fake object-manager create methods.
func (om *fakeObjectManager) CreatePersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
	for _, claim := range getPersistentVolumeClaims(set, pod) {
		// Copy the range variable: the indexer stores the pointer, and before
		// Go 1.22 &claim aliases a single loop variable, which would leave
		// every index entry pointing at the last claim.
		claim := claim
		om.claimsIndexer.Update(&claim) // in-memory indexer update cannot meaningfully fail here
	}
	return nil
}
func (om *fakeObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error { func (om *fakeObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
om.claimsIndexer.Update(claim) om.claimsIndexer.Update(claim)
return nil return nil

View File

@ -413,6 +413,11 @@ func isCreated(pod *v1.Pod) bool {
return pod.Status.Phase != "" return pod.Status.Phase != ""
} }
// isPending reports whether pod's phase is PodPending.
func isPending(pod *v1.Pod) bool {
	phase := pod.Status.Phase
	return phase == v1.PodPending
}
// isFailed returns true if pod has a Phase of PodFailed // isFailed returns true if pod has a Phase of PodFailed
func isFailed(pod *v1.Pod) bool { func isFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed return pod.Status.Phase == v1.PodFailed

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -1347,6 +1348,92 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ExpectNoError(err) framework.ExpectNoError(err)
}) })
}) })
// E2E scenario: delete the PVC of a StatefulSet pod while the pod cannot be
// scheduled (node cordoned), then verify the controller recreates the PVC and
// the pod becomes Ready once the node is uncordoned.
ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Feature:AutomaticPVCRecreation]", func() {
// This test must be run in an environment that will allow the test pod to be scheduled on a node with local volumes
ssName := "test-ss"
headlessSvcName := "test"
// Define StatefulSet Labels
ssPodLabels := map[string]string{
"name": "sample-pod",
"pod": WebserverImageName,
}
// One replica with a single "datadir" volume mount, so exactly one PVC exists.
statefulPodMounts := []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
ss := e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, ssPodLabels)
e2epv.SkipIfNoDefaultStorageClass(c)
ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
_, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Confirming PVC exists")
err = verifyStatefulSetPVCsExist(c, ss, []int{0})
framework.ExpectNoError(err)
ginkgo.By("Confirming Pod is ready")
e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
podName := getStatefulSetPodNameAtIndex(0, ss)
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)
// Snapshot the node the pod landed on so we can cordon exactly that node.
nodeName := pod.Spec.NodeName
node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
oldData, err := json.Marshal(node)
framework.ExpectNoError(err)
node.Spec.Unschedulable = true
newData, err := json.Marshal(node)
framework.ExpectNoError(err)
// cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
framework.ExpectNoError(err)
ginkgo.By("Cordoning Node")
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
framework.ExpectNoError(err)
// wait for the node to be patched
// NOTE(review): fixed sleep is potentially flaky; polling the node until
// Spec.Unschedulable is true would be more robust — confirm and consider wait.Poll.
time.Sleep(5 * time.Second)
node, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.ExpectEqual(node.Spec.Unschedulable, true)
ginkgo.By("Deleting Pod")
err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{})
framework.ExpectNoError(err)
// wait for the pod to be recreated
// NOTE(review): another fixed sleep; the recreated pod should be Pending since
// its node is cordoned — consider polling for the pod's existence instead.
time.Sleep(10 * time.Second)
_, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)
pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
framework.ExpectNoError(err)
// assumes exactly one PVC exists in the namespace (single replica, single mount)
pvcName := pvcList.Items[0].Name
ginkgo.By("Deleting PVC")
err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{})
framework.ExpectNoError(err)
ginkgo.By("Uncordoning Node")
// uncordon node, by reverting patch
revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
framework.ExpectNoError(err)
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{})
framework.ExpectNoError(err)
ginkgo.By("Confirming PVC recreated")
err = verifyStatefulSetPVCsExist(c, ss, []int{0})
framework.ExpectNoError(err)
ginkgo.By("Confirming Pod is ready after being recreated")
e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
_, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)
})
}) })
func kubectlExecWithRetries(ns string, args ...string) (out string) { func kubectlExecWithRetries(ns string, args ...string) (out string) {