Review feedback

Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
This commit is contained in:
Dr. Stefan Schimanski 2024-07-23 18:37:47 +02:00 committed by Jefftree
parent 42678f1553
commit a64418ba0a
3 changed files with 56 additions and 54 deletions

View File

@ -44,15 +44,6 @@ func pickBestLeaderOldestEmulationVersion(candidates []*v1alpha1.LeaseCandidate)
return electee return electee
} }
func shouldReelect(candidates []*v1alpha1.LeaseCandidate, currentLeader *v1alpha1.LeaseCandidate) bool {
klog.Infof("shouldReelect for candidates: %+v", candidates)
pickedLeader := pickBestLeaderOldestEmulationVersion(candidates)
if pickedLeader == nil {
return false
}
return compare(currentLeader, pickedLeader) > 0
}
// topologicalSortWithOneRoot has a caveat that there may only be one root (indegree=0) node in a valid ordering. // topologicalSortWithOneRoot has a caveat that there may only be one root (indegree=0) node in a valid ordering.
func topologicalSortWithOneRoot(graph map[v1.CoordinatedLeaseStrategy][]v1.CoordinatedLeaseStrategy) []v1.CoordinatedLeaseStrategy { func topologicalSortWithOneRoot(graph map[v1.CoordinatedLeaseStrategy][]v1.CoordinatedLeaseStrategy) []v1.CoordinatedLeaseStrategy {
inDegree := make(map[v1.CoordinatedLeaseStrategy]int) inDegree := make(map[v1.CoordinatedLeaseStrategy]int)
@ -128,7 +119,7 @@ func validLeaseCandidateForOldestEmulationVersion(l *v1alpha1.LeaseCandidate) bo
return err == nil return err == nil
} }
func getEmulationVersion(l *v1alpha1.LeaseCandidate) semver.Version { func getEmulationVersionOrZero(l *v1alpha1.LeaseCandidate) semver.Version {
value := l.Spec.EmulationVersion value := l.Spec.EmulationVersion
v, err := semver.ParseTolerant(value) v, err := semver.ParseTolerant(value)
if err != nil { if err != nil {
@ -137,7 +128,7 @@ func getEmulationVersion(l *v1alpha1.LeaseCandidate) semver.Version {
return v return v
} }
func getBinaryVersion(l *v1alpha1.LeaseCandidate) semver.Version { func getBinaryVersionOrZero(l *v1alpha1.LeaseCandidate) semver.Version {
value := l.Spec.BinaryVersion value := l.Spec.BinaryVersion
v, err := semver.ParseTolerant(value) v, err := semver.ParseTolerant(value)
if err != nil { if err != nil {
@ -148,13 +139,13 @@ func getBinaryVersion(l *v1alpha1.LeaseCandidate) semver.Version {
// -1: lhs better, 1: rhs better // -1: lhs better, 1: rhs better
func compare(lhs, rhs *v1alpha1.LeaseCandidate) int { func compare(lhs, rhs *v1alpha1.LeaseCandidate) int {
lhsVersion := getEmulationVersion(lhs) l := getEmulationVersionOrZero(lhs)
rhsVersion := getEmulationVersion(rhs) r := getEmulationVersionOrZero(rhs)
result := lhsVersion.Compare(rhsVersion) result := l.Compare(r)
if result == 0 { if result == 0 {
lhsVersion := getBinaryVersion(lhs) l := getBinaryVersionOrZero(lhs)
rhsVersion := getBinaryVersion(rhs) r := getBinaryVersionOrZero(rhs)
result = lhsVersion.Compare(rhsVersion) result = l.Compare(r)
} }
if result == 0 { if result == 0 {
if lhs.CreationTimestamp.After(rhs.CreationTimestamp.Time) { if lhs.CreationTimestamp.After(rhs.CreationTimestamp.Time) {

View File

@ -254,7 +254,7 @@ func TestGetEmulationVersion(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got := getEmulationVersion(tt.candidate) got := getEmulationVersionOrZero(tt.candidate)
if got.FinalizeVersion() != tt.want.FinalizeVersion() { if got.FinalizeVersion() != tt.want.FinalizeVersion() {
t.Errorf("getEmulationVersion() = %v, want %v", got, tt.want) t.Errorf("getEmulationVersion() = %v, want %v", got, tt.want)
} }
@ -280,7 +280,7 @@ func TestGetBinaryVersion(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got := getBinaryVersion(tt.candidate) got := getBinaryVersionOrZero(tt.candidate)
if got.FinalizeVersion() != tt.want.FinalizeVersion() { if got.FinalizeVersion() != tt.want.FinalizeVersion() {
t.Errorf("getBinaryVersion() = %v, want %v", got, tt.want) t.Errorf("getBinaryVersion() = %v, want %v", got, tt.want)
} }
@ -748,3 +748,11 @@ func equalStrategies(s1, s2 []v1.CoordinatedLeaseStrategy) bool {
} }
return true return true
} }
func shouldReelect(candidates []*v1alpha1.LeaseCandidate, currentLeader *v1alpha1.LeaseCandidate) bool {
pickedLeader := pickBestLeaderOldestEmulationVersion(candidates)
if pickedLeader == nil {
return false
}
return compare(currentLeader, pickedLeader) > 0
}

View File

@ -42,9 +42,11 @@ import (
const ( const (
controllerName = "leader-election-controller" controllerName = "leader-election-controller"
// Requeue interval is the interval at which a Lease is requeued to verify that it is being renewed properly. // Requeue interval is the interval at which a Lease is requeued to verify that it is
// being renewed properly.
defaultRequeueInterval = 5 * time.Second defaultRequeueInterval = 5 * time.Second
noRequeue = 0 noRequeue = 0
defaultLeaseDurationSeconds int32 = 5 defaultLeaseDurationSeconds int32 = 5
electionDuration = 5 * time.Second electionDuration = 5 * time.Second
@ -92,7 +94,7 @@ func (c *Controller) Run(ctx context.Context, workers int) {
return return
} }
for _, lc := range lcs { for _, lc := range lcs {
c.processCandidate(lc) c.enqueueCandidate(lc)
} }
klog.Infof("Workers: %d", workers) klog.Infof("Workers: %d", workers)
@ -114,13 +116,13 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan
} }
leaseSynced, err := leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ leaseSynced, err := leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
c.processLease(obj) c.enqueueLease(obj)
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
c.processLease(newObj) c.enqueueLease(newObj)
}, },
DeleteFunc: func(oldObj interface{}) { DeleteFunc: func(oldObj interface{}) {
c.processLease(oldObj) c.enqueueLease(oldObj)
}, },
}) })
@ -129,13 +131,13 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan
} }
leaseCandidateSynced, err := leaseCandidateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ leaseCandidateSynced, err := leaseCandidateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
c.processCandidate(obj) c.enqueueCandidate(obj)
}, },
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
c.processCandidate(newObj) c.enqueueCandidate(newObj)
}, },
DeleteFunc: func(oldObj interface{}) { DeleteFunc: func(oldObj interface{}) {
c.processCandidate(oldObj) c.enqueueCandidate(oldObj)
}, },
}) })
@ -167,7 +169,7 @@ func (c *Controller) processNextElectionItem(ctx context.Context) bool {
return true return true
} }
func (c *Controller) processCandidate(obj any) { func (c *Controller) enqueueCandidate(obj any) {
lc, ok := obj.(*v1alpha1.LeaseCandidate) lc, ok := obj.(*v1alpha1.LeaseCandidate)
if !ok { if !ok {
return return
@ -182,7 +184,7 @@ func (c *Controller) processCandidate(obj any) {
c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName}) c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName})
} }
func (c *Controller) processLease(obj any) { func (c *Controller) enqueueLease(obj any) {
lease, ok := obj.(*v1.Lease) lease, ok := obj.(*v1.Lease)
if !ok { if !ok {
return return
@ -193,7 +195,7 @@ func (c *Controller) processLease(obj any) {
func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, leaseNN types.NamespacedName) (bool, error) { func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, leaseNN types.NamespacedName) (bool, error) {
lease, err := c.leaseInformer.Lister().Leases(leaseNN.Namespace).Get(leaseNN.Name) lease, err := c.leaseInformer.Lister().Leases(leaseNN.Namespace).Get(leaseNN.Name)
if err != nil && !apierrors.IsNotFound(err) { if err != nil && !apierrors.IsNotFound(err) {
return false, fmt.Errorf("error reading lease") return false, fmt.Errorf("error reading lease: %w", err)
} else if apierrors.IsNotFound(err) { } else if apierrors.IsNotFound(err) {
return true, nil return true, nil
} }
@ -213,7 +215,7 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease
return false, err return false, err
} }
if prelimStrategy != v1.OldestEmulationVersion { if prelimStrategy != v1.OldestEmulationVersion {
klog.V(2).Infof("strategy %s is not recognized by CLE.", prelimStrategy) klog.V(5).Infof("Strategy %q is ignored by CLE", prelimStrategy)
return false, nil return false, nil
} }
@ -221,7 +223,7 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease
if prelimElectee == nil { if prelimElectee == nil {
return false, nil return false, nil
} else if lease != nil && lease.Spec.HolderIdentity != nil && prelimElectee.Name == *lease.Spec.HolderIdentity { } else if lease != nil && lease.Spec.HolderIdentity != nil && prelimElectee.Name == *lease.Spec.HolderIdentity {
klog.V(2).Infof("Leader %s is already most optimal for lease %s %s", prelimElectee.Name, lease.Namespace, lease.Name) klog.V(5).Infof("Leader %s is already most optimal for lease %s", prelimElectee.Name, leaseNN)
return false, nil return false, nil
} }
return true, nil return true, nil
@ -246,10 +248,9 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
} else if len(candidates) == 0 { } else if len(candidates) == 0 {
return noRequeue, nil return noRequeue, nil
} }
klog.V(4).Infof("reconcileElectionStep %s, candidates: %d", leaseNN, len(candidates)) klog.V(6).Infof("Reconciling election for %s, candidates: %d", leaseNN, len(candidates))
// Check if an election is really needed by looking at the current lease // Check if an election is really needed by looking at the current lease and candidates
// and set of candidates
needElection, err := c.electionNeeded(candidates, leaseNN) needElection, err := c.electionNeeded(candidates, leaseNN)
if !needElection { if !needElection {
return noRequeue, err return noRequeue, err
@ -258,33 +259,35 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
return defaultRequeueInterval, err return defaultRequeueInterval, err
} }
fastTrackElection := false // election is ongoing as long as unexpired PingTimes exist
atLeastOnePingExpired := false
for _, candidate := range candidates { for _, candidate := range candidates {
if candidate.Spec.PingTime == nil {
continue
}
// If a candidate has a PingTime within the election duration, they have not acked // If a candidate has a PingTime within the election duration, they have not acked
// and we should wait until we receive their response // and we should wait until we receive their response
if candidate.Spec.PingTime != nil { if candidate.Spec.PingTime.Add(electionDuration).After(now) {
if candidate.Spec.PingTime.Add(electionDuration).After(now) { // continue waiting for the election to timeout
// continue waiting for the election to timeout return noRequeue, nil
return noRequeue, nil
} else {
// election timed out without ack. Clear and start election.
fastTrackElection = true
clone := candidate.DeepCopy()
clone.Spec.PingTime = nil
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return noRequeue, err
}
break
}
} }
// election timed out without ack (for one of the candidate). Clear and start election.
// TODO(sttts): this seems to be wrong. One candidate might get a lot more time to vote, while others are starving because they got a late ping. We have to give all of them a chance.
atLeastOnePingExpired = true
clone := candidate.DeepCopy()
clone.Spec.PingTime = nil
if _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
return noRequeue, err
}
break
} }
if !fastTrackElection { if !atLeastOnePingExpired {
continueElection := true continueElection := true
for _, candidate := range candidates { for _, candidate := range candidates {
// if renewTime of a candidate is longer than electionDuration old, we have to ping. // if renewTime of a candidate is longer ago than electionDuration old, we have to ping.
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) { if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) {
continueElection = false continueElection = false
break break