Change PingTime to be persistent

This commit is contained in:
Jefftree 2024-07-23 19:19:12 +00:00
parent a738daa88a
commit 0c774d0b1f
9 changed files with 97 additions and 101 deletions

View File

@ -117,13 +117,12 @@ type LeaseCandidateSpec struct {
// LeaseCandidate will respond by updating RenewTime.
// +optional
PingTime *metav1.MicroTime
// RenewTime is the time that the LeaseCandidate was last updated.
// Any time a Lease needs to do leader election, the PingTime field
// is updated to signal to the LeaseCandidate that they should update
// the RenewTime.
// Old LeaseCandidate objects are also garbage collected if it has been hours
// since the last renew. The PingTime field is updated regularly to prevent
// garbage collection for still active LeaseCandidates.
// RenewTime is the time that the LeaseCandidate was last updated. Any time
// a Lease needs to do leader election, the PingTime field is updated to
// signal to the LeaseCandidate that they should update the RenewTime. The
// PingTime field is also updated regularly and LeaseCandidates must update
// RenewTime to prevent garbage collection for still active LeaseCandidates.
// Old LeaseCandidate objects are periodically garbage collected.
// +optional
RenewTime *metav1.MicroTime
// BinaryVersion is the binary version. It must be in a semver format without leading `v`.

View File

@ -169,7 +169,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
)
return func(ctx context.Context, workers int) {
go controller.Run(ctx, workers)
go gccontroller.Run(ctx.Done())
go gccontroller.Run(ctx)
}, err
})
return nil

View File

@ -126,7 +126,6 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan
c.enqueueLease(oldObj)
},
})
if err != nil {
return nil, err
}
@ -141,7 +140,6 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan
c.enqueueCandidate(oldObj)
},
})
if err != nil {
return nil, err
}
@ -179,7 +177,7 @@ func (c *Controller) enqueueCandidate(obj any) {
return
}
// Ignore candidates that transitioned to Pending because reelection is already in progress
if lc.Spec.PingTime != nil {
if lc.Spec.PingTime != nil && lc.Spec.RenewTime.Before(lc.Spec.PingTime) {
return
}
c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName})
@ -205,6 +203,7 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease
return true, nil
}
// every 15min enforce an election to update all candidates. Every 30min we garbage collect.
for _, candidate := range candidates {
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(leaseCandidateValidDuration/2).Before(time.Now()) {
return true, nil
@ -241,7 +240,6 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease
// PingTime + electionDuration < time.Now: Candidate has not responded within the appropriate PingTime. Continue the election.
// RenewTime + 5 seconds > time.Now: All candidates acked in the last 5 seconds, continue the election.
func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue time.Duration, err error) {
now := time.Now()
candidates, err := c.listAdmissableCandidates(leaseNN)
if err != nil {
@ -254,17 +252,52 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
// Check if an election is really needed by looking at the current lease and candidates
needElection, err := c.electionNeeded(candidates, leaseNN)
if !needElection {
return noRequeue, err
return defaultRequeueInterval, err
}
if err != nil {
return defaultRequeueInterval, err
}
now := time.Now()
canVoteYet := true
for _, candidate := range candidates {
if candidate.Spec.PingTime != nil && candidate.Spec.PingTime.Add(electionDuration).After(now) &&
candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Before(candidate.Spec.PingTime) {
// continue waiting for the election to timeout
canVoteYet = false
continue
}
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).After(now) {
continue
}
if candidate.Spec.PingTime == nil ||
// If PingTime is outdated, send another PingTime only if it already acked the first one.
(candidate.Spec.PingTime.Add(electionDuration).Before(now) && candidate.Spec.PingTime.Before(candidate.Spec.RenewTime)) {
// TODO(jefftree): We should randomize the order of sending pings and do them in parallel
// so that all candidates have equal opportunity to ack.
clone := candidate.DeepCopy()
clone.Spec.PingTime = &metav1.MicroTime{Time: now}
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return defaultRequeueInterval, err
}
canVoteYet = false
}
}
if !canVoteYet {
return defaultRequeueInterval, nil
}
// election is ongoing as long as unexpired PingTimes exist
atLeastOnePingExpired := false
for _, candidate := range candidates {
if candidate.Spec.PingTime == nil {
continue
continue // shouldn't be the case after the above
}
if candidate.Spec.RenewTime != nil && candidate.Spec.PingTime.Before(candidate.Spec.RenewTime) {
continue // this has renewed already
}
// If a candidate has a PingTime within the election duration, they have not acked
@ -273,39 +306,6 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
// continue waiting for the election to timeout
return noRequeue, nil
}
// election timed out without ack (for one of the candidate). Clear and start election.
// TODO(sttts): this seems to be wrong. One candidate might get a lot more time to vote, while others are starving because they got a late ping. We have to give all of them a chance.
atLeastOnePingExpired = true
clone := candidate.DeepCopy()
clone.Spec.PingTime = nil
if _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}); err != nil {
return noRequeue, err
}
break
}
if !atLeastOnePingExpired {
continueElection := true
for _, candidate := range candidates {
// if renewTime of a candidate is longer ago than electionDuration old, we have to ping.
if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) {
continueElection = false
break
}
}
if !continueElection {
// Send an "are you alive" signal to all candidates
for _, candidate := range candidates {
clone := candidate.DeepCopy()
clone.Spec.PingTime = &metav1.MicroTime{Time: time.Now()}
_, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return noRequeue, err
}
}
return defaultRequeueInterval, nil
}
}
var ackedCandidates []*v1alpha1.LeaseCandidate
@ -398,7 +398,8 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
if reflect.DeepEqual(existing, orig) {
klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, *existing.Spec.HolderIdentity)
return noRequeue, nil
// We need to requeue to ensure that we are aware of an expired lease
return defaultRequeueInterval, nil
}
_, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, existing, metav1.UpdateOptions{})

View File

@ -160,7 +160,7 @@ func TestReconcileElectionStep(t *testing.T) {
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-4 * electionDuration))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
@ -173,6 +173,7 @@ func TestReconcileElectionStep(t *testing.T) {
LeaseName: "component-A",
EmulationVersion: "1.20.0",
BinaryVersion: "1.20.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now())),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
@ -220,44 +221,6 @@ func TestReconcileElectionStep(t *testing.T) {
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, lease exists, lease expired, 3rdparty strategy",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"},
},
},
},
existingLease: &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-A",
Annotations: map[string]string{
electedByAnnotationName: controllerName,
},
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-expired"),
LeaseDurationSeconds: ptr.To(int32(10)),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
},
},
expectLease: true,
expectedHolderIdentity: ptr.To("component-identity-expired"),
expectedStrategy: ptr.To[v1.CoordinatedLeaseStrategy]("foo.com/bar"),
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, no acked candidates should return error",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
@ -272,7 +235,7 @@ func TestReconcileElectionStep(t *testing.T) {
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * time.Minute))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
@ -310,7 +273,7 @@ func TestReconcileElectionStep(t *testing.T) {
candidatesPinged: true,
},
{
name: "candidates exist, ping within electionDuration should cause no state change",
name: "candidate exist, pinged candidate should have until electionDuration until election decision is made",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
@ -323,7 +286,7 @@ func TestReconcileElectionStep(t *testing.T) {
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
PingTime: ptr.To(metav1.NewMicroTime(time.Now())),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
},
},
@ -331,8 +294,42 @@ func TestReconcileElectionStep(t *testing.T) {
existingLease: nil,
expectLease: false,
expectedHolderIdentity: nil,
expectedStrategy: nil,
expectedRequeue: false,
expectedRequeue: true,
expectedError: false,
},
{
name: "candidates exist, lease exists, lease expired, 3rdparty strategy",
leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"},
candidates: []*v1alpha1.LeaseCandidate{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-identity-1",
},
Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "component-A",
EmulationVersion: "1.19.0",
BinaryVersion: "1.19.0",
RenewTime: ptr.To(metav1.NewMicroTime(time.Now())),
PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"},
},
},
},
existingLease: &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "component-A",
},
Spec: v1.LeaseSpec{
HolderIdentity: ptr.To("component-identity-expired"),
LeaseDurationSeconds: ptr.To(int32(10)),
RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))),
},
},
expectLease: true,
expectedHolderIdentity: ptr.To("component-identity-expired"),
expectedStrategy: ptr.To[v1.CoordinatedLeaseStrategy]("foo.com/bar"),
expectedRequeue: true,
expectedError: false,
},
}
@ -683,7 +680,6 @@ func TestController(t *testing.T) {
if err == nil {
if lease.Spec.PingTime != nil {
c := lease.DeepCopy()
c.Spec.PingTime = nil
c.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
_, err = client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Update(ctx, c, metav1.UpdateOptions{})
if err != nil {

View File

@ -83,7 +83,7 @@ func (c *LeaseCandidateGCController) gc(ctx context.Context) {
if !isLeaseCandidateExpired(leaseCandidate) {
continue
}
lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(context.TODO(), leaseCandidate.Name, metav1.GetOptions{})
lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(ctx, leaseCandidate.Name, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Error getting lc")
continue

View File

@ -103,7 +103,7 @@ func NewCandidate(clientset kubernetes.Interface,
h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok {
if leasecandidate.Spec.PingTime != nil {
if leasecandidate.Spec.PingTime != nil && leasecandidate.Spec.PingTime.After(leasecandidate.Spec.RenewTime.Time) {
lc.enqueueLease()
}
}
@ -177,7 +177,6 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
klog.V(2).Infof("lease candidate exists. Renewing.")
clone := lease.DeepCopy()
clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
clone.Spec.PingTime = nil
_, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err

View File

@ -130,7 +130,6 @@ func pollForLease(ctx context.Context, tc testcase, client *fake.Clientset, t *m
if lc.Spec.BinaryVersion == tc.binaryVersion &&
lc.Spec.EmulationVersion == tc.emulationVersion &&
lc.Spec.LeaseName == tc.leaseName &&
lc.Spec.PingTime == nil &&
lc.Spec.RenewTime != nil {
// Ensure that if a time is provided, the renewTime occurred after the provided time.
if t != nil && t.After(lc.Spec.RenewTime.Time) {

View File

@ -144,7 +144,8 @@ func TestLeaseCandidateCleanup(t *testing.T) {
BinaryVersion: "0.1.0",
EmulationVersion: "0.1.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
RenewTime: &metav1.MicroTime{Time: time.Now().Add(-1 * time.Hour)},
RenewTime: &metav1.MicroTime{Time: time.Now().Add(-2 * time.Hour)},
PingTime: &metav1.MicroTime{Time: time.Now().Add(-1 * time.Hour)},
},
}
ctx := context.Background()
@ -266,7 +267,7 @@ func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentit
}
func (t cleTest) pollForLease(name, namespace, holder string) {
err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 15*time.Second, true, func(ctx context.Context) (done bool, err error) {
err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) {
lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
fmt.Println(err)
@ -286,7 +287,7 @@ func (t cleTest) cancelController(name, namespace string) {
func (t cleTest) cleanup() {
err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{})
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
t.t.Error(err)
}
for _, c := range t.ctxList {

View File

@ -83,6 +83,7 @@ func TestGenericControlplaneStartUp(t *testing.T) {
"events",
"events.events.k8s.io",
"flowschemas.flowcontrol.apiserver.k8s.io",
"leasecandidates.coordination.k8s.io",
"leases.coordination.k8s.io",
"localsubjectaccessreviews.authorization.k8s.io",
"mutatingwebhookconfigurations.admissionregistration.k8s.io",