diff --git a/tools/leaderelection/leasecandidate.go b/tools/leaderelection/leasecandidate.go index f071dd33..98871213 100644 --- a/tools/leaderelection/leasecandidate.go +++ b/tools/leaderelection/leasecandidate.go @@ -18,6 +18,7 @@ package leaderelection import ( "context" + "reflect" "time" v1 "k8s.io/api/coordination/v1" @@ -37,11 +38,15 @@ import ( const requeueInterval = 5 * time.Minute +type CacheSyncWaiter interface { + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + type LeaseCandidate struct { - LeaseClient coordinationv1alpha1client.LeaseCandidateInterface - LeaseCandidateInformer cache.SharedIndexInformer - InformerFactory informers.SharedInformerFactory - HasSynced cache.InformerSynced + leaseClient coordinationv1alpha1client.LeaseCandidateInterface + leaseCandidateInformer cache.SharedIndexInformer + informerFactory informers.SharedInformerFactory + hasSynced cache.InformerSynced // At most there will be one item in this Queue (since we only watch one item) queue workqueue.TypedRateLimitingInterface[int] @@ -52,7 +57,7 @@ type LeaseCandidate struct { // controller lease leaseName string - Clock clock.Clock + clock clock.Clock binaryVersion, emulationVersion string preferredStrategies []v1.CoordinatedLeaseStrategy @@ -62,10 +67,9 @@ func NewCandidate(clientset kubernetes.Interface, candidateName string, candidateNamespace string, targetLease string, - clock clock.Clock, binaryVersion, emulationVersion string, preferredStrategies []v1.CoordinatedLeaseStrategy, -) (*LeaseCandidate, error) { +) (*LeaseCandidate, CacheSyncWaiter, error) { fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String() // A separate informer factory is required because this must start before informerFactories // are started for leader elected components @@ -78,20 +82,20 @@ func NewCandidate(clientset kubernetes.Interface, leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer() lc := &LeaseCandidate{ - LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace), - LeaseCandidateInformer: leaseCandidateInformer, - InformerFactory: informerFactory, + leaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace), + leaseCandidateInformer: leaseCandidateInformer, + informerFactory: informerFactory, name: candidateName, namespace: candidateNamespace, leaseName: targetLease, - Clock: clock, + clock: clock.RealClock{}, binaryVersion: binaryVersion, emulationVersion: emulationVersion, preferredStrategies: preferredStrategies, } lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"}) - synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok { if leasecandidate.Spec.PingTime != nil { @@ -101,18 +105,18 @@ func NewCandidate(clientset kubernetes.Interface, }, }) if err != nil { - return nil, err + return nil, nil, err } - lc.HasSynced = synced.HasSynced + lc.hasSynced = h.HasSynced - return lc, nil + return lc, informerFactory, nil } func (c *LeaseCandidate) Run(ctx context.Context) { defer c.queue.ShutDown() - go c.InformerFactory.Start(ctx.Done()) - if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) { + go c.informerFactory.Start(ctx.Done()) + if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) { return } @@ -153,12 +157,12 @@ func (c *LeaseCandidate) enqueueLease() { // ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and // a bool (true if this call created the lease), or any error that occurs. func (c *LeaseCandidate) ensureLease(ctx context.Context) error { - lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{}) + lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { klog.V(2).Infof("Creating lease candidate") // lease does not exist, create it. leaseToCreate := c.newLease() - _, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) + _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) if err != nil { return err } @@ -169,9 +173,9 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error { } klog.V(2).Infof("lease candidate exists.. renewing") clone := lease.DeepCopy() - clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()} + clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} clone.Spec.PingTime = nil - _, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{}) + _, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{}) if err != nil { return err } @@ -191,6 +195,6 @@ func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate { PreferredStrategies: c.preferredStrategies, }, } - lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()} + lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} return lease } diff --git a/tools/leaderelection/leasecandidate_test.go b/tools/leaderelection/leasecandidate_test.go index 5265abfc..43e0d2d1 100644 --- a/tools/leaderelection/leasecandidate_test.go +++ b/tools/leaderelection/leasecandidate_test.go @@ -26,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/fake" - "k8s.io/utils/clock" ) type testcase struct { @@ -47,12 +46,11 @@ func TestLeaseCandidateCreation(t *testing.T) { defer cancel() client := fake.NewSimpleClientset() - candidate, err := NewCandidate( + candidate, _, err := NewCandidate( client, tc.candidateName, tc.candidateNamespace, tc.leaseName, - clock.RealClock{}, tc.binaryVersion, tc.emulationVersion, []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, @@ -82,12 +80,11 @@ func TestLeaseCandidateAck(t *testing.T) { client := fake.NewSimpleClientset() - candidate, err := NewCandidate( + candidate, _, err := NewCandidate( client, tc.candidateName, tc.candidateNamespace, tc.leaseName, - clock.RealClock{}, tc.binaryVersion, tc.emulationVersion, []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},