update lease controller

Passing in a context instead of a stop channel has several advantages:
- ensures that client-go calls return as soon as the controller is asked to stop
- contextual logging can be used (see the sketch below)
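
A minimal sketch of the bridging pattern, based on the diff below: wait.ContextForChannel turns a legacy stop channel into a context that is canceled when the channel closes. The function name runFromStopCh and the package clause are illustrative only; the import path of the lease package is an assumption here, and Controller.Run(ctx) is the post-commit signature.

    package example

    import (
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/component-helpers/apimachinery/lease"
        "k8s.io/klog/v2"
    )

    // runFromStopCh shows how a caller that still owns only a stop channel
    // can drive the context-based controller: the derived context is
    // canceled as soon as stopCh closes, which also aborts any client-go
    // calls that were started with it.
    func runFromStopCh(c lease.Controller, stopCh <-chan struct{}) {
        ctx := wait.ContextForChannel(stopCh)

        // Contextual logging: a logger attached to ctx is retrieved with
        // klog.FromContext instead of relying on global klog state.
        klog.FromContext(ctx).Info("starting lease controller")

        c.Run(ctx)
    }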

By passing that context down to its own functions and checking it while
waiting, the lease controller also doesn't get stuck in backoffEnsureLease
anymore (https://github.com/kubernetes/kubernetes/issues/116196).
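
The hang in backoffEnsureLease came from sleeping unconditionally between retries, which kept the goroutine alive after shutdown. The fix, visible in the controller diff below, replaces clock.Sleep with a select that also watches ctx.Done(). The same pattern as a standalone helper; sleepCtx is a hypothetical name, not part of this commit:

    package example

    import (
        "context"
        "time"
    )

    // sleepCtx waits for d or until ctx is canceled, whichever comes
    // first, and reports whether the full duration elapsed. Unlike
    // time.Sleep or clock.Sleep, it cannot outlive a shutdown.
    func sleepCtx(ctx context.Context, d time.Duration) bool {
        t := time.NewTimer(d)
        defer t.Stop()
        select {
        case <-ctx.Done():
            return false // canceled: stop retrying
        case <-t.C:
            return true // backoff elapsed: retry
        }
    }
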
commit dad95e1be6
parent af9f7a4d90
Author: Patrick Ohly
Date:   2023-03-02 15:06:00 +01:00

4 changed files with 39 additions and 35 deletions

diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go

@@ -17,7 +17,6 @@ limitations under the License.
 package controlplane
 
 import (
-    "context"
     "fmt"
     "net"
     "net/http"
@@ -64,6 +63,7 @@ import (
     utilnet "k8s.io/apimachinery/pkg/util/net"
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/uuid"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/apiserver/pkg/endpoints/discovery"
     apiserverfeatures "k8s.io/apiserver/pkg/features"
     "k8s.io/apiserver/pkg/registry/generic"
@@ -453,14 +453,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 
         // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
         // TODO: See if we can pass ctx to the current method
-        ctx, cancel := context.WithCancel(context.Background())
-        go func() {
-            select {
-            case <-hookContext.StopCh:
-                cancel() // stopCh closed, so cancel our context
-            case <-ctx.Done():
-            }
-        }()
+        ctx := wait.ContextForChannel(hookContext.StopCh)
 
         // prime values and start listeners
         if m.ClusterAuthenticationInfo.ClientCA != nil {
@@ -495,6 +488,10 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
             return err
         }
 
+        // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
+        // TODO: See if we can pass ctx to the current method
+        ctx := wait.ContextForChannel(hookContext.StopCh)
+
         leaseName := m.GenericAPIServer.APIServerID
         holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
@@ -509,7 +506,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
             metav1.NamespaceSystem,
             // TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
             labelAPIServerHeartbeatFunc(KubeAPIServer))
-        go controller.Run(hookContext.StopCh)
+        go controller.Run(ctx)
         return nil
     })
     // Labels for apiserver identity leases switched from k8s.io/component=kube-apiserver to apiserver.kubernetes.io/identity=kube-apiserver.

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go

@@ -1534,7 +1534,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
         go kl.fastStatusUpdateOnce()
 
         // start syncing lease
-        go kl.nodeLeaseController.Run(wait.NeverStop)
+        go kl.nodeLeaseController.Run(context.Background())
     }
 
     go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

diff --git a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller.go b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller.go

@@ -44,7 +44,7 @@ const (
 
 // Controller manages creating and renewing the lease for this component (kube-apiserver, kubelet, etc.)
 type Controller interface {
-    Run(stopCh <-chan struct{})
+    Run(ctx context.Context)
 }
 
 // ProcessLeaseFunc processes the given lease in-place
@@ -92,15 +92,15 @@ func NewController(clock clock.Clock, client clientset.Interface, holderIdentity
 }
 
 // Run runs the controller
-func (c *controller) Run(stopCh <-chan struct{}) {
+func (c *controller) Run(ctx context.Context) {
     if c.leaseClient == nil {
-        klog.Infof("lease controller has nil lease client, will not claim or renew leases")
+        klog.FromContext(ctx).Info("lease controller has nil lease client, will not claim or renew leases")
         return
     }
-    wait.JitterUntil(c.sync, c.renewInterval, 0.04, true, stopCh)
+    wait.JitterUntilWithContext(ctx, c.sync, c.renewInterval, 0.04, true)
 }
 
-func (c *controller) sync() {
+func (c *controller) sync(ctx context.Context) {
     if c.latestLease != nil {
         // As long as the lease is not (or very rarely) updated by any other agent than the component itself,
         // we can optimistically assume it didn't change since our last update and try updating
@@ -109,19 +109,19 @@ func (c *controller) sync() {
         // If at some point other agents will also be frequently updating the Lease object, this
         // can result in performance degradation, because we will end up with calling additional
         // GET/PUT - at this point this whole "if" should be removed.
-        err := c.retryUpdateLease(c.latestLease)
+        err := c.retryUpdateLease(ctx, c.latestLease)
         if err == nil {
             return
         }
-        klog.Infof("failed to update lease using latest lease, fallback to ensure lease, err: %v", err)
+        klog.FromContext(ctx).Info("failed to update lease using latest lease, fallback to ensure lease", "err", err)
     }
 
-    lease, created := c.backoffEnsureLease()
+    lease, created := c.backoffEnsureLease(ctx)
     c.latestLease = lease
     // we don't need to update the lease if we just created it
     if !created && lease != nil {
-        if err := c.retryUpdateLease(lease); err != nil {
-            klog.Errorf("%v, will retry after %v", err, c.renewInterval)
+        if err := c.retryUpdateLease(ctx, lease); err != nil {
+            klog.FromContext(ctx).Error(err, "Will retry updating lease", "interval", c.renewInterval)
         }
     }
 }
@@ -130,7 +130,7 @@ func (c *controller) sync() {
 // and uses exponentially increasing waits to prevent overloading the API server
 // with retries. Returns the lease, and true if this call created the lease,
 // false otherwise.
-func (c *controller) backoffEnsureLease() (*coordinationv1.Lease, bool) {
+func (c *controller) backoffEnsureLease(ctx context.Context) (*coordinationv1.Lease, bool) {
     var (
         lease   *coordinationv1.Lease
         created bool
@@ -138,22 +138,26 @@ func (c *controller) backoffEnsureLease(ctx context.Context) (*coordinationv1.Lease, bool) {
     )
     sleep := 100 * time.Millisecond
     for {
-        lease, created, err = c.ensureLease()
+        lease, created, err = c.ensureLease(ctx)
         if err == nil {
             break
         }
         sleep = minDuration(2*sleep, maxBackoff)
-        klog.Errorf("failed to ensure lease exists, will retry in %v, error: %v", sleep, err)
-        // backoff wait
-        c.clock.Sleep(sleep)
+        klog.FromContext(ctx).Error(err, "Failed to ensure lease exists, will retry", "interval", sleep)
+        // backoff wait with early return if the context gets canceled
+        select {
+        case <-ctx.Done():
+            return nil, false
+        case <-time.After(sleep):
+        }
     }
     return lease, created
 }
 
 // ensureLease creates the lease if it does not exist. Returns the lease and
 // a bool (true if this call created the lease), or any error that occurs.
-func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
-    lease, err := c.leaseClient.Get(context.TODO(), c.leaseName, metav1.GetOptions{})
+func (c *controller) ensureLease(ctx context.Context) (*coordinationv1.Lease, bool, error) {
+    lease, err := c.leaseClient.Get(ctx, c.leaseName, metav1.GetOptions{})
     if apierrors.IsNotFound(err) {
         // lease does not exist, create it.
         leaseToCreate, err := c.newLease(nil)
@@ -163,7 +167,7 @@ func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
         if err != nil {
             return nil, false, nil
         }
-        lease, err := c.leaseClient.Create(context.TODO(), leaseToCreate, metav1.CreateOptions{})
+        lease, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
         if err != nil {
             return nil, false, err
         }
@@ -178,18 +182,18 @@ func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
 
 // retryUpdateLease attempts to update the lease for maxUpdateRetries,
 // call this once you're sure the lease has been created
-func (c *controller) retryUpdateLease(base *coordinationv1.Lease) error {
+func (c *controller) retryUpdateLease(ctx context.Context, base *coordinationv1.Lease) error {
     for i := 0; i < maxUpdateRetries; i++ {
         leaseToUpdate, _ := c.newLease(base)
-        lease, err := c.leaseClient.Update(context.TODO(), leaseToUpdate, metav1.UpdateOptions{})
+        lease, err := c.leaseClient.Update(ctx, leaseToUpdate, metav1.UpdateOptions{})
         if err == nil {
             c.latestLease = lease
             return nil
         }
-        klog.Errorf("failed to update lease, error: %v", err)
+        klog.FromContext(ctx).Error(err, "Failed to update lease")
         // OptimisticLockError requires getting the newer version of lease to proceed.
         if apierrors.IsConflict(err) {
-            base, _ = c.backoffEnsureLease()
+            base, _ = c.backoffEnsureLease(ctx)
             continue
         }
         if i > 0 && c.onRepeatedHeartbeatFailure != nil {
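
For reference, wait.JitterUntilWithContext, which replaces wait.JitterUntil in Run above, invokes the callback with the context and returns once that context is canceled. A small, self-contained usage sketch; the 0.04 jitter factor matches the controller, everything else is illustrative:

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        // Cancel automatically after roughly one second.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // Run the callback every ~200ms (with 4% jitter; sliding period)
        // until ctx is canceled, at which point the call returns.
        wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
            fmt.Println("sync tick")
        }, 200*time.Millisecond, 0.04, true)
    }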

diff --git a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go

@@ -39,6 +39,7 @@ import (
     "k8s.io/utils/pointer"
 
     "k8s.io/klog/v2"
+    "k8s.io/klog/v2/ktesting"
 )
 
 func TestNewNodeLease(t *testing.T) {
@@ -270,6 +271,7 @@ func TestRetryUpdateNodeLease(t *testing.T) {
     }
     for _, tc := range cases {
         t.Run(tc.desc, func(t *testing.T) {
+            _, ctx := ktesting.NewTestContext(t)
             cl := fake.NewSimpleClientset(node)
             if tc.updateReactor != nil {
                 cl.PrependReactor("update", "leases", tc.updateReactor)
@@ -287,7 +289,7 @@ func TestRetryUpdateNodeLease(t *testing.T) {
                 onRepeatedHeartbeatFailure: tc.onRepeatedHeartbeatFailure,
                 newLeasePostProcessFunc:    setNodeOwnerFunc(cl, node.Name),
             }
-            if err := c.retryUpdateLease(nil); tc.expectErr != (err != nil) {
+            if err := c.retryUpdateLease(ctx, nil); tc.expectErr != (err != nil) {
                 t.Fatalf("got %v, expected %v", err != nil, tc.expectErr)
             }
         })
@@ -405,6 +407,7 @@ func TestUpdateUsingLatestLease(t *testing.T) {
     }
     for _, tc := range cases {
         t.Run(tc.desc, func(t *testing.T) {
+            _, ctx := ktesting.NewTestContext(t)
             cl := fake.NewSimpleClientset(tc.existingObjs...)
             if tc.updateReactor != nil {
                 cl.PrependReactor("update", "leases", tc.updateReactor)
@@ -426,7 +429,7 @@ func TestUpdateUsingLatestLease(t *testing.T) {
                 newLeasePostProcessFunc: setNodeOwnerFunc(cl, node.Name),
             }
 
-            c.sync()
+            c.sync(ctx)
             if tc.expectLatestLease {
                 if tc.expectLeaseResourceVersion != c.latestLease.ResourceVersion {