From fac758164029e278e9bda924090ed078bb6514c8 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Tue, 23 Jul 2024 14:28:08 +0000 Subject: [PATCH] feedback: leasecandidate clients --- .../app/controllermanager.go | 12 ++-- cmd/kube-scheduler/app/server.go | 2 +- .../controller/leaderelection/election.go | 2 +- .../leaderelection_controller.go | 69 +++++++++---------- .../leaderelection_controller_test.go | 26 +------ .../tools/leaderelection/leaderelection.go | 8 ++- .../tools/leaderelection/leasecandidate.go | 23 ++++--- .../leaderelection/leasecandidate_test.go | 2 +- .../coordinated_leader_election_test.go | 2 +- 9 files changed, 64 insertions(+), 82 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index cbfa4baf82b..39bdcdc805d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -293,7 +293,11 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { } if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) { - ver, err := semver.ParseTolerant(version.Get().String()) + binaryVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).BinaryVersion().String()) + if err != nil { + return err + } + emulationVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).EmulationVersion().String()) if err != nil { return err } @@ -301,11 +305,11 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { // Start lease candidate controller for coordinated leader election leaseCandidate, waitForSync, err := leaderelection.NewCandidate( c.Client, - id, "kube-system", + id, "kube-controller-manager", - ver.FinalizeVersion(), - ver.FinalizeVersion(), // TODO(Jefftree): Use compatibility version when it's available + binaryVersion.FinalizeVersion(), + emulationVersion.FinalizeVersion(), []coordinationv1.CoordinatedLeaseStrategy{coordinationv1.OldestEmulationVersion}, ) if err != nil { diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 46989b03e1b..7b08b119b4b 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -223,8 +223,8 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * // Start lease candidate controller for coordinated leader election leaseCandidate, waitForSync, err := leaderelection.NewCandidate( cc.Client, - cc.LeaderElection.Lock.Identity(), metav1.NamespaceSystem, + cc.LeaderElection.Lock.Identity(), "kube-scheduler", binaryVersion.FinalizeVersion(), emulationVersion.FinalizeVersion(), diff --git a/pkg/controlplane/controller/leaderelection/election.go b/pkg/controlplane/controller/leaderelection/election.go index c81a995b1dd..20e7f3dd6f1 100644 --- a/pkg/controlplane/controller/leaderelection/election.go +++ b/pkg/controlplane/controller/leaderelection/election.go @@ -113,7 +113,7 @@ func pickBestStrategy(candidates []*v1alpha1.LeaseCandidate) (v1.CoordinatedLeas sorted := topologicalSortWithOneRoot(graph) if sorted == nil { - return nilStrategy, fmt.Errorf("Invalid strategy") + return nilStrategy, fmt.Errorf("invalid strategy") } return sorted[0], nil diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go index 3896f94f94a..fc1b3b876f3 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go @@ -40,11 +40,11 @@ import ( ) const ( - controllerName = "leader-election-controller" - ElectedByAnnotationName = "coordination.k8s.io/elected-by" // Value should be set to controllerName + controllerName = "leader-election-controller" // Requeue interval is the interval at which a Lease is requeued to verify that it is being renewed properly. - requeueInterval = 5 * time.Second + defaultRequeueInterval = 5 * time.Second + noRequeue = 0 defaultLeaseDurationSeconds int32 = 5 electionDuration = 5 * time.Second @@ -158,10 +158,10 @@ func (c *Controller) processNextElectionItem(ctx context.Context) bool { return false } - completed, err := c.reconcileElectionStep(ctx, key) + intervalForRequeue, err := c.reconcileElectionStep(ctx, key) utilruntime.HandleError(err) - if completed { - defer c.queue.AddAfter(key, requeueInterval) + if intervalForRequeue != noRequeue { + defer c.queue.AddAfter(key, intervalForRequeue) } c.queue.Done(key) return true @@ -237,22 +237,25 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease // PingTime + electionDuration > time.Now: We just asked all candidates to ack and are still waiting for response // PingTime + electionDuration < time.Now: Candidate has not responded within the appropriate PingTime. Continue the election. // RenewTime + 5 seconds > time.Now: All candidates acked in the last 5 seconds, continue the election. -func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue bool, err error) { +func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue time.Duration, err error) { now := time.Now() candidates, err := c.listAdmissableCandidates(leaseNN) if err != nil { - return true, err + return defaultRequeueInterval, err } else if len(candidates) == 0 { - return false, nil + return noRequeue, nil } - klog.V(4).Infof("reconcileElectionStep %q %q, candidates: %d", leaseNN.Namespace, leaseNN.Name, len(candidates)) + klog.V(4).Infof("reconcileElectionStep %s, candidates: %d", leaseNN, len(candidates)) // Check if an election is really needed by looking at the current lease // and set of candidates needElection, err := c.electionNeeded(candidates, leaseNN) - if !needElection || err != nil { - return needElection, err + if !needElection { + return noRequeue, err + } + if err != nil { + return defaultRequeueInterval, err } fastTrackElection := false @@ -263,7 +266,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na if candidate.Spec.PingTime != nil { if candidate.Spec.PingTime.Add(electionDuration).After(now) { // continue waiting for the election to timeout - return false, nil + return noRequeue, nil } else { // election timed out without ack. Clear and start election. fastTrackElection = true @@ -271,7 +274,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na clone.Spec.PingTime = nil _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) if err != nil { - return false, err + return noRequeue, err } break } @@ -294,10 +297,10 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na clone.Spec.PingTime = &metav1.MicroTime{Time: time.Now()} _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) if err != nil { - return false, err + return noRequeue, err } } - return true, nil + return defaultRequeueInterval, nil } } @@ -308,22 +311,22 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na } } if len(ackedCandidates) == 0 { - return false, fmt.Errorf("no available candidates") + return noRequeue, fmt.Errorf("no available candidates") } strategy, err := pickBestStrategy(ackedCandidates) if err != nil { - return false, err + return noRequeue, err } if strategy != v1.OldestEmulationVersion { klog.V(2).Infof("strategy %s is not recognized by CLE.", strategy) - return false, nil + return noRequeue, nil } electee := pickBestLeaderOldestEmulationVersion(ackedCandidates) if electee == nil { - return false, fmt.Errorf("should not happen, could not find suitable electee") + return noRequeue, fmt.Errorf("should not happen, could not find suitable electee") } electeeName := electee.Name @@ -332,9 +335,6 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na ObjectMeta: metav1.ObjectMeta{ Namespace: leaseNN.Namespace, Name: leaseNN.Name, - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: &electeeName, @@ -346,31 +346,27 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na _, err = c.leaseClient.Leases(leaseNN.Namespace).Create(ctx, leaderLease, metav1.CreateOptions{}) // If the create was successful, then we can return here. if err == nil { - klog.Infof("Created lease %q %q for %q", leaseNN.Namespace, leaseNN.Name, electee.Name) - return true, nil + klog.Infof("Created lease %s for %q", leaseNN, electee.Name) + return defaultRequeueInterval, nil } // If there was an error, return if !apierrors.IsAlreadyExists(err) { - return false, err + return noRequeue, err } existingLease, err := c.leaseClient.Leases(leaseNN.Namespace).Get(ctx, leaseNN.Name, metav1.GetOptions{}) if err != nil { - return false, err + return noRequeue, err } leaseClone := existingLease.DeepCopy() // Update the Lease if it either does not have a holder or is expired isExpired := isLeaseExpired(existingLease) if leaseClone.Spec.HolderIdentity == nil || *leaseClone.Spec.HolderIdentity == "" || (isExpired && *leaseClone.Spec.HolderIdentity != electeeName) { - klog.Infof("lease %q %q is expired, resetting it and setting holder to %q", leaseNN.Namespace, leaseNN.Name, electee.Name) + klog.Infof("lease %s is expired, resetting it and setting holder to %q", leaseNN, electee.Name) leaseClone.Spec.Strategy = &strategy leaseClone.Spec.PreferredHolder = nil - if leaseClone.ObjectMeta.Annotations == nil { - leaseClone.ObjectMeta.Annotations = make(map[string]string) - } - leaseClone.ObjectMeta.Annotations[ElectedByAnnotationName] = controllerName leaseClone.Spec.HolderIdentity = &electeeName leaseClone.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} @@ -378,18 +374,19 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na leaseClone.Spec.AcquireTime = nil _, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, leaseClone, metav1.UpdateOptions{}) if err != nil { - return false, err + return time.Until(leaseClone.Spec.RenewTime.Time), err } } else if leaseClone.Spec.HolderIdentity != nil && *leaseClone.Spec.HolderIdentity != electeeName { - klog.Infof("lease %q %q already exists for holder %q but should be held by %q, marking preferredHolder", leaseNN.Namespace, leaseNN.Name, *leaseClone.Spec.HolderIdentity, electee.Name) + klog.Infof("lease %s already exists for holder %q but should be held by %q, marking preferredHolder", leaseNN, *leaseClone.Spec.HolderIdentity, electee.Name) leaseClone.Spec.PreferredHolder = &electeeName leaseClone.Spec.Strategy = &strategy _, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, leaseClone, metav1.UpdateOptions{}) if err != nil { - return false, err + return noRequeue, err } + return time.Until(leaseClone.Spec.RenewTime.Time), nil } - return true, nil + return defaultRequeueInterval, nil } func (c *Controller) listAdmissableCandidates(leaseNN types.NamespacedName) ([]*v1alpha1.LeaseCandidate, error) { diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go index b0a344b3c75..5e3686d4d74 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go @@ -123,9 +123,6 @@ func TestReconcileElectionStep(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-1"), @@ -197,9 +194,6 @@ func TestReconcileElectionStep(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-expired"), @@ -320,7 +314,7 @@ func TestReconcileElectionStep(t *testing.T) { cache.WaitForCacheSync(ctx.Done(), controller.leaseCandidateInformer.Informer().HasSynced) requeue, err := controller.reconcileElectionStep(ctx, tc.leaseNN) - if requeue != tc.expectedRequeue { + if (requeue != 0) != tc.expectedRequeue { t.Errorf("reconcileElectionStep() requeue = %v, want %v", requeue, tc.expectedRequeue) } if tc.expectedError && err == nil { @@ -404,9 +398,6 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "kube-system", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-1"), @@ -463,9 +454,6 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "kube-system", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-1"), @@ -482,9 +470,6 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "kube-system", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1alpha1.LeaseCandidateSpec{}, }, @@ -510,9 +495,6 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "kube-system", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-1"), @@ -529,9 +511,6 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "kube-system", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1alpha1.LeaseCandidateSpec{}, }, @@ -567,9 +546,6 @@ func TestController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Namespace: "kube-system", Name: "component-A", - Annotations: map[string]string{ - ElectedByAnnotationName: controllerName, - }, }, Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-2"), diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index c61c600a7f7..d9d87d55f24 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -161,6 +161,7 @@ type LeaderElectionConfig struct { Name string // Coordinated will use the Coordinated Leader Election feature + // WARNING: Coordinated leader election is ALPHA. Coordinated bool } @@ -293,6 +294,7 @@ func (le *LeaderElector) renew(ctx context.Context) { return } le.metrics.leaderOff(le.config.Name) + klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) @@ -354,15 +356,15 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool { le.observedRawRecord = oldLeaderElectionRawRecord } - hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time) + hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time) if hasExpired { klog.Infof("lock has expired: %v", le.config.Lock.Describe()) return false } if !le.IsLeader() { - klog.V(4).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe()) + klog.V(6).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe()) return false } @@ -370,7 +372,7 @@ func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool { if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" { klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe()) // TODO: Instead of letting lease expire, the holder may deleted it directly - // This will not be compatible with all controllers, so it needs to be opt-in behavior.. + // This will not be compatible with all controllers, so it needs to be opt-in behavior. // We must ensure all code guarded by this lease has successfully completed // prior to releasing or there may be two processes // simultaneously acting on the critical path. diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go index 98871213c9f..fa07066f03b 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go @@ -63,9 +63,14 @@ type LeaseCandidate struct { preferredStrategies []v1.CoordinatedLeaseStrategy } +// NewCandidate creates new LeaseCandidate controller that creates a +// LeaseCandidate object if it does not exist and watches changes +// to the corresponding object and renews if PingTime is set. +// WARNING: This is an ALPHA feature. Ensure that the CoordinatedLeaderElection +// feature gate is on. func NewCandidate(clientset kubernetes.Interface, - candidateName string, candidateNamespace string, + candidateName string, targetLease string, binaryVersion, emulationVersion string, preferredStrategies []v1.CoordinatedLeaseStrategy, @@ -144,7 +149,6 @@ func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool { } utilruntime.HandleError(err) - klog.Infof("processNextWorkItem.AddRateLimited: %v", key) c.queue.AddRateLimited(key) return true @@ -161,9 +165,8 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error { if apierrors.IsNotFound(err) { klog.V(2).Infof("Creating lease candidate") // lease does not exist, create it. - leaseToCreate := c.newLease() - _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) - if err != nil { + leaseToCreate := c.newLeaseCandidate() + if _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}); err != nil { return err } klog.V(2).Infof("Created lease candidate") @@ -171,7 +174,7 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error { } else if err != nil { return err } - klog.V(2).Infof("lease candidate exists.. renewing") + klog.V(2).Infof("lease candidate exists. Renewing.") clone := lease.DeepCopy() clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} clone.Spec.PingTime = nil @@ -182,8 +185,8 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error { return nil } -func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate { - lease := &v1alpha1.LeaseCandidate{ +func (c *LeaseCandidate) newLeaseCandidate() *v1alpha1.LeaseCandidate { + lc := &v1alpha1.LeaseCandidate{ ObjectMeta: metav1.ObjectMeta{ Name: c.name, Namespace: c.namespace, @@ -195,6 +198,6 @@ func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate { PreferredStrategies: c.preferredStrategies, }, } - lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} - return lease + lc.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} + return lc } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go index 43e0d2d11ae..d1378228bfb 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go @@ -48,8 +48,8 @@ func TestLeaseCandidateCreation(t *testing.T) { client := fake.NewSimpleClientset() candidate, _, err := NewCandidate( client, - tc.candidateName, tc.candidateNamespace, + tc.candidateName, tc.leaseName, tc.binaryVersion, tc.emulationVersion, diff --git a/test/integration/apiserver/coordinated_leader_election_test.go b/test/integration/apiserver/coordinated_leader_election_test.go index 8d2ce297b25..bbdfcdbe614 100644 --- a/test/integration/apiserver/coordinated_leader_election_test.go +++ b/test/integration/apiserver/coordinated_leader_election_test.go @@ -199,8 +199,8 @@ func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { identityLease, _, err := leaderelection.NewCandidate( t.clientset, - name, namespace, + name, targetLease, binaryVersion, compatibilityVersion,