mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #79341 from krzysied/kubelet_lease_fix
Handling OptimisticLockError in kubelet node lease controller
This commit is contained in:
commit
f20876908f
@ -27,11 +27,15 @@ go_test(
|
|||||||
"//staging/src/k8s.io/api/coordination/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/coordination/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||||
"//vendor/k8s.io/utils/pointer:go_default_library",
|
"//vendor/k8s.io/utils/pointer:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package nodelease
|
package nodelease
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
coordv1beta1 "k8s.io/api/coordination/v1beta1"
|
coordv1beta1 "k8s.io/api/coordination/v1beta1"
|
||||||
@ -91,7 +92,9 @@ func (c *controller) sync() {
|
|||||||
lease, created := c.backoffEnsureLease()
|
lease, created := c.backoffEnsureLease()
|
||||||
// we don't need to update the lease if we just created it
|
// we don't need to update the lease if we just created it
|
||||||
if !created {
|
if !created {
|
||||||
c.retryUpdateLease(lease)
|
if err := c.retryUpdateLease(lease); err != nil {
|
||||||
|
klog.Errorf("%v, will retry after %v", err, c.renewInterval)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,18 +143,23 @@ func (c *controller) ensureLease() (*coordv1beta1.Lease, bool, error) {
|
|||||||
|
|
||||||
// retryUpdateLease attempts to update the lease for maxUpdateRetries,
|
// retryUpdateLease attempts to update the lease for maxUpdateRetries,
|
||||||
// call this once you're sure the lease has been created
|
// call this once you're sure the lease has been created
|
||||||
func (c *controller) retryUpdateLease(base *coordv1beta1.Lease) {
|
func (c *controller) retryUpdateLease(base *coordv1beta1.Lease) error {
|
||||||
for i := 0; i < maxUpdateRetries; i++ {
|
for i := 0; i < maxUpdateRetries; i++ {
|
||||||
_, err := c.leaseClient.Update(c.newLease(base))
|
_, err := c.leaseClient.Update(c.newLease(base))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
klog.Errorf("failed to update node lease, error: %v", err)
|
klog.Errorf("failed to update node lease, error: %v", err)
|
||||||
|
// OptimisticLockError requires getting the newer version of lease to proceed.
|
||||||
|
if apierrors.IsConflict(err) {
|
||||||
|
base, _ = c.backoffEnsureLease()
|
||||||
|
continue
|
||||||
|
}
|
||||||
if i > 0 && c.onRepeatedHeartbeatFailure != nil {
|
if i > 0 && c.onRepeatedHeartbeatFailure != nil {
|
||||||
c.onRepeatedHeartbeatFailure()
|
c.onRepeatedHeartbeatFailure()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.Errorf("failed %d attempts to update node lease, will retry after %v", maxUpdateRetries, c.renewInterval)
|
return fmt.Errorf("failed %d attempts to update node lease", maxUpdateRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLease constructs a new lease if base is nil, or returns a copy of base
|
// newLease constructs a new lease if base is nil, or returns a copy of base
|
||||||
|
@ -17,17 +17,22 @@ limitations under the License.
|
|||||||
package nodelease
|
package nodelease
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
coordv1beta1 "k8s.io/api/coordination/v1beta1"
|
coordv1beta1 "k8s.io/api/coordination/v1beta1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/clock"
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/diff"
|
"k8s.io/apimachinery/pkg/util/diff"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
clienttesting "k8s.io/client-go/testing"
|
||||||
"k8s.io/utils/pointer"
|
"k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -195,3 +200,85 @@ func TestNewLease(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryUpdateLease(t *testing.T) {
|
||||||
|
node := &corev1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
UID: types.UID("foo-uid"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
gr := schema.GroupResource{Group: "v1", Resource: "lease"}
|
||||||
|
noConnectionUpdateErr := apierrors.NewServerTimeout(gr, "put", 1)
|
||||||
|
optimistcLockUpdateErr := apierrors.NewConflict(gr, "lease", fmt.Errorf("conflict"))
|
||||||
|
cases := []struct {
|
||||||
|
desc string
|
||||||
|
updateReactor func(action clienttesting.Action) (bool, runtime.Object, error)
|
||||||
|
getReactor func(action clienttesting.Action) (bool, runtime.Object, error)
|
||||||
|
onRepeatedHeartbeatFailure func()
|
||||||
|
expectErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "no errors",
|
||||||
|
updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
return true, &coordv1beta1.Lease{}, nil
|
||||||
|
},
|
||||||
|
getReactor: nil,
|
||||||
|
onRepeatedHeartbeatFailure: nil,
|
||||||
|
expectErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "connection errors",
|
||||||
|
updateReactor: func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
return true, nil, noConnectionUpdateErr
|
||||||
|
},
|
||||||
|
getReactor: nil,
|
||||||
|
onRepeatedHeartbeatFailure: nil,
|
||||||
|
expectErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "optimistic lock errors",
|
||||||
|
updateReactor: func() func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
i := 0
|
||||||
|
return func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
i++
|
||||||
|
switch i {
|
||||||
|
case 1:
|
||||||
|
return true, nil, noConnectionUpdateErr
|
||||||
|
case 2:
|
||||||
|
return true, nil, optimistcLockUpdateErr
|
||||||
|
default:
|
||||||
|
return true, &coordv1beta1.Lease{}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(),
|
||||||
|
getReactor: func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||||
|
return true, &coordv1beta1.Lease{}, nil
|
||||||
|
},
|
||||||
|
onRepeatedHeartbeatFailure: func() { t.Fatalf("onRepeatedHeartbeatFailure called") },
|
||||||
|
expectErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
cl := fake.NewSimpleClientset(node)
|
||||||
|
if tc.updateReactor != nil {
|
||||||
|
cl.PrependReactor("update", "leases", tc.updateReactor)
|
||||||
|
}
|
||||||
|
if tc.getReactor != nil {
|
||||||
|
cl.PrependReactor("get", "leases", tc.getReactor)
|
||||||
|
}
|
||||||
|
c := &controller{
|
||||||
|
clock: clock.NewFakeClock(time.Now()),
|
||||||
|
client: cl,
|
||||||
|
leaseClient: cl.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease),
|
||||||
|
holderIdentity: node.Name,
|
||||||
|
leaseDurationSeconds: 10,
|
||||||
|
onRepeatedHeartbeatFailure: tc.onRepeatedHeartbeatFailure,
|
||||||
|
}
|
||||||
|
if err := c.retryUpdateLease(nil); tc.expectErr != (err != nil) {
|
||||||
|
t.Fatalf("got %v, expected %v", err != nil, tc.expectErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user