From 8b18f7c869deb0554579fdd7cae9d9691424c940 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Thu, 5 May 2022 15:34:05 +0000 Subject: [PATCH] Retry when update unschedulable pods status on scheduling loop --- pkg/scheduler/util/utils.go | 10 ++- pkg/scheduler/util/utils_test.go | 109 +++++++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 9 deletions(-) diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index 929dfb8fa44..c597510fb62 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -26,8 +26,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" @@ -115,8 +117,12 @@ func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, n return nil } - _, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") - return err + patchFn := func() error { + _, err := cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + return err + } + + return retry.OnError(retry.DefaultBackoff, net.IsConnectionRefused, patchFn) } // DeletePod deletes the given from API server diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go index a97f2a10546..34ad386c2a2 100644 --- a/pkg/scheduler/util/utils_test.go +++ b/pkg/scheduler/util/utils_test.go @@ -18,7 +18,9 @@ package util import ( "context" + "errors" "fmt" + "syscall" "testing" "time" @@ -27,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/net" clientsetfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" extenderv1 "k8s.io/kube-scheduler/extender/v1" @@ -181,12 +184,17 @@ func TestRemoveNominatedNodeName(t *testing.T) { func TestPatchPodStatus(t *testing.T) { tests := []struct { - name string - pod v1.Pod + name string + pod v1.Pod + client *clientsetfake.Clientset + // validateErr checks if error returned from PatchPodStatus is expected one or not. + // (true means error is expected one.) + validateErr func(goterr error) bool statusToUpdate v1.PodStatus }{ { - name: "Should update pod conditions successfully", + name: "Should update pod conditions successfully", + client: clientsetfake.NewSimpleClientset(), pod: v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "ns", @@ -209,11 +217,12 @@ func TestPatchPodStatus(t *testing.T) { // ref: #101697, #94626 - ImagePullSecrets are allowed to have empty secret names // which would fail the 2-way merge patch generation on Pod patches // due to the mergeKey being the name field - name: "Should update pod conditions successfully on a pod Spec with secrets with empty name", + name: "Should update pod conditions successfully on a pod Spec with secrets with empty name", + client: clientsetfake.NewSimpleClientset(), pod: v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: "ns", - Name: "pod2", + Name: "pod1", }, Spec: v1.PodSpec{ // this will serialize to imagePullSecrets:[{}] @@ -229,11 +238,90 @@ func TestPatchPodStatus(t *testing.T) { }, }, }, + { + name: "retry patch request when an 'connection refused' error is returned", + client: func() *clientsetfake.Clientset { + client := clientsetfake.NewSimpleClientset() + + reqcount := 0 + client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + defer func() { reqcount++ }() + if reqcount == 0 { + // return an connection refused error for the first patch request. + return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED) + } + if reqcount == 1 { + // not return error for the second patch request. + return false, &v1.Pod{}, nil + } + + // return error if requests comes in more than three times. + return true, nil, errors.New("requests comes in more than three times.") + }) + + return client + }(), + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "pod1", + }, + Spec: v1.PodSpec{ + ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}}, + }, + }, + statusToUpdate: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + }, + }, + }, + }, + { + name: "only 4 retries at most", + client: func() *clientsetfake.Clientset { + client := clientsetfake.NewSimpleClientset() + + reqcount := 0 + client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + defer func() { reqcount++ }() + if reqcount >= 4 { + // return error if requests comes in more than four times. + return true, nil, errors.New("requests comes in more than four times.") + } + + // return an connection refused error for the first patch request. + return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED) + }) + + return client + }(), + pod: v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "pod1", + }, + Spec: v1.PodSpec{ + ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}}, + }, + }, + validateErr: net.IsConnectionRefused, + statusToUpdate: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + }, + }, + }, + }, } - client := clientsetfake.NewSimpleClientset() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + client := tc.client _, err := client.CoreV1().Pods(tc.pod.Namespace).Create(context.TODO(), &tc.pod, metav1.CreateOptions{}) if err != nil { t.Fatal(err) @@ -242,9 +330,16 @@ func TestPatchPodStatus(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate) - if err != nil { + if err != nil && tc.validateErr == nil { + // shouldn't be error t.Fatal(err) } + if tc.validateErr != nil { + if !tc.validateErr(err) { + t.Fatalf("Returned unexpected error: %v", err) + } + return + } retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{}) if err != nil {