diff --git a/pkg/apis/coordination/types.go b/pkg/apis/coordination/types.go index d67546ab940..b1fb9d98481 100644 --- a/pkg/apis/coordination/types.go +++ b/pkg/apis/coordination/types.go @@ -117,13 +117,12 @@ type LeaseCandidateSpec struct { // LeaseCandidate will respond by updating RenewTime. // +optional PingTime *metav1.MicroTime - // RenewTime is the time that the LeaseCandidate was last updated. - // Any time a Lease needs to do leader election, the PingTime field - // is updated to signal to the LeaseCandidate that they should update - // the RenewTime. - // Old LeaseCandidate objects are also garbage collected if it has been hours - // since the last renew. The PingTime field is updated regularly to prevent - // garbage collection for still active LeaseCandidates. + // RenewTime is the time that the LeaseCandidate was last updated. Any time + // a Lease needs to do leader election, the PingTime field is updated to + // signal to the LeaseCandidate that they should update the RenewTime. The + // PingTime field is also updated regularly and LeaseCandidates must update + // RenewTime to prevent garbage collection for still active LeaseCandidates. + // Old LeaseCandidate objects are periodically garbage collected. // +optional RenewTime *metav1.MicroTime // BinaryVersion is the binary version. It must be in a semver format without leading `v`. diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index 08f7cf4aad0..c12a565c462 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -169,7 +169,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele ) return func(ctx context.Context, workers int) { go controller.Run(ctx, workers) - go gccontroller.Run(ctx.Done()) + go gccontroller.Run(ctx) }, err }) return nil diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go index a4da8450657..e3dfd8f8926 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go @@ -126,7 +126,6 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan c.enqueueLease(oldObj) }, }) - if err != nil { return nil, err } @@ -141,7 +140,6 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan c.enqueueCandidate(oldObj) }, }) - if err != nil { return nil, err } @@ -179,7 +177,7 @@ func (c *Controller) enqueueCandidate(obj any) { return } // Ignore candidates that transitioned to Pending because reelection is already in progress - if lc.Spec.PingTime != nil { + if lc.Spec.PingTime != nil && lc.Spec.RenewTime.Before(lc.Spec.PingTime) { return } c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName}) @@ -205,6 +203,7 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease return true, nil } + // every 15min enforce an election to update all candidates. Every 30min we garbage collect. for _, candidate := range candidates { if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(leaseCandidateValidDuration/2).Before(time.Now()) { return true, nil @@ -241,7 +240,6 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease // PingTime + electionDuration < time.Now: Candidate has not responded within the appropriate PingTime. Continue the election. // RenewTime + 5 seconds > time.Now: All candidates acked in the last 5 seconds, continue the election. func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue time.Duration, err error) { - now := time.Now() candidates, err := c.listAdmissableCandidates(leaseNN) if err != nil { @@ -254,17 +252,52 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na // Check if an election is really needed by looking at the current lease and candidates needElection, err := c.electionNeeded(candidates, leaseNN) if !needElection { - return noRequeue, err + return defaultRequeueInterval, err } if err != nil { return defaultRequeueInterval, err } + now := time.Now() + canVoteYet := true + for _, candidate := range candidates { + if candidate.Spec.PingTime != nil && candidate.Spec.PingTime.Add(electionDuration).After(now) && + candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Before(candidate.Spec.PingTime) { + + // continue waiting for the election to timeout + canVoteYet = false + continue + } + if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).After(now) { + continue + } + + if candidate.Spec.PingTime == nil || + // If PingTime is outdated, send another PingTime only if it already acked the first one. + (candidate.Spec.PingTime.Add(electionDuration).Before(now) && candidate.Spec.PingTime.Before(candidate.Spec.RenewTime)) { + // TODO(jefftree): We should randomize the order of sending pings and do them in parallel + // so that all candidates have equal opportunity to ack. + clone := candidate.DeepCopy() + clone.Spec.PingTime = &metav1.MicroTime{Time: now} + _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) + if err != nil { + return defaultRequeueInterval, err + } + canVoteYet = false + } + } + if !canVoteYet { + return defaultRequeueInterval, nil + } + // election is ongoing as long as unexpired PingTimes exist - atLeastOnePingExpired := false for _, candidate := range candidates { if candidate.Spec.PingTime == nil { - continue + continue // shouldn't be the case after the above + } + + if candidate.Spec.RenewTime != nil && candidate.Spec.PingTime.Before(candidate.Spec.RenewTime) { + continue // this has renewed already } // If a candidate has a PingTime within the election duration, they have not acked @@ -273,39 +306,6 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na // continue waiting for the election to timeout return noRequeue, nil } - - // election timed out without ack (for one of the candidate). Clear and start election. - // TODO(sttts): this seems to be wrong. One candidate might get a lot more time to vote, while others are starving because they got a late ping. We have to give all of them a chance. - atLeastOnePingExpired = true - clone := candidate.DeepCopy() - clone.Spec.PingTime = nil - if _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil { - return noRequeue, err - } - break - } - - if !atLeastOnePingExpired { - continueElection := true - for _, candidate := range candidates { - // if renewTime of a candidate is longer ago than electionDuration old, we have to ping. - if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) { - continueElection = false - break - } - } - if !continueElection { - // Send an "are you alive" signal to all candidates - for _, candidate := range candidates { - clone := candidate.DeepCopy() - clone.Spec.PingTime = &metav1.MicroTime{Time: time.Now()} - _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) - if err != nil { - return noRequeue, err - } - } - return defaultRequeueInterval, nil - } } var ackedCandidates []*v1alpha1.LeaseCandidate @@ -398,7 +398,8 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na if reflect.DeepEqual(existing, orig) { klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, *existing.Spec.HolderIdentity) - return noRequeue, nil + // We need to requeue to ensure that we are aware of an expired lease + return defaultRequeueInterval, nil } _, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, existing, metav1.UpdateOptions{}) diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go index 58e2dcd4905..3458aa96df2 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go @@ -160,7 +160,7 @@ func TestReconcileElectionStep(t *testing.T) { EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-4 * electionDuration))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -173,6 +173,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.20.0", BinaryVersion: "1.20.0", + PingTime: ptr.To(metav1.NewMicroTime(time.Now())), RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, @@ -220,44 +221,6 @@ func TestReconcileElectionStep(t *testing.T) { expectedRequeue: true, expectedError: false, }, - { - name: "candidates exist, lease exists, lease expired, 3rdparty strategy", - leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, - candidates: []*v1alpha1.LeaseCandidate{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "component-identity-1", - }, - Spec: v1alpha1.LeaseCandidateSpec{ - LeaseName: "component-A", - EmulationVersion: "1.19.0", - BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), - PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"}, - }, - }, - }, - existingLease: &v1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "component-A", - Annotations: map[string]string{ - electedByAnnotationName: controllerName, - }, - }, - Spec: v1.LeaseSpec{ - HolderIdentity: ptr.To("component-identity-expired"), - LeaseDurationSeconds: ptr.To(int32(10)), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), - }, - }, - expectLease: true, - expectedHolderIdentity: ptr.To("component-identity-expired"), - expectedStrategy: ptr.To[v1.CoordinatedLeaseStrategy]("foo.com/bar"), - expectedRequeue: true, - expectedError: false, - }, { name: "candidates exist, no acked candidates should return error", leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, @@ -272,7 +235,7 @@ func TestReconcileElectionStep(t *testing.T) { EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * time.Minute))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -310,7 +273,7 @@ func TestReconcileElectionStep(t *testing.T) { candidatesPinged: true, }, { - name: "candidates exist, ping within electionDuration should cause no state change", + name: "candidate exist, pinged candidate should have until electionDuration until election decision is made", leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, candidates: []*v1alpha1.LeaseCandidate{ { @@ -323,7 +286,7 @@ func TestReconcileElectionStep(t *testing.T) { EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", PingTime: ptr.To(metav1.NewMicroTime(time.Now())), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -331,8 +294,42 @@ func TestReconcileElectionStep(t *testing.T) { existingLease: nil, expectLease: false, expectedHolderIdentity: nil, - expectedStrategy: nil, - expectedRequeue: false, + expectedRequeue: true, + expectedError: false, + }, + { + name: "candidates exist, lease exists, lease expired, 3rdparty strategy", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"}, + }, + }, + }, + existingLease: &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-A", + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-expired"), + LeaseDurationSeconds: ptr.To(int32(10)), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + }, + }, + expectLease: true, + expectedHolderIdentity: ptr.To("component-identity-expired"), + expectedStrategy: ptr.To[v1.CoordinatedLeaseStrategy]("foo.com/bar"), + expectedRequeue: true, expectedError: false, }, } @@ -683,7 +680,6 @@ func TestController(t *testing.T) { if err == nil { if lease.Spec.PingTime != nil { c := lease.DeepCopy() - c.Spec.PingTime = nil c.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} _, err = client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Update(ctx, c, metav1.UpdateOptions{}) if err != nil { diff --git a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go index 60659cd5393..7485f1fadd9 100644 --- a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go +++ b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go @@ -83,7 +83,7 @@ func (c *LeaseCandidateGCController) gc(ctx context.Context) { if !isLeaseCandidateExpired(leaseCandidate) { continue } - lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(context.TODO(), leaseCandidate.Name, metav1.GetOptions{}) + lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(ctx, leaseCandidate.Name, metav1.GetOptions{}) if err != nil { klog.ErrorS(err, "Error getting lc") continue diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go index fa07066f03b..c75fc3bdf76 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go @@ -103,7 +103,7 @@ func NewCandidate(clientset kubernetes.Interface, h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok { - if leasecandidate.Spec.PingTime != nil { + if leasecandidate.Spec.PingTime != nil && leasecandidate.Spec.PingTime.After(leasecandidate.Spec.RenewTime.Time) { lc.enqueueLease() } } @@ -177,7 +177,6 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error { klog.V(2).Infof("lease candidate exists. Renewing.") clone := lease.DeepCopy() clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} - clone.Spec.PingTime = nil _, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{}) if err != nil { return err diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go index 92dc199f029..c50059180e3 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go @@ -130,7 +130,6 @@ func pollForLease(ctx context.Context, tc testcase, client *fake.Clientset, t *m if lc.Spec.BinaryVersion == tc.binaryVersion && lc.Spec.EmulationVersion == tc.emulationVersion && lc.Spec.LeaseName == tc.leaseName && - lc.Spec.PingTime == nil && lc.Spec.RenewTime != nil { // Ensure that if a time is provided, the renewTime occurred after the provided time. if t != nil && t.After(lc.Spec.RenewTime.Time) { diff --git a/test/integration/apiserver/coordinated_leader_election_test.go b/test/integration/apiserver/coordinated_leader_election_test.go index bbdfcdbe614..e66f11b96f9 100644 --- a/test/integration/apiserver/coordinated_leader_election_test.go +++ b/test/integration/apiserver/coordinated_leader_election_test.go @@ -144,7 +144,8 @@ func TestLeaseCandidateCleanup(t *testing.T) { BinaryVersion: "0.1.0", EmulationVersion: "0.1.0", PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, - RenewTime: &metav1.MicroTime{Time: time.Now().Add(-1 * time.Hour)}, + RenewTime: &metav1.MicroTime{Time: time.Now().Add(-2 * time.Hour)}, + PingTime: &metav1.MicroTime{Time: time.Now().Add(-1 * time.Hour)}, }, } ctx := context.Background() @@ -266,7 +267,7 @@ func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentit } func (t cleTest) pollForLease(name, namespace, holder string) { - err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 15*time.Second, true, func(ctx context.Context) (done bool, err error) { + err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) { lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { fmt.Println(err) @@ -286,7 +287,7 @@ func (t cleTest) cancelController(name, namespace string) { func (t cleTest) cleanup() { err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{}) - if err != nil { + if err != nil && !apierrors.IsNotFound(err) { t.t.Error(err) } for _, c := range t.ctxList { diff --git a/test/integration/controlplane/generic_test.go b/test/integration/controlplane/generic_test.go index d8508f014ab..259f279c330 100644 --- a/test/integration/controlplane/generic_test.go +++ b/test/integration/controlplane/generic_test.go @@ -83,6 +83,7 @@ func TestGenericControlplaneStartUp(t *testing.T) { "events", "events.events.k8s.io", "flowschemas.flowcontrol.apiserver.k8s.io", + "leasecandidates.coordination.k8s.io", "leases.coordination.k8s.io", "localsubjectaccessreviews.authorization.k8s.io", "mutatingwebhookconfigurations.admissionregistration.k8s.io",