diff --git a/pkg/kubelet/nodelease/BUILD b/pkg/kubelet/nodelease/BUILD index 579a2ddbf60..d453767482b 100644 --- a/pkg/kubelet/nodelease/BUILD +++ b/pkg/kubelet/nodelease/BUILD @@ -27,11 +27,15 @@ go_test( "//staging/src/k8s.io/api/coordination/v1beta1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], ) diff --git a/pkg/kubelet/nodelease/controller.go b/pkg/kubelet/nodelease/controller.go index c614368517b..c9e9cf37080 100644 --- a/pkg/kubelet/nodelease/controller.go +++ b/pkg/kubelet/nodelease/controller.go @@ -17,6 +17,7 @@ limitations under the License. package nodelease import ( + "fmt" "time" coordv1beta1 "k8s.io/api/coordination/v1beta1" @@ -91,7 +92,9 @@ func (c *controller) sync() { lease, created := c.backoffEnsureLease() // we don't need to update the lease if we just created it if !created { - c.retryUpdateLease(lease) + if err := c.retryUpdateLease(lease); err != nil { + klog.Errorf("%v, will retry after %v", err, c.renewInterval) + } } } @@ -140,18 +143,23 @@ func (c *controller) ensureLease() (*coordv1beta1.Lease, bool, error) { // retryUpdateLease attempts to update the lease for maxUpdateRetries, // call this once you're sure the lease has been created -func (c *controller) retryUpdateLease(base *coordv1beta1.Lease) { +func (c *controller) retryUpdateLease(base *coordv1beta1.Lease) error { for i := 0; i < maxUpdateRetries; i++ { _, err := c.leaseClient.Update(c.newLease(base)) if err == nil { - return + return nil } klog.Errorf("failed to update node lease, error: %v", err) + // OptimisticLockError requires getting the newer version of lease to proceed. + if apierrors.IsConflict(err) { + base, _ = c.backoffEnsureLease() + continue + } if i > 0 && c.onRepeatedHeartbeatFailure != nil { c.onRepeatedHeartbeatFailure() } } - klog.Errorf("failed %d attempts to update node lease, will retry after %v", maxUpdateRetries, c.renewInterval) + return fmt.Errorf("failed %d attempts to update node lease", maxUpdateRetries) } // newLease constructs a new lease if base is nil, or returns a copy of base diff --git a/pkg/kubelet/nodelease/controller_test.go b/pkg/kubelet/nodelease/controller_test.go index 651fdae781d..cbd362a31ee 100644 --- a/pkg/kubelet/nodelease/controller_test.go +++ b/pkg/kubelet/nodelease/controller_test.go @@ -17,17 +17,22 @@ limitations under the License. package nodelease import ( + "fmt" "testing" "time" coordv1beta1 "k8s.io/api/coordination/v1beta1" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" "k8s.io/utils/pointer" ) @@ -195,3 +200,85 @@ func TestNewLease(t *testing.T) { }) } } + +func TestRetryUpdateLease(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + UID: types.UID("foo-uid"), + }, + } + gr := schema.GroupResource{Group: "v1", Resource: "lease"} + noConnectionUpdateErr := apierrors.NewServerTimeout(gr, "put", 1) + optimistcLockUpdateErr := apierrors.NewConflict(gr, "lease", fmt.Errorf("conflict")) + cases := []struct { + desc string + updateReactor func(action clienttesting.Action) (bool, runtime.Object, error) + getReactor func(action clienttesting.Action) (bool, runtime.Object, error) + onRepeatedHeartbeatFailure func() + expectErr bool + }{ + { + desc: "no errors", + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, &coordv1beta1.Lease{}, nil + }, + getReactor: nil, + onRepeatedHeartbeatFailure: nil, + expectErr: false, + }, + { + desc: "connection errors", + updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, nil, noConnectionUpdateErr + }, + getReactor: nil, + onRepeatedHeartbeatFailure: nil, + expectErr: true, + }, + { + desc: "optimistic lock errors", + updateReactor: func() func(action clienttesting.Action) (bool, runtime.Object, error) { + i := 0 + return func(action clienttesting.Action) (bool, runtime.Object, error) { + i++ + switch i { + case 1: + return true, nil, noConnectionUpdateErr + case 2: + return true, nil, optimistcLockUpdateErr + default: + return true, &coordv1beta1.Lease{}, nil + } + } + }(), + getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) { + return true, &coordv1beta1.Lease{}, nil + }, + onRepeatedHeartbeatFailure: func() { t.Fatalf("onRepeatedHeartbeatFailure called") }, + expectErr: false, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + cl := fake.NewSimpleClientset(node) + if tc.updateReactor != nil { + cl.PrependReactor("update", "leases", tc.updateReactor) + } + if tc.getReactor != nil { + cl.PrependReactor("get", "leases", tc.getReactor) + } + c := &controller{ + clock: clock.NewFakeClock(time.Now()), + client: cl, + leaseClient: cl.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease), + holderIdentity: node.Name, + leaseDurationSeconds: 10, + onRepeatedHeartbeatFailure: tc.onRepeatedHeartbeatFailure, + } + if err := c.retryUpdateLease(nil); tc.expectErr != (err != nil) { + t.Fatalf("got %v, expected %v", err != nil, tc.expectErr) + } + }) + } +}