diff --git a/pkg/controlplane/controller/leaderelection/election.go b/pkg/controlplane/controller/leaderelection/election.go index 915af0527d4..3f8dbd20667 100644 --- a/pkg/controlplane/controller/leaderelection/election.go +++ b/pkg/controlplane/controller/leaderelection/election.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/coordination/v1" v1alpha1 "k8s.io/api/coordination/v1alpha1" "k8s.io/klog/v2" + "k8s.io/utils/clock" ) func pickBestLeaderOldestEmulationVersion(candidates []*v1alpha1.LeaseCandidate) *v1alpha1.LeaseCandidate { @@ -155,15 +156,15 @@ func compare(lhs, rhs *v1alpha1.LeaseCandidate) int { return result } -func isLeaseExpired(lease *v1.Lease) bool { - currentTime := time.Now() +func isLeaseExpired(clock clock.Clock, lease *v1.Lease) bool { + currentTime := clock.Now() return lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil || lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second).Before(currentTime) } -func isLeaseCandidateExpired(lease *v1alpha1.LeaseCandidate) bool { - currentTime := time.Now() +func isLeaseCandidateExpired(clock clock.Clock, lease *v1alpha1.LeaseCandidate) bool { + currentTime := clock.Now() return lease.Spec.RenewTime == nil || lease.Spec.RenewTime.Add(leaseCandidateValidDuration).Before(currentTime) } diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go index e3dfd8f8926..3ba51c3ee5e 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/clock" "k8s.io/utils/ptr" ) @@ -68,6 +69,8 @@ type Controller struct { leaseCandidateRegistration cache.ResourceEventHandlerRegistration queue workqueue.TypedRateLimitingInterface[types.NamespacedName] + + clock clock.Clock } func (c *Controller) Run(ctx context.Context, workers int) { @@ -114,6 +117,8 @@ func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCan leaseCandidateClient: leaseCandidateClient, queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: controllerName}), + + clock: clock.RealClock{}, } leaseSynced, err := leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -199,13 +204,13 @@ func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, lease return true, nil } - if isLeaseExpired(lease) || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" { + if isLeaseExpired(c.clock, lease) || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" { return true, nil } // every 15min enforce an election to update all candidates. Every 30min we garbage collect. for _, candidate := range candidates { - if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(leaseCandidateValidDuration/2).Before(time.Now()) { + if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(leaseCandidateValidDuration/2).Before(c.clock.Now()) { return true, nil } } @@ -258,7 +263,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na return defaultRequeueInterval, err } - now := time.Now() + now := c.clock.Now() canVoteYet := true for _, candidate := range candidates { if candidate.Spec.PingTime != nil && candidate.Spec.PingTime.Add(electionDuration).After(now) && @@ -331,7 +336,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na Spec: v1.LeaseSpec{ Strategy: &strategy, LeaseDurationSeconds: ptr.To(defaultLeaseDurationSeconds), - RenewTime: &metav1.MicroTime{Time: time.Now()}, + RenewTime: &metav1.MicroTime{Time: c.clock.Now()}, }, } @@ -368,7 +373,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na } orig := existing.DeepCopy() - isExpired := isLeaseExpired(existing) + isExpired := isLeaseExpired(c.clock, existing) noHolderIdentity := leaderLease.Spec.HolderIdentity != nil && existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity == "" expiredAndNewHolder := isExpired && leaderLease.Spec.HolderIdentity != nil && *existing.Spec.HolderIdentity != *leaderLease.Spec.HolderIdentity strategyChanged := existing.Spec.Strategy == nil || *existing.Spec.Strategy != strategy @@ -420,7 +425,7 @@ func (c *Controller) listAdmissableCandidates(leaseNN types.NamespacedName) ([]* if l.Spec.LeaseName != leaseNN.Name { continue } - if !isLeaseCandidateExpired(l) { + if !isLeaseCandidateExpired(c.clock, l) { results = append(results, l) } else { klog.Infof("LeaseCandidate %s is expired", l.Name) diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go index 412452c9ce4..2fdb33a0c6b 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go @@ -31,10 +31,13 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" ) func TestReconcileElectionStep(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + tests := []struct { name string leaseNN types.NamespacedName @@ -83,7 +86,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -108,7 +111,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -121,7 +124,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.18.0", BinaryVersion: "1.18.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -134,7 +137,7 @@ func TestReconcileElectionStep(t *testing.T) { Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-1"), LeaseDurationSeconds: ptr.To(int32(10)), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), }, }, expectLease: true, @@ -157,8 +160,8 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-4 * electionDuration))), + PingTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-2 * electionDuration))), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-4 * electionDuration))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -171,8 +174,8 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.20.0", BinaryVersion: "1.20.0", - PingTime: ptr.To(metav1.NewMicroTime(time.Now())), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PingTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-1 * time.Millisecond))), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -197,7 +200,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -210,7 +213,7 @@ func TestReconcileElectionStep(t *testing.T) { Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-expired"), LeaseDurationSeconds: ptr.To(int32(10)), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-1 * time.Minute))), }, }, expectLease: true, @@ -232,8 +235,8 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * time.Minute))), + PingTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-1 * time.Minute))), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-2 * time.Minute))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -257,7 +260,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-2 * electionDuration))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -283,8 +286,8 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - PingTime: ptr.To(metav1.NewMicroTime(time.Now())), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + PingTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-1 * time.Minute))), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -308,7 +311,7 @@ func TestReconcileElectionStep(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{"foo.com/bar"}, }, }, @@ -321,7 +324,7 @@ func TestReconcileElectionStep(t *testing.T) { Spec: v1.LeaseSpec{ HolderIdentity: ptr.To("component-identity-expired"), LeaseDurationSeconds: ptr.To(int32(10)), - RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now().Add(-1 * time.Minute))), }, }, expectLease: true, @@ -344,6 +347,7 @@ func TestReconcileElectionStep(t *testing.T) { client.CoordinationV1(), client.CoordinationV1alpha1(), ) + controller.clock = fakeClock if err != nil { t.Fatal(err) } @@ -435,6 +439,8 @@ func TestReconcileElectionStep(t *testing.T) { } func TestController(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Now()) + cases := []struct { name string leaseNN types.NamespacedName @@ -455,7 +461,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -485,7 +491,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -498,7 +504,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.20.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -511,7 +517,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.20.0", BinaryVersion: "1.20.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -549,7 +555,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -590,7 +596,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.20.0", BinaryVersion: "1.20.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -603,7 +609,7 @@ func TestController(t *testing.T) { LeaseName: "component-A", EmulationVersion: "1.19.0", BinaryVersion: "1.19.0", - RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(fakeClock.Now())), PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, }, }, @@ -680,7 +686,7 @@ func TestController(t *testing.T) { if err == nil { if lease.Spec.PingTime != nil { c := lease.DeepCopy() - c.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} + c.Spec.RenewTime = &metav1.MicroTime{Time: fakeClock.Now()} _, err = client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Update(ctx, c, metav1.UpdateOptions{}) if err != nil { runtime.HandleError(err) diff --git a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go index 7485f1fadd9..9e11f623beb 100644 --- a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go +++ b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/kubernetes" listers "k8s.io/client-go/listers/coordination/v1alpha1" "k8s.io/client-go/tools/cache" + "k8s.io/utils/clock" "k8s.io/klog/v2" ) @@ -42,6 +43,8 @@ type LeaseCandidateGCController struct { leaseCandidatesSynced cache.InformerSynced gcCheckPeriod time.Duration + + clock clock.Clock } // NewLeaseCandidateGC creates a new LeaseCandidateGCController. @@ -52,6 +55,7 @@ func NewLeaseCandidateGC(clientset kubernetes.Interface, gcCheckPeriod time.Dura leaseCandidateInformer: leaseCandidateInformer, leaseCandidatesSynced: leaseCandidateInformer.Informer().HasSynced, gcCheckPeriod: gcCheckPeriod, + clock: clock.RealClock{}, } } @@ -80,7 +84,7 @@ func (c *LeaseCandidateGCController) gc(ctx context.Context) { } for _, leaseCandidate := range lcs { // evaluate lease from cache - if !isLeaseCandidateExpired(leaseCandidate) { + if !isLeaseCandidateExpired(c.clock, leaseCandidate) { continue } lc, err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(leaseCandidate.Namespace).Get(ctx, leaseCandidate.Name, metav1.GetOptions{}) @@ -89,7 +93,7 @@ func (c *LeaseCandidateGCController) gc(ctx context.Context) { continue } // evaluate lease from apiserver - if !isLeaseCandidateExpired(lc) { + if !isLeaseCandidateExpired(c.clock, lc) { continue } if err := c.kubeclientset.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Delete( diff --git a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go index b845d62b8dd..da46af13e65 100644 --- a/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go +++ b/pkg/controlplane/controller/leaderelection/leasecandidategc_controller_test.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -115,9 +114,6 @@ func TestLeaseCandidateGCController(t *testing.T) { leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates() controller := NewLeaseCandidateGC(client, 10*time.Millisecond, leaseCandidateInformer) - informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(ctx.Done()) - // Create lease candidates for _, lc := range tc.leaseCandidates { _, err := client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Create(ctx, lc, metav1.CreateOptions{}) @@ -126,7 +122,8 @@ func TestLeaseCandidateGCController(t *testing.T) { } } - cache.WaitForCacheSync(ctx.Done(), controller.leaseCandidatesSynced) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) go controller.Run(ctx) err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 600*time.Second, true, func(ctx context.Context) (done bool, err error) {