mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #109832 from sanposhiho/retry-on-update
Retry when it fails to update pods status on scheduling loop
This commit is contained in:
commit
92cb0ae6ba
@ -26,8 +26,10 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/util/net"
|
||||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/util/retry"
|
||||||
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
|
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||||
@ -115,8 +117,12 @@ func PatchPodStatus(ctx context.Context, cs kubernetes.Interface, old *v1.Pod, n
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
|
patchFn := func() error {
|
||||||
return err
|
_, err := cs.CoreV1().Pods(old.Namespace).Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return retry.OnError(retry.DefaultBackoff, net.IsConnectionRefused, patchFn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeletePod deletes the given <pod> from API server
|
// DeletePod deletes the given <pod> from API server
|
||||||
|
@ -18,7 +18,9 @@ package util
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -27,6 +29,7 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/net"
|
||||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||||
clienttesting "k8s.io/client-go/testing"
|
clienttesting "k8s.io/client-go/testing"
|
||||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||||
@ -181,12 +184,17 @@ func TestRemoveNominatedNodeName(t *testing.T) {
|
|||||||
|
|
||||||
func TestPatchPodStatus(t *testing.T) {
|
func TestPatchPodStatus(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
pod v1.Pod
|
pod v1.Pod
|
||||||
|
client *clientsetfake.Clientset
|
||||||
|
// validateErr checks if error returned from PatchPodStatus is expected one or not.
|
||||||
|
// (true means error is expected one.)
|
||||||
|
validateErr func(goterr error) bool
|
||||||
statusToUpdate v1.PodStatus
|
statusToUpdate v1.PodStatus
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Should update pod conditions successfully",
|
name: "Should update pod conditions successfully",
|
||||||
|
client: clientsetfake.NewSimpleClientset(),
|
||||||
pod: v1.Pod{
|
pod: v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "ns",
|
Namespace: "ns",
|
||||||
@ -209,11 +217,12 @@ func TestPatchPodStatus(t *testing.T) {
|
|||||||
// ref: #101697, #94626 - ImagePullSecrets are allowed to have empty secret names
|
// ref: #101697, #94626 - ImagePullSecrets are allowed to have empty secret names
|
||||||
// which would fail the 2-way merge patch generation on Pod patches
|
// which would fail the 2-way merge patch generation on Pod patches
|
||||||
// due to the mergeKey being the name field
|
// due to the mergeKey being the name field
|
||||||
name: "Should update pod conditions successfully on a pod Spec with secrets with empty name",
|
name: "Should update pod conditions successfully on a pod Spec with secrets with empty name",
|
||||||
|
client: clientsetfake.NewSimpleClientset(),
|
||||||
pod: v1.Pod{
|
pod: v1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "ns",
|
Namespace: "ns",
|
||||||
Name: "pod2",
|
Name: "pod1",
|
||||||
},
|
},
|
||||||
Spec: v1.PodSpec{
|
Spec: v1.PodSpec{
|
||||||
// this will serialize to imagePullSecrets:[{}]
|
// this will serialize to imagePullSecrets:[{}]
|
||||||
@ -229,11 +238,90 @@ func TestPatchPodStatus(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "retry patch request when an 'connection refused' error is returned",
|
||||||
|
client: func() *clientsetfake.Clientset {
|
||||||
|
client := clientsetfake.NewSimpleClientset()
|
||||||
|
|
||||||
|
reqcount := 0
|
||||||
|
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
defer func() { reqcount++ }()
|
||||||
|
if reqcount == 0 {
|
||||||
|
// return an connection refused error for the first patch request.
|
||||||
|
return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)
|
||||||
|
}
|
||||||
|
if reqcount == 1 {
|
||||||
|
// not return error for the second patch request.
|
||||||
|
return false, &v1.Pod{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// return error if requests comes in more than three times.
|
||||||
|
return true, nil, errors.New("requests comes in more than three times.")
|
||||||
|
})
|
||||||
|
|
||||||
|
return client
|
||||||
|
}(),
|
||||||
|
pod: v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "ns",
|
||||||
|
Name: "pod1",
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
statusToUpdate: v1.PodStatus{
|
||||||
|
Conditions: []v1.PodCondition{
|
||||||
|
{
|
||||||
|
Type: v1.PodScheduled,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "only 4 retries at most",
|
||||||
|
client: func() *clientsetfake.Clientset {
|
||||||
|
client := clientsetfake.NewSimpleClientset()
|
||||||
|
|
||||||
|
reqcount := 0
|
||||||
|
client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
defer func() { reqcount++ }()
|
||||||
|
if reqcount >= 4 {
|
||||||
|
// return error if requests comes in more than four times.
|
||||||
|
return true, nil, errors.New("requests comes in more than four times.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// return an connection refused error for the first patch request.
|
||||||
|
return true, &v1.Pod{}, fmt.Errorf("connection refused: %w", syscall.ECONNREFUSED)
|
||||||
|
})
|
||||||
|
|
||||||
|
return client
|
||||||
|
}(),
|
||||||
|
pod: v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Namespace: "ns",
|
||||||
|
Name: "pod1",
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
ImagePullSecrets: []v1.LocalObjectReference{{Name: "foo"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
validateErr: net.IsConnectionRefused,
|
||||||
|
statusToUpdate: v1.PodStatus{
|
||||||
|
Conditions: []v1.PodCondition{
|
||||||
|
{
|
||||||
|
Type: v1.PodScheduled,
|
||||||
|
Status: v1.ConditionFalse,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
client := clientsetfake.NewSimpleClientset()
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
client := tc.client
|
||||||
_, err := client.CoreV1().Pods(tc.pod.Namespace).Create(context.TODO(), &tc.pod, metav1.CreateOptions{})
|
_, err := client.CoreV1().Pods(tc.pod.Namespace).Create(context.TODO(), &tc.pod, metav1.CreateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -242,9 +330,16 @@ func TestPatchPodStatus(t *testing.T) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
|
err = PatchPodStatus(ctx, client, &tc.pod, &tc.statusToUpdate)
|
||||||
if err != nil {
|
if err != nil && tc.validateErr == nil {
|
||||||
|
// shouldn't be error
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
if tc.validateErr != nil {
|
||||||
|
if !tc.validateErr(err) {
|
||||||
|
t.Fatalf("Returned unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
|
retrievedPod, err := client.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user