diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go
index b3c87c07bdc..3e906d0fa1e 100644
--- a/pkg/controlplane/instance.go
+++ b/pkg/controlplane/instance.go
@@ -17,7 +17,6 @@ limitations under the License.
 package controlplane

 import (
-	"context"
 	"fmt"
 	"net"
 	"net/http"
@@ -64,6 +63,7 @@ import (
 	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/endpoints/discovery"
 	apiserverfeatures "k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/registry/generic"
@@ -453,14 +453,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)

 		// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
 		// TODO: See if we can pass ctx to the current method
-		ctx, cancel := context.WithCancel(context.Background())
-		go func() {
-			select {
-			case <-hookContext.StopCh:
-				cancel() // stopCh closed, so cancel our context
-			case <-ctx.Done():
-			}
-		}()
+		ctx := wait.ContextForChannel(hookContext.StopCh)

 		// prime values and start listeners
 		if m.ClusterAuthenticationInfo.ClientCA != nil {
@@ -495,6 +488,10 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 			return err
 		}

+		// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
+		// TODO: See if we can pass ctx to the current method
+		ctx := wait.ContextForChannel(hookContext.StopCh)
+
 		leaseName := m.GenericAPIServer.APIServerID
 		holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())

@@ -509,7 +506,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 			metav1.NamespaceSystem,
 			// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
 			labelAPIServerHeartbeatFunc(KubeAPIServer))
-		go controller.Run(hookContext.StopCh)
+		go controller.Run(ctx)
 		return nil
 	})
 	// Labels for apiserver identity leases switched from k8s.io/component=kube-apiserver to apiserver.kubernetes.io/identity=kube-apiserver.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index c9ac91b9f2a..fecdcc6a45b 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1534,7 +1534,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 		go kl.fastStatusUpdateOnce()

 		// start syncing lease
-		go kl.nodeLeaseController.Run(wait.NeverStop)
+		go kl.nodeLeaseController.Run(context.Background())
 	}

 	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
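Both post-start hooks above now derive their context with wait.ContextForChannel, replacing the removed context.WithCancel bridge goroutine. A minimal standalone sketch of what that helper provides (the stop channel and main function here are illustrative, not part of this patch):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{}) // stands in for hookContext.StopCh

	// ContextForChannel returns a context whose Done channel closes once
	// stopCh is closed; no hand-rolled cancel goroutine is needed.
	ctx := wait.ContextForChannel(stopCh)

	close(stopCh)
	<-ctx.Done()           // unblocks once stopCh is closed
	fmt.Println(ctx.Err()) // context.Canceled
}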
diff --git a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller.go b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller.go
index 203285e7442..843eb36133c 100644
--- a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller.go
+++ b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller.go
@@ -44,7 +44,7 @@ const (

 // Controller manages creating and renewing the lease for this component (kube-apiserver, kubelet, etc.)
 type Controller interface {
-	Run(stopCh <-chan struct{})
+	Run(ctx context.Context)
 }

 // ProcessLeaseFunc processes the given lease in-place
@@ -92,15 +92,15 @@ func NewController(clock clock.Clock, client clientset.Interface, holderIdentity
 }

 // Run runs the controller
-func (c *controller) Run(stopCh <-chan struct{}) {
+func (c *controller) Run(ctx context.Context) {
 	if c.leaseClient == nil {
-		klog.Infof("lease controller has nil lease client, will not claim or renew leases")
+		klog.FromContext(ctx).Info("lease controller has nil lease client, will not claim or renew leases")
 		return
 	}
-	wait.JitterUntil(c.sync, c.renewInterval, 0.04, true, stopCh)
+	wait.JitterUntilWithContext(ctx, c.sync, c.renewInterval, 0.04, true)
 }

-func (c *controller) sync() {
+func (c *controller) sync(ctx context.Context) {
 	if c.latestLease != nil {
 		// As long as the lease is not (or very rarely) updated by any other agent than the component itself,
 		// we can optimistically assume it didn't change since our last update and try updating
@@ -109,19 +109,19 @@ func (c *controller) sync() {
 		// If at some point other agents will also be frequently updating the Lease object, this
 		// can result in performance degradation, because we will end up with calling additional
 		// GET/PUT - at this point this whole "if" should be removed.
-		err := c.retryUpdateLease(c.latestLease)
+		err := c.retryUpdateLease(ctx, c.latestLease)
 		if err == nil {
 			return
 		}
-		klog.Infof("failed to update lease using latest lease, fallback to ensure lease, err: %v", err)
+		klog.FromContext(ctx).Info("failed to update lease using latest lease, fallback to ensure lease", "err", err)
 	}

-	lease, created := c.backoffEnsureLease()
+	lease, created := c.backoffEnsureLease(ctx)
 	c.latestLease = lease
 	// we don't need to update the lease if we just created it
 	if !created && lease != nil {
-		if err := c.retryUpdateLease(lease); err != nil {
-			klog.Errorf("%v, will retry after %v", err, c.renewInterval)
+		if err := c.retryUpdateLease(ctx, lease); err != nil {
+			klog.FromContext(ctx).Error(err, "Will retry updating lease", "interval", c.renewInterval)
 		}
 	}
 }
@@ -130,7 +130,7 @@ func (c *controller) sync() {
 // and uses exponentially increasing waits to prevent overloading the API server
 // with retries. Returns the lease, and true if this call created the lease,
 // false otherwise.
-func (c *controller) backoffEnsureLease() (*coordinationv1.Lease, bool) {
+func (c *controller) backoffEnsureLease(ctx context.Context) (*coordinationv1.Lease, bool) {
 	var (
 		lease   *coordinationv1.Lease
 		created bool
@@ -138,22 +138,26 @@ func (c *controller) backoffEnsureLease() (*coordinationv1.Lease, bool) {
 	)
 	sleep := 100 * time.Millisecond
 	for {
-		lease, created, err = c.ensureLease()
+		lease, created, err = c.ensureLease(ctx)
 		if err == nil {
 			break
 		}
 		sleep = minDuration(2*sleep, maxBackoff)
-		klog.Errorf("failed to ensure lease exists, will retry in %v, error: %v", sleep, err)
-		// backoff wait
-		c.clock.Sleep(sleep)
+		klog.FromContext(ctx).Error(err, "Failed to ensure lease exists, will retry", "interval", sleep)
+		// backoff wait with early return if the context gets canceled
+		select {
+		case <-ctx.Done():
+			return nil, false
+		case <-time.After(sleep):
+		}
 	}
 	return lease, created
 }

 // ensureLease creates the lease if it does not exist. Returns the lease and
 // a bool (true if this call created the lease), or any error that occurs.
-func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
-	lease, err := c.leaseClient.Get(context.TODO(), c.leaseName, metav1.GetOptions{})
+func (c *controller) ensureLease(ctx context.Context) (*coordinationv1.Lease, bool, error) {
+	lease, err := c.leaseClient.Get(ctx, c.leaseName, metav1.GetOptions{})
 	if apierrors.IsNotFound(err) {
 		// lease does not exist, create it.
 		leaseToCreate, err := c.newLease(nil)
@@ -163,7 +167,7 @@ func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {
 		if err != nil {
 			return nil, false, nil
 		}
-		lease, err := c.leaseClient.Create(context.TODO(), leaseToCreate, metav1.CreateOptions{})
+		lease, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
 		if err != nil {
 			return nil, false, err
 		}
@@ -178,18 +182,18 @@ func (c *controller) ensureLease() (*coordinationv1.Lease, bool, error) {

 // retryUpdateLease attempts to update the lease for maxUpdateRetries,
 // call this once you're sure the lease has been created
-func (c *controller) retryUpdateLease(base *coordinationv1.Lease) error {
+func (c *controller) retryUpdateLease(ctx context.Context, base *coordinationv1.Lease) error {
 	for i := 0; i < maxUpdateRetries; i++ {
 		leaseToUpdate, _ := c.newLease(base)
-		lease, err := c.leaseClient.Update(context.TODO(), leaseToUpdate, metav1.UpdateOptions{})
+		lease, err := c.leaseClient.Update(ctx, leaseToUpdate, metav1.UpdateOptions{})
 		if err == nil {
 			c.latestLease = lease
 			return nil
 		}
-		klog.Errorf("failed to update lease, error: %v", err)
+		klog.FromContext(ctx).Error(err, "Failed to update lease")
 		// OptimisticLockError requires getting the newer version of lease to proceed.
 		if apierrors.IsConflict(err) {
-			base, _ = c.backoffEnsureLease()
+			base, _ = c.backoffEnsureLease(ctx)
 			continue
 		}
 		if i > 0 && c.onRepeatedHeartbeatFailure != nil {
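Run now drives sync through wait.JitterUntilWithContext, and backoffEnsureLease sleeps in a select so a canceled context interrupts the backoff instead of blocking for the full interval. A standalone sketch combining the two patterns (the loop body, timings, and main function are illustrative, not the controller's actual code):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(time.Second) // simulate a stop signal
		cancel()
	}()

	// JitterUntilWithContext hands ctx to every iteration and returns
	// once ctx is canceled, mirroring the new Run signature.
	wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("sync tick")
		// Context-aware backoff: bail out early if ctx is canceled.
		select {
		case <-ctx.Done():
			return
		case <-time.After(100 * time.Millisecond):
		}
	}, 250*time.Millisecond, 0.04, true)
}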
diff --git a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go
index 4d9cc01f49b..0efcd6bbd15 100644
--- a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go
+++ b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go
@@ -39,6 +39,7 @@ import (
 	"k8s.io/utils/pointer"

 	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
 )

 func TestNewNodeLease(t *testing.T) {
@@ -270,6 +271,7 @@ func TestRetryUpdateNodeLease(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.desc, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
 			cl := fake.NewSimpleClientset(node)
 			if tc.updateReactor != nil {
 				cl.PrependReactor("update", "leases", tc.updateReactor)
@@ -287,7 +289,7 @@ func TestRetryUpdateNodeLease(t *testing.T) {
 				onRepeatedHeartbeatFailure: tc.onRepeatedHeartbeatFailure,
 				newLeasePostProcessFunc:    setNodeOwnerFunc(cl, node.Name),
 			}
-			if err := c.retryUpdateLease(nil); tc.expectErr != (err != nil) {
+			if err := c.retryUpdateLease(ctx, nil); tc.expectErr != (err != nil) {
 				t.Fatalf("got %v, expected %v", err != nil, tc.expectErr)
 			}
 		})
@@ -405,6 +407,7 @@ func TestUpdateUsingLatestLease(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.desc, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
 			cl := fake.NewSimpleClientset(tc.existingObjs...)
 			if tc.updateReactor != nil {
 				cl.PrependReactor("update", "leases", tc.updateReactor)
@@ -426,7 +429,7 @@ func TestUpdateUsingLatestLease(t *testing.T) {
 				newLeasePostProcessFunc: setNodeOwnerFunc(cl, node.Name),
 			}

-			c.sync()
+			c.sync(ctx)

 			if tc.expectLatestLease {
 				if tc.expectLeaseResourceVersion != c.latestLease.ResourceVersion {
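The tests obtain their context from ktesting.NewTestContext, so anything logged via klog.FromContext(ctx) is attributed to the running test. A minimal sketch of that pattern (syncStub is a hypothetical stand-in for a context-aware method like the controller's sync; it is not part of this patch):

package lease_test

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

// syncStub is a hypothetical placeholder for a context-aware method
// such as (*controller).sync(ctx).
func syncStub(ctx context.Context) {
	klog.FromContext(ctx).Info("sync ran")
}

func TestSyncWithTestContext(t *testing.T) {
	// NewTestContext returns a per-test logger and a context carrying it,
	// so log output from code under test lands in this test's output.
	_, ctx := ktesting.NewTestContext(t)
	syncStub(ctx)
}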