Merge pull request #130533 from Henrywu573/parall

Parallelize lease candidate ping
This commit is contained in:
Kubernetes Prow Robot 2025-03-11 12:47:53 -07:00 committed by GitHub
commit ac05e1a48e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -22,6 +22,7 @@ import (
"reflect"
"time"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/coordination/v1"
v1alpha2 "k8s.io/api/coordination/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -264,6 +265,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
now := c.clock.Now()
canVoteYet := true
g, gCtx := errgroup.WithContext(ctx)
for _, candidate := range candidates {
if candidate.Spec.PingTime != nil && candidate.Spec.PingTime.Add(electionDuration).After(now) &&
candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Before(candidate.Spec.PingTime) {
@ -280,17 +282,18 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
// If PingTime is outdated, send another PingTime only if it already acked the first one.
// This checks for pingTime <= renewTime because equality is possible in unit tests using a fake clock.
(candidate.Spec.PingTime.Add(electionDuration).Before(now) && !candidate.Spec.RenewTime.Before(candidate.Spec.PingTime)) {
// TODO(jefftree): We should randomize the order of sending pings and do them in parallel
// so that all candidates have equal opportunity to ack.
clone := candidate.DeepCopy()
clone.Spec.PingTime = &metav1.MicroTime{Time: now}
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return defaultRequeueInterval, err
}
g.Go(func() error {
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(gCtx, clone, metav1.UpdateOptions{})
return err
})
canVoteYet = false
}
}
if err := g.Wait(); err != nil {
return defaultRequeueInterval, err
}
if !canVoteYet {
return defaultRequeueInterval, nil
}