diff --git a/tools/leaderelection/leaderelection.go b/tools/leaderelection/leaderelection.go index 5a1194b4..c61c600a 100644 --- a/tools/leaderelection/leaderelection.go +++ b/tools/leaderelection/leaderelection.go @@ -159,6 +159,9 @@ type LeaderElectionConfig struct { // Name is the name of the resource lock for debugging Name string + + // Coordinated will use the Coordinated Leader Election feature + Coordinated bool } // LeaderCallbacks are callbacks that are triggered during certain @@ -249,7 +252,11 @@ func (le *LeaderElector) acquire(ctx context.Context) bool { desc := le.config.Lock.Describe() klog.Infof("attempting to acquire leader lease %v...", desc) wait.JitterUntil(func() { - succeeded = le.tryAcquireOrRenew(ctx) + if !le.config.Coordinated { + succeeded = le.tryAcquireOrRenew(ctx) + } else { + succeeded = le.tryCoordinatedRenew(ctx) + } le.maybeReportTransition() if !succeeded { klog.V(4).Infof("failed to acquire lease %v", desc) @@ -272,7 +279,11 @@ func (le *LeaderElector) renew(ctx context.Context) { timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) defer timeoutCancel() err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { - return le.tryAcquireOrRenew(timeoutCtx), nil + if !le.config.Coordinated { + return le.tryAcquireOrRenew(timeoutCtx), nil + } else { + return le.tryCoordinatedRenew(timeoutCtx), nil + } }, timeoutCtx.Done()) le.maybeReportTransition() @@ -282,7 +293,6 @@ func (le *LeaderElector) renew(ctx context.Context) { return } le.metrics.leaderOff(le.config.Name) - klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) @@ -315,6 +325,81 @@ func (le *LeaderElector) release() bool { return true } +// tryCoordinatedRenew checks if it acquired a lease and tries to renew the +// lease if it has already been acquired. Returns true on success else returns +// false. +func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool { + now := metav1.NewTime(le.clock.Now()) + leaderElectionRecord := rl.LeaderElectionRecord{ + HolderIdentity: le.config.Lock.Identity(), + LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), + RenewTime: now, + AcquireTime: now, + } + + // 1. obtain the electionRecord + oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) + if err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) + return false + } + klog.Infof("lease lock not found: %v", le.config.Lock.Describe()) + return false + } + + // 2. Record obtained, check the Identity & Time + if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { + le.setObservedRecord(oldLeaderElectionRecord) + + le.observedRawRecord = oldLeaderElectionRawRecord + } + hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time) + + if hasExpired { + klog.Infof("lock has expired: %v", le.config.Lock.Describe()) + return false + } + + if !le.IsLeader() { + klog.V(4).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe()) + return false + } + + // 2b. If the lease has been marked as "end of term", don't renew it + if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" { + klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe()) + // TODO: Instead of letting lease expire, the holder may deleted it directly + // This will not be compatible with all controllers, so it needs to be opt-in behavior.. + // We must ensure all code guarded by this lease has successfully completed + // prior to releasing or there may be two processes + // simultaneously acting on the critical path. + // Usually once this returns false, the process is terminated.. + // xref: OnStoppedLeading + return false + } + + // 3. We're going to try to update. The leaderElectionRecord is set to it's default + // here. Let's correct it before updating. + if le.IsLeader() { + leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + leaderElectionRecord.Strategy = oldLeaderElectionRecord.Strategy + le.metrics.slowpathExercised(le.config.Name) + } else { + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 + } + + // update the lock itself + if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { + klog.Errorf("Failed to update lock: %v", err) + return false + } + + le.setObservedRecord(&leaderElectionRecord) + return true +} + // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, // else it tries to renew the lease if it has already been acquired. Returns true // on success else returns false. diff --git a/tools/leaderelection/leaderelection_test.go b/tools/leaderelection/leaderelection_test.go index de481e0a..91959ae3 100644 --- a/tools/leaderelection/leaderelection_test.go +++ b/tools/leaderelection/leaderelection_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -37,8 +38,6 @@ import ( rl "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" - - "github.com/stretchr/testify/assert" ) func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) { @@ -353,6 +352,147 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { } } +func TestTryCoordinatedRenew(t *testing.T) { + objectType := "leases" + clock := clock.RealClock{} + future := clock.Now().Add(1000 * time.Hour) + + tests := []struct { + name string + observedRecord rl.LeaderElectionRecord + observedTime time.Time + retryAfter time.Duration + reactors []Reactor + expectedEvents []string + + expectSuccess bool + transitionLeader bool + outHolder string + }{ + { + name: "don't acquire from led, acked object", + reactors: []Reactor{ + { + verb: "get", + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + }, + observedTime: future, + + expectSuccess: false, + outHolder: "bing", + }, + { + name: "renew already acquired object", + reactors: []Reactor{ + { + verb: "get", + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + { + verb: "update", + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil + }, + }, + }, + observedTime: future, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"}, + + expectSuccess: true, + outHolder: "baz", + }, + } + + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + var lock rl.Interface + + objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + recorder := record.NewFakeRecorder(100) + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: recorder, + } + c := &fake.Clientset{} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) + } + c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) + + lock = &rl.LeaseLock{ + LeaseMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c.CoordinationV1(), + } + lec := LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, + }, + Coordinated: true, + } + observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord) + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedRawRecord: observedRawRecord, + observedTime: test.observedTime, + clock: clock, + metrics: globalMetricsFactory.newLeaderMetrics(), + } + if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) { + if test.retryAfter != 0 { + time.Sleep(test.retryAfter) + if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) { + t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess) + } + } else { + t.Errorf("unexpected result of gryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess) + } + } + + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("wrong number of api interactions") + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("leader should have transitioned but did not") + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("leader should not have transitioned but did") + } + + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader) + } + assertEqualEvents(t, test.expectedEvents, recorder.Events) + }) + } +} + // Will test leader election using lease as the resource func TestTryAcquireOrRenewLeases(t *testing.T) { testTryAcquireOrRenew(t, "leases") diff --git a/tools/leaderelection/leasecandidate.go b/tools/leaderelection/leasecandidate.go new file mode 100644 index 00000000..f071dd33 --- /dev/null +++ b/tools/leaderelection/leasecandidate.go @@ -0,0 +1,196 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "time" + + v1 "k8s.io/api/coordination/v1" + v1alpha1 "k8s.io/api/coordination/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + coordinationv1alpha1client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const requeueInterval = 5 * time.Minute + +type LeaseCandidate struct { + LeaseClient coordinationv1alpha1client.LeaseCandidateInterface + LeaseCandidateInformer cache.SharedIndexInformer + InformerFactory informers.SharedInformerFactory + HasSynced cache.InformerSynced + + // At most there will be one item in this Queue (since we only watch one item) + queue workqueue.TypedRateLimitingInterface[int] + + name string + namespace string + + // controller lease + leaseName string + + Clock clock.Clock + + binaryVersion, emulationVersion string + preferredStrategies []v1.CoordinatedLeaseStrategy +} + +func NewCandidate(clientset kubernetes.Interface, + candidateName string, + candidateNamespace string, + targetLease string, + clock clock.Clock, + binaryVersion, emulationVersion string, + preferredStrategies []v1.CoordinatedLeaseStrategy, +) (*LeaseCandidate, error) { + fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String() + // A separate informer factory is required because this must start before informerFactories + // are started for leader elected components + informerFactory := informers.NewSharedInformerFactoryWithOptions( + clientset, 5*time.Minute, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fieldSelector + }), + ) + leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer() + + lc := &LeaseCandidate{ + LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace), + LeaseCandidateInformer: leaseCandidateInformer, + InformerFactory: informerFactory, + name: candidateName, + namespace: candidateNamespace, + leaseName: targetLease, + Clock: clock, + binaryVersion: binaryVersion, + emulationVersion: emulationVersion, + preferredStrategies: preferredStrategies, + } + lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"}) + + synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok { + if leasecandidate.Spec.PingTime != nil { + lc.enqueueLease() + } + } + }, + }) + if err != nil { + return nil, err + } + lc.HasSynced = synced.HasSynced + + return lc, nil +} + +func (c *LeaseCandidate) Run(ctx context.Context) { + defer c.queue.ShutDown() + + go c.InformerFactory.Start(ctx.Done()) + if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) { + return + } + + c.enqueueLease() + go c.runWorker(ctx) + <-ctx.Done() +} + +func (c *LeaseCandidate) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) bool { + key, shutdown := c.queue.Get() + if shutdown { + return false + } + defer c.queue.Done(key) + + err := c.ensureLease(ctx) + if err == nil { + c.queue.AddAfter(key, requeueInterval) + return true + } + + utilruntime.HandleError(err) + klog.Infof("processNextWorkItem.AddRateLimited: %v", key) + c.queue.AddRateLimited(key) + + return true +} + +func (c *LeaseCandidate) enqueueLease() { + c.queue.Add(0) +} + +// ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and +// a bool (true if this call created the lease), or any error that occurs. +func (c *LeaseCandidate) ensureLease(ctx context.Context) error { + lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + klog.V(2).Infof("Creating lease candidate") + // lease does not exist, create it. + leaseToCreate := c.newLease() + _, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) + if err != nil { + return err + } + klog.V(2).Infof("Created lease candidate") + return nil + } else if err != nil { + return err + } + klog.V(2).Infof("lease candidate exists.. renewing") + clone := lease.DeepCopy() + clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()} + clone.Spec.PingTime = nil + _, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{}) + if err != nil { + return err + } + return nil +} + +func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate { + lease := &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.name, + Namespace: c.namespace, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: c.leaseName, + BinaryVersion: c.binaryVersion, + EmulationVersion: c.emulationVersion, + PreferredStrategies: c.preferredStrategies, + }, + } + lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()} + return lease +} diff --git a/tools/leaderelection/leasecandidate_test.go b/tools/leaderelection/leasecandidate_test.go new file mode 100644 index 00000000..5265abfc --- /dev/null +++ b/tools/leaderelection/leasecandidate_test.go @@ -0,0 +1,146 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "testing" + "time" + + v1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/clock" +) + +type testcase struct { + candidateName, candidateNamespace, leaseName string + binaryVersion, emulationVersion string +} + +func TestLeaseCandidateCreation(t *testing.T) { + tc := testcase{ + candidateName: "foo", + candidateNamespace: "default", + leaseName: "lease", + binaryVersion: "1.30.0", + emulationVersion: "1.30.0", + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + client := fake.NewSimpleClientset() + candidate, err := NewCandidate( + client, + tc.candidateName, + tc.candidateNamespace, + tc.leaseName, + clock.RealClock{}, + tc.binaryVersion, + tc.emulationVersion, + []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + ) + if err != nil { + t.Fatal(err) + } + + go candidate.Run(ctx) + err = pollForLease(ctx, tc, client, nil) + if err != nil { + t.Fatal(err) + } +} + +func TestLeaseCandidateAck(t *testing.T) { + tc := testcase{ + candidateName: "foo", + candidateNamespace: "default", + leaseName: "lease", + binaryVersion: "1.30.0", + emulationVersion: "1.30.0", + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + client := fake.NewSimpleClientset() + + candidate, err := NewCandidate( + client, + tc.candidateName, + tc.candidateNamespace, + tc.leaseName, + clock.RealClock{}, + tc.binaryVersion, + tc.emulationVersion, + []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + ) + if err != nil { + t.Fatal(err) + } + + go candidate.Run(ctx) + err = pollForLease(ctx, tc, client, nil) + if err != nil { + t.Fatal(err) + } + + // Update PingTime and verify that the client renews + ensureAfter := &metav1.MicroTime{Time: time.Now()} + lc, err := client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Get(ctx, tc.candidateName, metav1.GetOptions{}) + if err == nil { + if lc.Spec.PingTime == nil { + c := lc.DeepCopy() + c.Spec.PingTime = &metav1.MicroTime{Time: time.Now()} + _, err = client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Update(ctx, c, metav1.UpdateOptions{}) + if err != nil { + t.Error(err) + } + } + } + err = pollForLease(ctx, tc, client, ensureAfter) + if err != nil { + t.Fatal(err) + } +} + +func pollForLease(ctx context.Context, tc testcase, client *fake.Clientset, t *metav1.MicroTime) error { + return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + lc, err := client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Get(ctx, tc.candidateName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return true, err + } + if lc.Spec.BinaryVersion == tc.binaryVersion && + lc.Spec.EmulationVersion == tc.emulationVersion && + lc.Spec.LeaseName == tc.leaseName && + lc.Spec.PingTime == nil && + lc.Spec.RenewTime != nil { + // Ensure that if a time is provided, the renewTime occurred after the provided time. + if t != nil && t.After(lc.Spec.RenewTime.Time) { + return false, nil + } + return true, nil + } + return false, nil + }) +} diff --git a/tools/leaderelection/resourcelock/interface.go b/tools/leaderelection/resourcelock/interface.go index 483753d6..053a7570 100644 --- a/tools/leaderelection/resourcelock/interface.go +++ b/tools/leaderelection/resourcelock/interface.go @@ -19,14 +19,15 @@ package resourcelock import ( "context" "fmt" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "time" + v1 "k8s.io/api/coordination/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + clientset "k8s.io/client-go/kubernetes" coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + restclient "k8s.io/client-go/rest" ) const ( @@ -114,11 +115,13 @@ type LeaderElectionRecord struct { // attempt to acquire leases with empty identities and will wait for the full lease // interval to expire before attempting to reacquire. This value is set to empty when // a client voluntarily steps down. - HolderIdentity string `json:"holderIdentity"` - LeaseDurationSeconds int `json:"leaseDurationSeconds"` - AcquireTime metav1.Time `json:"acquireTime"` - RenewTime metav1.Time `json:"renewTime"` - LeaderTransitions int `json:"leaderTransitions"` + HolderIdentity string `json:"holderIdentity"` + LeaseDurationSeconds int `json:"leaseDurationSeconds"` + AcquireTime metav1.Time `json:"acquireTime"` + RenewTime metav1.Time `json:"renewTime"` + LeaderTransitions int `json:"leaderTransitions"` + Strategy v1.CoordinatedLeaseStrategy `json:"strategy"` + PreferredHolder string `json:"preferredHolder"` } // EventRecorder records a change in the ResourceLock. diff --git a/tools/leaderelection/resourcelock/leaselock.go b/tools/leaderelection/resourcelock/leaselock.go index 8a9d7d60..7cd2a8b9 100644 --- a/tools/leaderelection/resourcelock/leaselock.go +++ b/tools/leaderelection/resourcelock/leaselock.go @@ -122,6 +122,12 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec if spec.RenewTime != nil { r.RenewTime = metav1.Time{Time: spec.RenewTime.Time} } + if spec.PreferredHolder != nil { + r.PreferredHolder = *spec.PreferredHolder + } + if spec.Strategy != nil { + r.Strategy = *spec.Strategy + } return &r } @@ -129,11 +135,18 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec { leaseDurationSeconds := int32(ler.LeaseDurationSeconds) leaseTransitions := int32(ler.LeaderTransitions) - return coordinationv1.LeaseSpec{ + spec := coordinationv1.LeaseSpec{ HolderIdentity: &ler.HolderIdentity, LeaseDurationSeconds: &leaseDurationSeconds, AcquireTime: &metav1.MicroTime{Time: ler.AcquireTime.Time}, RenewTime: &metav1.MicroTime{Time: ler.RenewTime.Time}, LeaseTransitions: &leaseTransitions, } + if ler.PreferredHolder != "" { + spec.PreferredHolder = &ler.PreferredHolder + } + if ler.Strategy != "" { + spec.Strategy = &ler.Strategy + } + return spec }