From c47ff1e1a9aec44f262674eb6cdbabf80512d981 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Sun, 21 Jul 2024 20:06:03 +0000 Subject: [PATCH] CLE controller and client changes --- .../app/controllermanager.go | 30 +- cmd/kube-scheduler/app/server.go | 37 +- hack/local-up-cluster.sh | 1 + pkg/controlplane/apiserver/server.go | 23 + .../controller/leaderelection/election.go | 135 ++++ .../leaderelection/election_test.go | 522 +++++++++++++ .../leaderelection_controller.go | 399 ++++++++++ .../leaderelection_controller_test.go | 698 ++++++++++++++++++ .../leaderelection/run_with_leaderelection.go | 91 +++ staging/publishing/import-restrictions.yaml | 1 + .../tools/leaderelection/leaderelection.go | 91 ++- .../leaderelection/leaderelection_test.go | 144 +++- .../tools/leaderelection/leasecandidate.go | 196 +++++ .../leaderelection/leasecandidate_test.go | 146 ++++ .../leaderelection/resourcelock/interface.go | 17 +- .../leaderelection/resourcelock/leaselock.go | 15 +- .../coordinated_leader_election_test.go | 296 ++++++++ 17 files changed, 2827 insertions(+), 15 deletions(-) create mode 100644 pkg/controlplane/controller/leaderelection/election.go create mode 100644 pkg/controlplane/controller/leaderelection/election_test.go create mode 100644 pkg/controlplane/controller/leaderelection/leaderelection_controller.go create mode 100644 pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go create mode 100644 pkg/controlplane/controller/leaderelection/run_with_leaderelection.go create mode 100644 staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go create mode 100644 staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go create mode 100644 test/integration/apiserver/coordinated_leader_election_test.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 8885f9f9f1c..0223bdfdd95 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -28,8 +28,9 @@ import ( "sort" "time" + "github.com/blang/semver/v4" "github.com/spf13/cobra" - + v1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -78,7 +79,9 @@ import ( kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config" garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/serviceaccount" + "k8s.io/utils/clock" ) func init() { @@ -289,6 +292,30 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { return startSATokenControllerInit(ctx, controllerContext, controllerName) } } + ver, err := semver.ParseTolerant(version.Get().String()) + if err != nil { + return err + } + + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) { + // Start component identity lease management + leaseCandidate, err := leaderelection.NewCandidate( + c.Client, + id, + "kube-system", + "kube-controller-manager", + clock.RealClock{}, + ver.FinalizeVersion(), + ver.FinalizeVersion(), // TODO: Use compatibility version when it's available + []v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"}, + ) + if err != nil { + return err + } + healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory)) + + go leaseCandidate.Run(ctx) + } // Start the 
main lock go leaderElectAndRun(ctx, c, id, electionChecker, @@ -886,6 +913,7 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent Callbacks: callbacks, WatchDog: electionChecker, Name: leaseName, + Coordinated: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection), }) panic("unreachable") diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 9cbc044f111..08f59829d4c 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -24,8 +24,9 @@ import ( "os" goruntime "runtime" + "github.com/blang/semver/v4" "github.com/spf13/cobra" - + coordinationv1 "k8s.io/api/coordination/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/authentication/authenticator" @@ -56,8 +57,11 @@ import ( "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" "k8s.io/klog/v2" + "k8s.io/utils/clock" + schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" @@ -207,6 +211,34 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * }) readyzChecks = append(readyzChecks, handlerSyncCheck) + if cc.LeaderElection != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) { + binaryVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).BinaryVersion().String()) + if err != nil { + return err + } + emulationVersion, err := semver.ParseTolerant(utilversion.DefaultComponentGlobalsRegistry.EffectiveVersionFor(utilversion.DefaultKubeComponent).EmulationVersion().String()) + if err != nil { + return err + } + + // Start component identity lease management + leaseCandidate, err := leaderelection.NewCandidate( + cc.Client, + cc.LeaderElection.Lock.Identity(), + "kube-system", + "kube-scheduler", + clock.RealClock{}, + binaryVersion.FinalizeVersion(), + emulationVersion.FinalizeVersion(), + []coordinationv1.CoordinatedLeaseStrategy{"OldestEmulationVersion"}, + ) + if err != nil { + return err + } + readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory)) + go leaseCandidate.Run(ctx) + } + // Start up the healthz server. if cc.SecureServing != nil { handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer) @@ -245,6 +277,9 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * } // If leader election is enabled, runCommand via LeaderElector until done and exit. 
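+ // With the CoordinatedLeaderElection feature gate enabled, the lock is additionally marked Coordinated below, so leader selection is deferred to the coordinated leader election controller in the kube-apiserver.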
if cc.LeaderElection != nil { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) { + cc.LeaderElection.Coordinated = true + } cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { close(waitingForLeader) diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index 31883ac147b..f381eb442e8 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -1027,6 +1027,7 @@ EOF --feature-gates="${FEATURE_GATES}" \ --authentication-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \ --authorization-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \ + --leader-elect=false \ --master="https://${API_HOST}:${API_SECURE_PORT}" >"${SCHEDULER_LOG}" 2>&1 & SCHEDULER_PID=$! } diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index 534d9fe85cc..3f11324456d 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "context" "fmt" "os" "time" @@ -41,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc" "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" + "k8s.io/kubernetes/pkg/controlplane/controller/leaderelection" "k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking" "k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces" "k8s.io/kubernetes/pkg/features" @@ -145,6 +147,27 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele return nil, fmt.Errorf("failed to get listener address: %w", err) } + if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.CoordinatedLeaderElection) { + leaseInformer := s.VersionedInformers.Coordination().V1().Leases() + lcInformer := s.VersionedInformers.Coordination().V1alpha1().LeaseCandidates() + // Ensure that informers are registered before starting. Coordinated Leader Election leader-elected + // and may register informer handlers after they are started. + _ = leaseInformer.Informer() + _ = lcInformer.Informer() + s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-coordinated-leader-election-controller", func(hookContext genericapiserver.PostStartHookContext) error { + go leaderelection.RunWithLeaderElection(hookContext, s.GenericAPIServer.LoopbackClientConfig, func() (func(ctx context.Context, workers int), error) { + controller, err := leaderelection.NewController( + leaseInformer, + lcInformer, + client.CoordinationV1(), + client.CoordinationV1alpha1(), + ) + return controller.Run, err + }) + return nil + }) + } + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.Generic.PublicAddress, publicServicePort) peerEndpointCtrl := peerreconcilers.New( diff --git a/pkg/controlplane/controller/leaderelection/election.go b/pkg/controlplane/controller/leaderelection/election.go new file mode 100644 index 00000000000..d92e1c1a4f6 --- /dev/null +++ b/pkg/controlplane/controller/leaderelection/election.go @@ -0,0 +1,135 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "slices" + "time" + + "github.com/blang/semver/v4" + v1 "k8s.io/api/coordination/v1" + v1alpha1 "k8s.io/api/coordination/v1alpha1" + "k8s.io/klog/v2" +) + +func pickBestLeaderOldestEmulationVersion(candidates []*v1alpha1.LeaseCandidate) *v1alpha1.LeaseCandidate { + var electee *v1alpha1.LeaseCandidate + for _, c := range candidates { + if !validLeaseCandidateForOldestEmulationVersion(c) { + continue + } + if electee == nil || compare(electee, c) > 0 { + electee = c + } + } + if electee == nil { + klog.Infof("pickBestLeader: none found") + } else { + klog.Infof("pickBestLeader: %s %s", electee.Namespace, electee.Name) + } + return electee +} + +func shouldReelect(candidates []*v1alpha1.LeaseCandidate, currentLeader *v1alpha1.LeaseCandidate) bool { + klog.Infof("shouldReelect for candidates: %+v", candidates) + pickedLeader := pickBestLeaderOldestEmulationVersion(candidates) + if pickedLeader == nil { + return false + } + return compare(currentLeader, pickedLeader) > 0 +} + +func pickBestStrategy(candidates []*v1alpha1.LeaseCandidate) v1.CoordinatedLeaseStrategy { + // TODO: This doesn't account for cycles within the preference graph + // We may have to do a topological sort to verify that the preference ordering is valid + var bestStrategy *v1.CoordinatedLeaseStrategy + for _, c := range candidates { + if len(c.Spec.PreferredStrategies) > 0 { + if bestStrategy == nil { + bestStrategy = &c.Spec.PreferredStrategies[0] + continue + } + if *bestStrategy != c.Spec.PreferredStrategies[0] { + if idx := slices.Index(c.Spec.PreferredStrategies, *bestStrategy); idx > 0 { + bestStrategy = &c.Spec.PreferredStrategies[0] + } else { + klog.Infof("Error: bad strategy ordering") + } + } + } + } + return (*bestStrategy) +} + +func validLeaseCandidateForOldestEmulationVersion(l *v1alpha1.LeaseCandidate) bool { + _, err := semver.ParseTolerant(l.Spec.EmulationVersion) + if err != nil { + return false + } + _, err = semver.ParseTolerant(l.Spec.BinaryVersion) + return err == nil +} + +func getEmulationVersion(l *v1alpha1.LeaseCandidate) semver.Version { + value := l.Spec.EmulationVersion + v, err := semver.ParseTolerant(value) + if err != nil { + return semver.Version{} + } + return v +} + +func getBinaryVersion(l *v1alpha1.LeaseCandidate) semver.Version { + value := l.Spec.BinaryVersion + v, err := semver.ParseTolerant(value) + if err != nil { + return semver.Version{} + } + return v +} + +// -1: lhs better, 1: rhs better +func compare(lhs, rhs *v1alpha1.LeaseCandidate) int { + lhsVersion := getEmulationVersion(lhs) + rhsVersion := getEmulationVersion(rhs) + result := lhsVersion.Compare(rhsVersion) + if result == 0 { + lhsVersion := getBinaryVersion(lhs) + rhsVersion := getBinaryVersion(rhs) + result = lhsVersion.Compare(rhsVersion) + } + if result == 0 { + if lhs.CreationTimestamp.After(rhs.CreationTimestamp.Time) { + return 1 + } + return -1 + } + return result +} + +func isLeaseExpired(lease *v1.Lease) bool { + currentTime := time.Now() + return lease.Spec.RenewTime == nil || + lease.Spec.LeaseDurationSeconds == nil || + 
lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second).Before(currentTime) +} + +func isLeaseCandidateExpired(lease *v1alpha1.LeaseCandidate) bool { + currentTime := time.Now() + return lease.Spec.RenewTime == nil || + lease.Spec.RenewTime.Add(leaseCandidateValidDuration).Before(currentTime) +} diff --git a/pkg/controlplane/controller/leaderelection/election_test.go b/pkg/controlplane/controller/leaderelection/election_test.go new file mode 100644 index 00000000000..4a1277afeeb --- /dev/null +++ b/pkg/controlplane/controller/leaderelection/election_test.go @@ -0,0 +1,522 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "testing" + "time" + + "github.com/blang/semver/v4" + v1 "k8s.io/api/coordination/v1" + v1alpha1 "k8s.io/api/coordination/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestPickBestLeaderOldestEmulationVersion(t *testing.T) { + tests := []struct { + name string + candidates []*v1alpha1.LeaseCandidate + want *v1alpha1.LeaseCandidate + }{ + { + name: "empty", + candidates: []*v1alpha1.LeaseCandidate{}, + want: nil, + }, + { + name: "single candidate", + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + }, + want: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + }, + { + name: "multiple candidates, different emulation versions", + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate2", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.2.0", + BinaryVersion: "0.2.0", + }, + }, + }, + want: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "v1", + BinaryVersion: "v1", + }, + }, + }, + { + name: "multiple candidates, same emulation versions, different binary versions", + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate2", + Namespace: "default", + CreationTimestamp: 
metav1.Time{Time: time.Now()}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.2.0", + }, + }, + }, + want: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + }, + { + name: "multiple candidates, same emulation versions, same binary versions, different creation timestamps", + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now().Add(-1 * time.Hour)}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate2", + Namespace: "default", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + }, + want: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "candidate1", + Namespace: "default", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := pickBestLeaderOldestEmulationVersion(tt.candidates) + if got != nil && tt.want != nil { + if got.Name != tt.want.Name || got.Namespace != tt.want.Namespace { + t.Errorf("pickBestLeaderOldestEmulationVersion() = %v, want %v", got, tt.want) + } + } else if got != tt.want { + t.Errorf("pickBestLeaderOldestEmulationVersion() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestValidLeaseCandidateForOldestEmulationVersion(t *testing.T) { + tests := []struct { + name string + candidate *v1alpha1.LeaseCandidate + want bool + }{ + { + name: "valid emulation and binary versions", + candidate: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "0.1.0", + }, + }, + want: true, + }, + { + name: "invalid emulation version", + candidate: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "invalid", + BinaryVersion: "0.1.0", + }, + }, + want: false, + }, + { + name: "invalid binary version", + candidate: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + BinaryVersion: "invalid", + }, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := validLeaseCandidateForOldestEmulationVersion(tt.candidate) + if got != tt.want { + t.Errorf("validLeaseCandidateForOldestEmulationVersion() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetEmulationVersion(t *testing.T) { + tests := []struct { + name string + candidate *v1alpha1.LeaseCandidate + want semver.Version + }{ + { + name: "valid emulation version", + candidate: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "0.1.0", + }, + }, + want: semver.MustParse("0.1.0"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getEmulationVersion(tt.candidate) + if got.FinalizeVersion() != tt.want.FinalizeVersion() { + t.Errorf("getEmulationVersion() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetBinaryVersion(t *testing.T) { + tests := []struct { + name string + candidate *v1alpha1.LeaseCandidate + want semver.Version + }{ + { + name: "valid binary version", + candidate: 
&v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + BinaryVersion: "0.3.0", + }, + }, + want: semver.MustParse("0.3.0"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getBinaryVersion(tt.candidate) + if got.FinalizeVersion() != tt.want.FinalizeVersion() { + t.Errorf("getBinaryVersion() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestCompare(t *testing.T) { + nowTime := time.Now() + cases := []struct { + name string + lhs *v1alpha1.LeaseCandidate + rhs *v1alpha1.LeaseCandidate + expectedResult int + }{ + { + name: "identical versions earlier timestamp", + lhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.21.0", + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: nowTime.Add(time.Duration(1))}, + }, + }, + rhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.21.0", + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.Time{Time: nowTime}, + }, + }, + expectedResult: 1, + }, + { + name: "no lhs version", + lhs: &v1alpha1.LeaseCandidate{}, + rhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.21.0", + }, + }, + expectedResult: -1, + }, + { + name: "no rhs version", + lhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.21.0", + }, + }, + rhs: &v1alpha1.LeaseCandidate{}, + expectedResult: 1, + }, + { + name: "invalid lhs version", + lhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "xyz", + BinaryVersion: "xyz", + }, + }, + rhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.21.0", + }, + }, + expectedResult: -1, + }, + { + name: "invalid rhs version", + lhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.21.0", + }, + }, + rhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "xyz", + BinaryVersion: "xyz", + }, + }, + expectedResult: 1, + }, + { + name: "lhs less than rhs", + lhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.20.0", + }, + }, + rhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.20.0", + }, + }, + expectedResult: -1, + }, + { + name: "rhs less than lhs", + lhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.20.0", + BinaryVersion: "1.20.0", + }, + }, + rhs: &v1alpha1.LeaseCandidate{ + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.20.0", + }, + }, + expectedResult: 1, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result := compare(tc.lhs, tc.rhs) + if result != tc.expectedResult { + t.Errorf("Expected comparison result of %d but got %d", tc.expectedResult, result) + } + }) + } +} + +func TestShouldReelect(t *testing.T) { + cases := []struct { + name string + candidates []*v1alpha1.LeaseCandidate + currentLeader *v1alpha1.LeaseCandidate + expectResult bool + }{ + { + name: "candidate with newer binary version", + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: 
"1.19.0", + BinaryVersion: "1.19.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-2", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.20.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + currentLeader: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + expectResult: false, + }, + { + name: "no newer candidates", + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-2", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + currentLeader: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + expectResult: false, + }, + { + name: "no candidates", + candidates: []*v1alpha1.LeaseCandidate{}, + currentLeader: &v1alpha1.LeaseCandidate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + expectResult: false, + }, + // TODO: Add test cases where candidates have invalid version numbers + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result := shouldReelect(tc.candidates, tc.currentLeader) + if tc.expectResult != result { + t.Errorf("Expected %t but got %t", tc.expectResult, result) + } + }) + } +} diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go new file mode 100644 index 00000000000..789d6bd8bc0 --- /dev/null +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go @@ -0,0 +1,399 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package leaderelection + +import ( + "context" + "fmt" + "time" + + v1 "k8s.io/api/coordination/v1" + v1alpha1 "k8s.io/api/coordination/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coordinationv1informers "k8s.io/client-go/informers/coordination/v1" + coordinationv1alpha1 "k8s.io/client-go/informers/coordination/v1alpha1" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" + coordinationv1alpha1client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" +) + +const ( + controllerName = "leader-election-controller" + ElectedByAnnotationName = "coordination.k8s.io/elected-by" // Value should be set to controllerName + + // Requeue interval is the interval at which a Lease is requeued to verify that it is being renewed properly. + requeueInterval = 5 * time.Second + defaultLeaseDurationSeconds int32 = 5 + + electionDuration = 5 * time.Second + + leaseCandidateValidDuration = 5 * time.Minute +) + +// Controller is the leader election controller, which observes component identity leases for +// components that have self-nominated as candidate leaders for leases and elects leaders +// for those leases, favoring candidates with higher versions. +type Controller struct { + leaseInformer coordinationv1informers.LeaseInformer + leaseClient coordinationv1client.CoordinationV1Interface + leaseRegistration cache.ResourceEventHandlerRegistration + + leaseCandidateInformer coordinationv1alpha1.LeaseCandidateInformer + leaseCandidateClient coordinationv1alpha1client.CoordinationV1alpha1Interface + leaseCandidateRegistration cache.ResourceEventHandlerRegistration + + queue workqueue.TypedRateLimitingInterface[types.NamespacedName] +} + +func (c *Controller) Run(ctx context.Context, workers int) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + defer func() { + err := c.leaseInformer.Informer().RemoveEventHandler(c.leaseRegistration) + if err != nil { + klog.Warning("error removing leaseInformer eventhandler") + } + err = c.leaseCandidateInformer.Informer().RemoveEventHandler(c.leaseCandidateRegistration) + if err != nil { + klog.Warning("error removing leaseCandidateInformer eventhandler") + } + }() + + if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.leaseRegistration.HasSynced, c.leaseCandidateRegistration.HasSynced) { + return + } + + // This controller is leader elected and may start after informers have already started. List on startup. 
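+ // Enqueue every LeaseCandidate already in the cache once, so elections are evaluated even if no further events arrive after this replica becomes the leader.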
+ lcs, err := c.leaseCandidateInformer.Lister().List(labels.Everything()) + if err != nil { + utilruntime.HandleError(err) + return + } + for _, lc := range lcs { + c.processCandidate(lc) + } + + klog.Infof("Workers: %d", workers) + for i := 0; i < workers; i++ { + klog.Infof("Starting worker") + go wait.UntilWithContext(ctx, c.runElectionWorker, time.Second) + } + <-ctx.Done() +} + +func NewController(leaseInformer coordinationv1informers.LeaseInformer, leaseCandidateInformer coordinationv1alpha1.LeaseCandidateInformer, leaseClient coordinationv1client.CoordinationV1Interface, leaseCandidateClient coordinationv1alpha1client.CoordinationV1alpha1Interface) (*Controller, error) { + c := &Controller{ + leaseInformer: leaseInformer, + leaseCandidateInformer: leaseCandidateInformer, + leaseClient: leaseClient, + leaseCandidateClient: leaseCandidateClient, + + queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{Name: controllerName}), + } + leaseSynced, err := leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.processLease(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + c.processLease(newObj) + }, + DeleteFunc: func(oldObj interface{}) { + c.processLease(oldObj) + }, + }) + + if err != nil { + return nil, err + } + leaseCandidateSynced, err := leaseCandidateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.processCandidate(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + c.processCandidate(newObj) + }, + DeleteFunc: func(oldObj interface{}) { + c.processCandidate(oldObj) + }, + }) + + if err != nil { + return nil, err + } + c.leaseRegistration = leaseSynced + c.leaseCandidateRegistration = leaseCandidateSynced + return c, nil +} + +func (c *Controller) runElectionWorker(ctx context.Context) { + for c.processNextElectionItem(ctx) { + } +} + +func (c *Controller) processNextElectionItem(ctx context.Context) bool { + key, shutdown := c.queue.Get() + if shutdown { + return false + } + + completed, err := c.reconcileElectionStep(ctx, key) + utilruntime.HandleError(err) + if completed { + defer c.queue.AddAfter(key, requeueInterval) + } + c.queue.Done(key) + return true +} + +func (c *Controller) processCandidate(obj any) { + lc, ok := obj.(*v1alpha1.LeaseCandidate) + if !ok { + return + } + if lc == nil { + return + } + // Ignore candidates that transitioned to Pending because reelection is already in progress + if lc.Spec.PingTime != nil { + return + } + c.queue.Add(types.NamespacedName{Namespace: lc.Namespace, Name: lc.Spec.LeaseName}) +} + +func (c *Controller) processLease(obj any) { + lease, ok := obj.(*v1.Lease) + if !ok { + return + } + c.queue.Add(types.NamespacedName{Namespace: lease.Namespace, Name: lease.Name}) +} + +func (c *Controller) electionNeeded(candidates []*v1alpha1.LeaseCandidate, leaseNN types.NamespacedName) (bool, error) { + lease, err := c.leaseInformer.Lister().Leases(leaseNN.Namespace).Get(leaseNN.Name) + if err != nil && !apierrors.IsNotFound(err) { + return false, fmt.Errorf("error reading lease") + } else if apierrors.IsNotFound(err) { + return true, nil + } + + if isLeaseExpired(lease) || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity == "" { + return true, nil + } + + prelimStrategy := pickBestStrategy(candidates) + if prelimStrategy != v1.OldestEmulationVersion { + 
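+ // Strategies other than OldestEmulationVersion are not implemented by this controller, so the existing lease is left untouched.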
klog.V(2).Infof("strategy %s is not recognized by CLE.", prelimStrategy) + return false, nil + } + + prelimElectee := pickBestLeaderOldestEmulationVersion(candidates) + if prelimElectee == nil { + return false, nil + } else if lease != nil && lease.Spec.HolderIdentity != nil && prelimElectee.Name == *lease.Spec.HolderIdentity { + klog.V(2).Infof("Leader %s is already most optimal for lease %s %s", prelimElectee.Name, lease.Namespace, lease.Name) + return false, nil + } + return true, nil +} + +// reconcileElectionStep steps through a step in an election. +// A step looks at the current state of Lease and LeaseCandidates and takes one of the following action +// - do nothing (because leader is already optimal or still waiting for an event) +// - request ack from candidates (update LeaseCandidate PingTime) +// - finds the most optimal candidate and elect (update the Lease object) +// Instead of keeping a map and lock on election, the state is +// calculated every time by looking at the lease, and set of available candidates. +// PingTime + electionDuration > time.Now: We just asked all candidates to ack and are still waiting for response +// PingTime + electionDuration < time.Now: Candidate has not responded within the appropriate PingTime. Continue the election. +// RenewTime + 5 seconds > time.Now: All candidates acked in the last 5 seconds, continue the election. +func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.NamespacedName) (requeue bool, err error) { + now := time.Now() + + candidates, err := c.listAdmissableCandidates(leaseNN) + if err != nil { + return true, err + } else if len(candidates) == 0 { + return false, nil + } + klog.V(4).Infof("reconcileElectionStep %q %q, candidates: %d", leaseNN.Namespace, leaseNN.Name, len(candidates)) + + // Check if an election is really needed by looking at the current lease + // and set of candidates + needElection, err := c.electionNeeded(candidates, leaseNN) + if !needElection || err != nil { + return needElection, err + } + + fastTrackElection := false + + for _, candidate := range candidates { + // If a candidate has a PingTime within the election duration, they have not acked + // and we should wait until we receive their response + if candidate.Spec.PingTime != nil { + if candidate.Spec.PingTime.Add(electionDuration).After(now) { + // continue waiting for the election to timeout + return false, nil + } else { + // election timed out without ack. Clear and start election. + fastTrackElection = true + clone := candidate.DeepCopy() + clone.Spec.PingTime = nil + _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + break + } + } + } + + if !fastTrackElection { + continueElection := true + for _, candidate := range candidates { + // if renewTime of a candidate is longer than electionDuration old, we have to ping. 
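+ // A stale renewTime means the candidate may no longer be running, so pause the election and ping all candidates for a fresh ack first.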
+ if candidate.Spec.RenewTime != nil && candidate.Spec.RenewTime.Add(electionDuration).Before(now) { + continueElection = false + break + } + } + if !continueElection { + // Send an "are you alive" signal to all candidates + for _, candidate := range candidates { + clone := candidate.DeepCopy() + clone.Spec.PingTime = &metav1.MicroTime{Time: time.Now()} + _, err := c.leaseCandidateClient.LeaseCandidates(clone.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + } + return true, nil + } + } + + var ackedCandidates []*v1alpha1.LeaseCandidate + for _, candidate := range candidates { + if candidate.Spec.RenewTime.Add(electionDuration).After(now) { + ackedCandidates = append(ackedCandidates, candidate) + } + } + if len(ackedCandidates) == 0 { + return false, fmt.Errorf("no available candidates") + } + + strategy := pickBestStrategy(ackedCandidates) + if strategy != v1.OldestEmulationVersion { + klog.V(2).Infof("strategy %s is not recognized by CLE.", strategy) + return false, nil + } + electee := pickBestLeaderOldestEmulationVersion(ackedCandidates) + + if electee == nil { + return false, fmt.Errorf("should not happen, could not find suitable electee") + } + + electeeName := electee.Name + // create the leader election lease + leaderLease := &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: leaseNN.Namespace, + Name: leaseNN.Name, + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: &electeeName, + Strategy: &strategy, + LeaseDurationSeconds: ptr.To(defaultLeaseDurationSeconds), + RenewTime: &metav1.MicroTime{Time: time.Now()}, + }, + } + _, err = c.leaseClient.Leases(leaseNN.Namespace).Create(ctx, leaderLease, metav1.CreateOptions{}) + // If the create was successful, then we can return here. 
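+ // Create can race with another writer; an AlreadyExists error falls through to fetch and reconcile the existing Lease instead.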
+ if err == nil { + klog.Infof("Created lease %q %q for %q", leaseNN.Namespace, leaseNN.Name, electee.Name) + return true, nil + } + + // If there was an error, return + if !apierrors.IsAlreadyExists(err) { + return false, err + } + + existingLease, err := c.leaseClient.Leases(leaseNN.Namespace).Get(ctx, leaseNN.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + leaseClone := existingLease.DeepCopy() + + // Update the Lease if it either does not have a holder or is expired + isExpired := isLeaseExpired(existingLease) + if leaseClone.Spec.HolderIdentity == nil || *leaseClone.Spec.HolderIdentity == "" || (isExpired && *leaseClone.Spec.HolderIdentity != electeeName) { + klog.Infof("lease %q %q is expired, resetting it and setting holder to %q", leaseNN.Namespace, leaseNN.Name, electee.Name) + leaseClone.Spec.Strategy = &strategy + leaseClone.Spec.PreferredHolder = nil + if leaseClone.ObjectMeta.Annotations == nil { + leaseClone.ObjectMeta.Annotations = make(map[string]string) + } + leaseClone.ObjectMeta.Annotations[ElectedByAnnotationName] = controllerName + leaseClone.Spec.HolderIdentity = &electeeName + + leaseClone.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} + leaseClone.Spec.LeaseDurationSeconds = ptr.To(defaultLeaseDurationSeconds) + leaseClone.Spec.AcquireTime = nil + _, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, leaseClone, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + } else if leaseClone.Spec.HolderIdentity != nil && *leaseClone.Spec.HolderIdentity != electeeName { + klog.Infof("lease %q %q already exists for holder %q but should be held by %q, marking preferredHolder", leaseNN.Namespace, leaseNN.Name, *leaseClone.Spec.HolderIdentity, electee.Name) + leaseClone.Spec.PreferredHolder = &electeeName + leaseClone.Spec.Strategy = &strategy + _, err = c.leaseClient.Leases(leaseNN.Namespace).Update(ctx, leaseClone, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + } + return true, nil +} + +func (c *Controller) listAdmissableCandidates(leaseNN types.NamespacedName) ([]*v1alpha1.LeaseCandidate, error) { + leases, err := c.leaseCandidateInformer.Lister().LeaseCandidates(leaseNN.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + var results []*v1alpha1.LeaseCandidate + for _, l := range leases { + if l.Spec.LeaseName != leaseNN.Name { + continue + } + if !isLeaseCandidateExpired(l) { + results = append(results, l) + } else { + klog.Infof("LeaseCandidate %s is expired", l.Name) + } + } + return results, nil +} diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go new file mode 100644 index 00000000000..b0a344b3c75 --- /dev/null +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller_test.go @@ -0,0 +1,698 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package leaderelection + +import ( + "context" + "testing" + "time" + + v1 "k8s.io/api/coordination/v1" + v1alpha1 "k8s.io/api/coordination/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/ptr" + + "k8s.io/client-go/tools/cache" +) + +func TestReconcileElectionStep(t *testing.T) { + tests := []struct { + name string + leaseNN types.NamespacedName + candidates []*v1alpha1.LeaseCandidate + existingLease *v1.Lease + expectedHolderIdentity *string + expectedPreferredHolder string + expectedRequeue bool + expectedError bool + candidatesPinged bool + }{ + { + name: "no candidates, no lease, noop", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{}, + existingLease: nil, + expectedHolderIdentity: nil, + expectedRequeue: false, + expectedError: false, + }, + { + name: "no candidates, lease exists. noop, not managed by CLE", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{}, + existingLease: &v1.Lease{}, + expectedHolderIdentity: nil, + expectedRequeue: false, + expectedError: false, + }, + { + name: "candidates exist, no existing lease should create lease", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + existingLease: nil, + expectedHolderIdentity: ptr.To("component-identity-1"), + expectedRequeue: true, + expectedError: false, + }, + { + name: "candidates exist, lease exists, unoptimal should set preferredHolder", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-2", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.18.0", + BinaryVersion: "1.18.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + existingLease: &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-1"), + LeaseDurationSeconds: ptr.To(int32(10)), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + }, + }, + expectedHolderIdentity: ptr.To("component-identity-1"), + expectedPreferredHolder: "component-identity-2", + expectedRequeue: true, + expectedError: false, + }, + { 
+ name: "candidates exist, should only elect leader from acked candidates", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-2", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.20.0", + BinaryVersion: "1.20.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + existingLease: nil, + expectedHolderIdentity: ptr.To("component-identity-2"), + expectedRequeue: true, + expectedError: false, + }, + { + name: "candidates exist, lease exists, lease expired", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + existingLease: &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-expired"), + LeaseDurationSeconds: ptr.To(int32(10)), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + }, + }, + expectedHolderIdentity: ptr.To("component-identity-1"), + expectedRequeue: true, + expectedError: false, + }, + { + name: "candidates exist, no acked candidates should return error", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PingTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-1 * time.Minute))), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + existingLease: nil, + expectedHolderIdentity: nil, + expectedRequeue: false, + expectedError: true, + }, + { + name: "candidates exist, should ping on election", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, 
+ }, + }, + }, + existingLease: nil, + expectedHolderIdentity: nil, + expectedRequeue: true, + expectedError: false, + candidatesPinged: true, + }, + { + name: "candidates exist, ping within electionDuration should cause no state change", + leaseNN: types.NamespacedName{Namespace: "default", Name: "component-A"}, + candidates: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + PingTime: ptr.To(metav1.NewMicroTime(time.Now())), + RenewTime: ptr.To(metav1.NewMicroTime(time.Now().Add(-2 * electionDuration))), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + existingLease: nil, + expectedHolderIdentity: nil, + expectedRequeue: false, + expectedError: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + _ = informerFactory.Coordination().V1alpha1().LeaseCandidates().Lister() + controller, err := NewController( + informerFactory.Coordination().V1().Leases(), + informerFactory.Coordination().V1alpha1().LeaseCandidates(), + client.CoordinationV1(), + client.CoordinationV1alpha1(), + ) + if err != nil { + t.Fatal(err) + } + go informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + // Set up the fake client with the existing lease + if tc.existingLease != nil { + _, err = client.CoordinationV1().Leases(tc.existingLease.Namespace).Create(ctx, tc.existingLease, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + } + + // Set up the fake client with the candidates + for _, candidate := range tc.candidates { + _, err = client.CoordinationV1alpha1().LeaseCandidates(candidate.Namespace).Create(ctx, candidate, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + } + cache.WaitForCacheSync(ctx.Done(), controller.leaseCandidateInformer.Informer().HasSynced) + requeue, err := controller.reconcileElectionStep(ctx, tc.leaseNN) + + if requeue != tc.expectedRequeue { + t.Errorf("reconcileElectionStep() requeue = %v, want %v", requeue, tc.expectedRequeue) + } + if tc.expectedError && err == nil { + t.Errorf("reconcileElectionStep() error = %v, want error", err) + } else if !tc.expectedError && err != nil { + t.Errorf("reconcileElectionStep() error = %v, want nil", err) + } + + // Check the lease holder identity + if tc.expectedHolderIdentity != nil { + lease, err := client.CoordinationV1().Leases(tc.leaseNN.Namespace).Get(ctx, tc.leaseNN.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != *tc.expectedHolderIdentity { + t.Errorf("reconcileElectionStep() holderIdentity = %v, want %v", *lease.Spec.HolderIdentity, *tc.expectedHolderIdentity) + } + if tc.expectedPreferredHolder != "" { + if lease.Spec.PreferredHolder == nil || *lease.Spec.PreferredHolder != tc.expectedPreferredHolder { + t.Errorf("reconcileElectionStep() preferredHolder = %v, want %v", lease.Spec.PreferredHolder, tc.expectedPreferredHolder) + } + } + } + + // Verify that ping to candidate was issued + if tc.candidatesPinged { + pinged := false + candidatesList, err := client.CoordinationV1alpha1().LeaseCandidates(tc.leaseNN.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + 
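+ // Compare against the original fixtures: a PingTime that appears only after reconciliation proves the controller pinged that candidate.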
oldCandidateMap := make(map[string]*v1alpha1.LeaseCandidate) + for _, candidate := range tc.candidates { + oldCandidateMap[candidate.Name] = candidate + } + for _, candidate := range candidatesList.Items { + if candidate.Spec.PingTime != nil { + if oldCandidateMap[candidate.Name].Spec.PingTime == nil { + pinged = true + break + } + } + } + if !pinged { + t.Errorf("reconcileElectionStep() expected candidates to be pinged") + } + } + + }) + } +} + +func TestController(t *testing.T) { + cases := []struct { + name string + leaseNN types.NamespacedName + createAfterControllerStart []*v1alpha1.LeaseCandidate + deleteAfterControllerStart []types.NamespacedName + expectedLeaderLeases []*v1.Lease + }{ + { + name: "single candidate leader election", + leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"}, + createAfterControllerStart: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + expectedLeaderLeases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-1"), + }, + }, + }, + }, + { + name: "multiple candidate leader election", + leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"}, + createAfterControllerStart: []*v1alpha1.LeaseCandidate{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-2", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.20.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-3", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.20.0", + BinaryVersion: "1.20.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + expectedLeaderLeases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-1"), + }, + }, + }, + }, + { + name: "deletion of lease triggers reelection", + leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"}, + createAfterControllerStart: []*v1alpha1.LeaseCandidate{ + { + // Leader lease + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, 
+ Spec: v1alpha1.LeaseCandidateSpec{}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + deleteAfterControllerStart: []types.NamespacedName{ + {Namespace: "kube-system", Name: "component-A"}, + }, + expectedLeaderLeases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-1"), + }, + }, + }, + }, + { + name: "better candidate triggers reelection", + leaseNN: types.NamespacedName{Namespace: "kube-system", Name: "component-A"}, + createAfterControllerStart: []*v1alpha1.LeaseCandidate{ + { + // Leader lease + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1alpha1.LeaseCandidateSpec{}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-1", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.20.0", + BinaryVersion: "1.20.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-identity-2", + }, + Spec: v1alpha1.LeaseCandidateSpec{ + LeaseName: "component-A", + EmulationVersion: "1.19.0", + BinaryVersion: "1.19.0", + RenewTime: ptr.To(metav1.NewMicroTime(time.Now())), + PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + }, + }, + }, + expectedLeaderLeases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "component-A", + Annotations: map[string]string{ + ElectedByAnnotationName: controllerName, + }, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: ptr.To("component-identity-2"), + }, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + controller, err := NewController( + informerFactory.Coordination().V1().Leases(), + informerFactory.Coordination().V1alpha1().LeaseCandidates(), + client.CoordinationV1(), + client.CoordinationV1alpha1(), + ) + if err != nil { + t.Fatal(err) + } + + go informerFactory.Start(ctx.Done()) + go controller.Run(ctx, 1) + + go func() { + ticker := time.NewTicker(10 * time.Millisecond) + // Mock out the removal of preferredHolder leases. 
+ // When controllers are running, they are expected to do this voluntarily + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, expectedLease := range tc.expectedLeaderLeases { + lease, err := client.CoordinationV1().Leases(expectedLease.Namespace).Get(ctx, expectedLease.Name, metav1.GetOptions{}) + if err == nil { + if preferredHolder := lease.Spec.PreferredHolder; preferredHolder != nil { + err = client.CoordinationV1().Leases(expectedLease.Namespace).Delete(ctx, expectedLease.Name, metav1.DeleteOptions{}) + if err != nil { + runtime.HandleError(err) + } + } + } + } + } + } + }() + + go func() { + ticker := time.NewTicker(10 * time.Millisecond) + // Mock out leasecandidate ack. + // When controllers are running, they are expected to watch and ack + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for _, lc := range tc.createAfterControllerStart { + lease, err := client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Get(ctx, lc.Name, metav1.GetOptions{}) + if err == nil { + if lease.Spec.PingTime != nil { + c := lease.DeepCopy() + c.Spec.PingTime = nil + c.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()} + _, err = client.CoordinationV1alpha1().LeaseCandidates(lc.Namespace).Update(ctx, c, metav1.UpdateOptions{}) + if err != nil { + runtime.HandleError(err) + } + + } + } + } + } + } + }() + + for _, obj := range tc.createAfterControllerStart { + _, err := client.CoordinationV1alpha1().LeaseCandidates(obj.Namespace).Create(ctx, obj, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + } + + for _, obj := range tc.deleteAfterControllerStart { + err := client.CoordinationV1alpha1().LeaseCandidates(obj.Namespace).Delete(ctx, obj.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } + } + + for _, expectedLease := range tc.expectedLeaderLeases { + var lease *v1.Lease + err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 600*time.Second, true, func(ctx context.Context) (done bool, err error) { + lease, err = client.CoordinationV1().Leases(expectedLease.Namespace).Get(ctx, expectedLease.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return true, err + } + if expectedLease.Spec.HolderIdentity == nil || lease.Spec.HolderIdentity == nil { + return expectedLease.Spec.HolderIdentity == nil && lease.Spec.HolderIdentity == nil, nil + } + if expectedLease.Spec.HolderIdentity != nil && lease.Spec.HolderIdentity != nil && *expectedLease.Spec.HolderIdentity != *lease.Spec.HolderIdentity { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + if lease.Spec.HolderIdentity == nil { + t.Fatalf("Expected HolderIdentity of %s but got nil", expectedLease.Name) + } + if *lease.Spec.HolderIdentity != *expectedLease.Spec.HolderIdentity { + t.Errorf("Expected HolderIdentity of %s but got %s", *expectedLease.Spec.HolderIdentity, *lease.Spec.HolderIdentity) + } + } + }) + } +} diff --git a/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go b/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go new file mode 100644 index 00000000000..f34c5a2e405 --- /dev/null +++ b/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go @@ -0,0 +1,91 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "os" + "time" + + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" +) + +type NewRunner func() (func(ctx context.Context, workers int), error) + +// RunWithLeaderElection runs the provided runner function with leader election. +// newRunnerFn might be called multiple times, and it should return another +// controller instance's Run method each time. +// RunWithLeaderElection only returns when the context is done, or initial +// leader election fails. +func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) { + var cancel context.CancelFunc + + callbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + ctx, cancel = context.WithCancel(ctx) + var err error + run, err := newRunnerFn() + if err != nil { + klog.Infof("Error creating runner: %v", err) + return + } + run(ctx, 1) + }, + OnStoppedLeading: func() { + cancel() + }, + } + + hostname, err := os.Hostname() + if err != nil { + klog.Infof("Error parsing hostname: %v", err) + return + } + + rl, err := resourcelock.NewFromKubeconfig( + "leases", + "kube-system", + controllerName, + resourcelock.ResourceLockConfig{ + Identity: hostname + "_" + string(uuid.NewUUID()), + }, + config, + 10, + ) + if err != nil { + klog.Infof("Error creating resourcelock: %v", err) + return + } + + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: callbacks, + Name: controllerName, + }) + if err != nil { + klog.Infof("Error creating leader elector: %v", err) + return + } + le.Run(ctx) +} diff --git a/staging/publishing/import-restrictions.yaml b/staging/publishing/import-restrictions.yaml index 79133f0425a..287446878b4 100644 --- a/staging/publishing/import-restrictions.yaml +++ b/staging/publishing/import-restrictions.yaml @@ -91,6 +91,7 @@ ignoredSubTrees: - "./staging/src/k8s.io/client-go/tools/cache/testing" - "./staging/src/k8s.io/client-go/tools/leaderelection/resourcelock" + - "./staging/src/k8s.io/client-go/tools/leaderelection" - "./staging/src/k8s.io/client-go/tools/portforward" - "./staging/src/k8s.io/client-go/tools/record" - "./staging/src/k8s.io/client-go/tools/events" diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 5a1194b4aba..c61c600a7f7 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -159,6 +159,9 @@ type LeaderElectionConfig struct { // Name is the name of the resource lock for debugging Name string + + // Coordinated will use the Coordinated Leader Election feature + Coordinated bool } // LeaderCallbacks are callbacks that are triggered during certain @@ -249,7 +252,11 @@ func (le *LeaderElector) acquire(ctx context.Context) 
bool {
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
-		succeeded = le.tryAcquireOrRenew(ctx)
+		if !le.config.Coordinated {
+			succeeded = le.tryAcquireOrRenew(ctx)
+		} else {
+			succeeded = le.tryCoordinatedRenew(ctx)
+		}
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
@@ -272,7 +279,11 @@ func (le *LeaderElector) renew(ctx context.Context) {
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
-			return le.tryAcquireOrRenew(timeoutCtx), nil
+			if !le.config.Coordinated {
+				return le.tryAcquireOrRenew(timeoutCtx), nil
+			} else {
+				return le.tryCoordinatedRenew(timeoutCtx), nil
+			}
		}, timeoutCtx.Done())
		le.maybeReportTransition()
@@ -282,7 +293,6 @@ func (le *LeaderElector) renew(ctx context.Context) {
			return
		}
		le.metrics.leaderOff(le.config.Name)
-		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())
@@ -315,6 +325,81 @@ func (le *LeaderElector) release() bool {
	return true
}

+// tryCoordinatedRenew checks if it acquired a lease and tries to renew the
+// lease if it has already been acquired. Returns true on success else returns
+// false.
+func (le *LeaderElector) tryCoordinatedRenew(ctx context.Context) bool {
+	now := metav1.NewTime(le.clock.Now())
+	leaderElectionRecord := rl.LeaderElectionRecord{
+		HolderIdentity:       le.config.Lock.Identity(),
+		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
+		RenewTime:            now,
+		AcquireTime:          now,
+	}
+
+	// 1. obtain the electionRecord
+	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
+	if err != nil {
+		if !errors.IsNotFound(err) {
+			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
+			return false
+		}
+		klog.Infof("lease lock not found: %v", le.config.Lock.Describe())
+		return false
+	}
+
+	// 2. Record obtained, check the Identity & Time
+	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
+		le.setObservedRecord(oldLeaderElectionRecord)
+
+		le.observedRawRecord = oldLeaderElectionRawRecord
+	}
+	hasExpired := le.observedTime.Add(time.Second * time.Duration(oldLeaderElectionRecord.LeaseDurationSeconds)).Before(now.Time)
+
+	if hasExpired {
+		klog.Infof("lock has expired: %v", le.config.Lock.Describe())
+		return false
+	}
+
+	if !le.IsLeader() {
+		klog.V(4).Infof("lock is held by %v and has not yet expired: %v", oldLeaderElectionRecord.HolderIdentity, le.config.Lock.Describe())
+		return false
+	}
+
+	// 2b. If the lease has been marked as "end of term", don't renew it
+	if le.IsLeader() && oldLeaderElectionRecord.PreferredHolder != "" {
+		klog.V(4).Infof("lock is marked as 'end of term': %v", le.config.Lock.Describe())
+		// TODO: Instead of letting the lease expire, the holder may delete it directly.
+		// This will not be compatible with all controllers, so it needs to be opt-in behavior.
+		// We must ensure all code guarded by this lease has successfully completed
+		// prior to releasing or there may be two processes
+		// simultaneously acting on the critical path.
+		// Usually once this returns false, the process is terminated.
+		// xref: OnStoppedLeading
+		return false
+	}
+
+	// 3. We're going to try to update. The leaderElectionRecord is set to its default
+	// here. Let's correct it before updating.
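+	// If this elector is still the holder, carry over AcquireTime, LeaderTransitions and
+	// the Strategy chosen by the election controller instead of the defaults set above.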
+ if le.IsLeader() { + leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + leaderElectionRecord.Strategy = oldLeaderElectionRecord.Strategy + le.metrics.slowpathExercised(le.config.Name) + } else { + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 + } + + // update the lock itself + if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { + klog.Errorf("Failed to update lock: %v", err) + return false + } + + le.setObservedRecord(&leaderElectionRecord) + return true +} + // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, // else it tries to renew the lease if it has already been acquired. Returns true // on success else returns false. diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go index de481e0adfc..91959ae386f 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -37,8 +38,6 @@ import ( rl "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" - - "github.com/stretchr/testify/assert" ) func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) { @@ -353,6 +352,147 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { } } +func TestTryCoordinatedRenew(t *testing.T) { + objectType := "leases" + clock := clock.RealClock{} + future := clock.Now().Add(1000 * time.Hour) + + tests := []struct { + name string + observedRecord rl.LeaderElectionRecord + observedTime time.Time + retryAfter time.Duration + reactors []Reactor + expectedEvents []string + + expectSuccess bool + transitionLeader bool + outHolder string + }{ + { + name: "don't acquire from led, acked object", + reactors: []Reactor{ + { + verb: "get", + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil + }, + }, + }, + observedTime: future, + + expectSuccess: false, + outHolder: "bing", + }, + { + name: "renew already acquired object", + reactors: []Reactor{ + { + verb: "get", + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil + }, + }, + { + verb: "update", + reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(fakeclient.CreateAction).GetObject(), nil + }, + }, + }, + observedTime: future, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"}, + + expectSuccess: true, + outHolder: "baz", + }, + } + + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. 
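+			// Each case builds a fake clientset with the canned reactors above, runs
+			// tryCoordinatedRenew (twice when retryAfter is set), and then checks the
+			// observed record, the reported leader, and the recorded events.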
+			var wg sync.WaitGroup
+			wg.Add(1)
+			var reportedLeader string
+			var lock rl.Interface
+
+			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
+			recorder := record.NewFakeRecorder(100)
+			resourceLockConfig := rl.ResourceLockConfig{
+				Identity:      "baz",
+				EventRecorder: recorder,
+			}
+			c := &fake.Clientset{}
+			for _, reactor := range test.reactors {
+				c.AddReactor(reactor.verb, objectType, reactor.reaction)
+			}
+			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
+				t.Errorf("unreachable action. testclient called too many times: %+v", action)
+				return true, nil, fmt.Errorf("unreachable action")
+			})
+
+			lock = &rl.LeaseLock{
+				LeaseMeta:  objectMeta,
+				LockConfig: resourceLockConfig,
+				Client:     c.CoordinationV1(),
+			}
+			lec := LeaderElectionConfig{
+				Lock:          lock,
+				LeaseDuration: 10 * time.Second,
+				Callbacks: LeaderCallbacks{
+					OnNewLeader: func(l string) {
+						defer wg.Done()
+						reportedLeader = l
+					},
+				},
+				Coordinated: true,
+			}
+			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
+			le := &LeaderElector{
+				config:            lec,
+				observedRecord:    test.observedRecord,
+				observedRawRecord: observedRawRecord,
+				observedTime:      test.observedTime,
+				clock:             clock,
+				metrics:           globalMetricsFactory.newLeaderMetrics(),
+			}
+			if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
+				if test.retryAfter != 0 {
+					time.Sleep(test.retryAfter)
+					if test.expectSuccess != le.tryCoordinatedRenew(context.Background()) {
+						t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
+					}
+				} else {
+					t.Errorf("unexpected result of tryCoordinatedRenew: [succeeded=%v]", !test.expectSuccess)
+				}
+			}
+
+			le.observedRecord.AcquireTime = metav1.Time{}
+			le.observedRecord.RenewTime = metav1.Time{}
+			if le.observedRecord.HolderIdentity != test.outHolder {
+				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
+			}
+			if len(test.reactors) != len(c.Actions()) {
+				t.Errorf("wrong number of api interactions")
+			}
+			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
+				t.Errorf("leader should have transitioned but did not")
+			}
+			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
+				t.Errorf("leader should not have transitioned but did")
+			}
+
+			le.maybeReportTransition()
+			wg.Wait()
+			if reportedLeader != test.outHolder {
+				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
+			}
+			assertEqualEvents(t, test.expectedEvents, recorder.Events)
+		})
+	}
+}
+
 // Will test leader election using lease as the resource
 func TestTryAcquireOrRenewLeases(t *testing.T) {
 	testTryAcquireOrRenew(t, "leases")
diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go
new file mode 100644
index 00000000000..f071dd33acc
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go
@@ -0,0 +1,196 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "time" + + v1 "k8s.io/api/coordination/v1" + v1alpha1 "k8s.io/api/coordination/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + coordinationv1alpha1client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const requeueInterval = 5 * time.Minute + +type LeaseCandidate struct { + LeaseClient coordinationv1alpha1client.LeaseCandidateInterface + LeaseCandidateInformer cache.SharedIndexInformer + InformerFactory informers.SharedInformerFactory + HasSynced cache.InformerSynced + + // At most there will be one item in this Queue (since we only watch one item) + queue workqueue.TypedRateLimitingInterface[int] + + name string + namespace string + + // controller lease + leaseName string + + Clock clock.Clock + + binaryVersion, emulationVersion string + preferredStrategies []v1.CoordinatedLeaseStrategy +} + +func NewCandidate(clientset kubernetes.Interface, + candidateName string, + candidateNamespace string, + targetLease string, + clock clock.Clock, + binaryVersion, emulationVersion string, + preferredStrategies []v1.CoordinatedLeaseStrategy, +) (*LeaseCandidate, error) { + fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String() + // A separate informer factory is required because this must start before informerFactories + // are started for leader elected components + informerFactory := informers.NewSharedInformerFactoryWithOptions( + clientset, 5*time.Minute, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fieldSelector + }), + ) + leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer() + + lc := &LeaseCandidate{ + LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace), + LeaseCandidateInformer: leaseCandidateInformer, + InformerFactory: informerFactory, + name: candidateName, + namespace: candidateNamespace, + leaseName: targetLease, + Clock: clock, + binaryVersion: binaryVersion, + emulationVersion: emulationVersion, + preferredStrategies: preferredStrategies, + } + lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"}) + + synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok { + if leasecandidate.Spec.PingTime != nil { + lc.enqueueLease() + } + } + }, + }) + if err != nil { + return nil, err + } + lc.HasSynced = synced.HasSynced + + return lc, nil +} + +func (c *LeaseCandidate) Run(ctx context.Context) { + defer c.queue.ShutDown() + + go c.InformerFactory.Start(ctx.Done()) + if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) { + return + } + + c.enqueueLease() + go c.runWorker(ctx) + <-ctx.Done() +} + +func (c *LeaseCandidate) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *LeaseCandidate) processNextWorkItem(ctx context.Context) 
bool {
+	key, shutdown := c.queue.Get()
+	if shutdown {
+		return false
+	}
+	defer c.queue.Done(key)
+
+	err := c.ensureLease(ctx)
+	if err == nil {
+		c.queue.AddAfter(key, requeueInterval)
+		return true
+	}
+
+	utilruntime.HandleError(err)
+	klog.Infof("processNextWorkItem.AddRateLimited: %v", key)
+	c.queue.AddRateLimited(key)
+
+	return true
+}
+
+func (c *LeaseCandidate) enqueueLease() {
+	c.queue.Add(0)
+}
+
+// ensureLease creates the lease candidate if it does not exist and renews it if it does.
+// It returns any error that occurs.
+func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
+	lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{})
+	if apierrors.IsNotFound(err) {
+		klog.V(2).Infof("Creating lease candidate")
+		// lease does not exist, create it.
+		leaseToCreate := c.newLease()
+		_, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{})
+		if err != nil {
+			return err
+		}
+		klog.V(2).Infof("Created lease candidate")
+		return nil
+	} else if err != nil {
+		return err
+	}
+	klog.V(2).Infof("lease candidate exists, renewing")
+	clone := lease.DeepCopy()
+	clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
+	clone.Spec.PingTime = nil
+	_, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate {
+	lease := &v1alpha1.LeaseCandidate{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      c.name,
+			Namespace: c.namespace,
+		},
+		Spec: v1alpha1.LeaseCandidateSpec{
+			LeaseName:           c.leaseName,
+			BinaryVersion:       c.binaryVersion,
+			EmulationVersion:    c.emulationVersion,
+			PreferredStrategies: c.preferredStrategies,
+		},
+	}
+	lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()}
+	return lease
+}
diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go
new file mode 100644
index 00000000000..5265abfc387
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go
@@ -0,0 +1,146 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package leaderelection + +import ( + "context" + "testing" + "time" + + v1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/clock" +) + +type testcase struct { + candidateName, candidateNamespace, leaseName string + binaryVersion, emulationVersion string +} + +func TestLeaseCandidateCreation(t *testing.T) { + tc := testcase{ + candidateName: "foo", + candidateNamespace: "default", + leaseName: "lease", + binaryVersion: "1.30.0", + emulationVersion: "1.30.0", + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + client := fake.NewSimpleClientset() + candidate, err := NewCandidate( + client, + tc.candidateName, + tc.candidateNamespace, + tc.leaseName, + clock.RealClock{}, + tc.binaryVersion, + tc.emulationVersion, + []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + ) + if err != nil { + t.Fatal(err) + } + + go candidate.Run(ctx) + err = pollForLease(ctx, tc, client, nil) + if err != nil { + t.Fatal(err) + } +} + +func TestLeaseCandidateAck(t *testing.T) { + tc := testcase{ + candidateName: "foo", + candidateNamespace: "default", + leaseName: "lease", + binaryVersion: "1.30.0", + emulationVersion: "1.30.0", + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + client := fake.NewSimpleClientset() + + candidate, err := NewCandidate( + client, + tc.candidateName, + tc.candidateNamespace, + tc.leaseName, + clock.RealClock{}, + tc.binaryVersion, + tc.emulationVersion, + []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, + ) + if err != nil { + t.Fatal(err) + } + + go candidate.Run(ctx) + err = pollForLease(ctx, tc, client, nil) + if err != nil { + t.Fatal(err) + } + + // Update PingTime and verify that the client renews + ensureAfter := &metav1.MicroTime{Time: time.Now()} + lc, err := client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Get(ctx, tc.candidateName, metav1.GetOptions{}) + if err == nil { + if lc.Spec.PingTime == nil { + c := lc.DeepCopy() + c.Spec.PingTime = &metav1.MicroTime{Time: time.Now()} + _, err = client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Update(ctx, c, metav1.UpdateOptions{}) + if err != nil { + t.Error(err) + } + } + } + err = pollForLease(ctx, tc, client, ensureAfter) + if err != nil { + t.Fatal(err) + } +} + +func pollForLease(ctx context.Context, tc testcase, client *fake.Clientset, t *metav1.MicroTime) error { + return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + lc, err := client.CoordinationV1alpha1().LeaseCandidates(tc.candidateNamespace).Get(ctx, tc.candidateName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return true, err + } + if lc.Spec.BinaryVersion == tc.binaryVersion && + lc.Spec.EmulationVersion == tc.emulationVersion && + lc.Spec.LeaseName == tc.leaseName && + lc.Spec.PingTime == nil && + lc.Spec.RenewTime != nil { + // Ensure that if a time is provided, the renewTime occurred after the provided time. 
+ if t != nil && t.After(lc.Spec.RenewTime.Time) { + return false, nil + } + return true, nil + } + return false, nil + }) +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go index 483753d632c..053a7570d78 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go @@ -19,14 +19,15 @@ package resourcelock import ( "context" "fmt" - clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "time" + v1 "k8s.io/api/coordination/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + clientset "k8s.io/client-go/kubernetes" coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + restclient "k8s.io/client-go/rest" ) const ( @@ -114,11 +115,13 @@ type LeaderElectionRecord struct { // attempt to acquire leases with empty identities and will wait for the full lease // interval to expire before attempting to reacquire. This value is set to empty when // a client voluntarily steps down. - HolderIdentity string `json:"holderIdentity"` - LeaseDurationSeconds int `json:"leaseDurationSeconds"` - AcquireTime metav1.Time `json:"acquireTime"` - RenewTime metav1.Time `json:"renewTime"` - LeaderTransitions int `json:"leaderTransitions"` + HolderIdentity string `json:"holderIdentity"` + LeaseDurationSeconds int `json:"leaseDurationSeconds"` + AcquireTime metav1.Time `json:"acquireTime"` + RenewTime metav1.Time `json:"renewTime"` + LeaderTransitions int `json:"leaderTransitions"` + Strategy v1.CoordinatedLeaseStrategy `json:"strategy"` + PreferredHolder string `json:"preferredHolder"` } // EventRecorder records a change in the ResourceLock. 
diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go index 8a9d7d60f2d..7cd2a8b9ca7 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go @@ -122,6 +122,12 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec if spec.RenewTime != nil { r.RenewTime = metav1.Time{Time: spec.RenewTime.Time} } + if spec.PreferredHolder != nil { + r.PreferredHolder = *spec.PreferredHolder + } + if spec.Strategy != nil { + r.Strategy = *spec.Strategy + } return &r } @@ -129,11 +135,18 @@ func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElec func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec { leaseDurationSeconds := int32(ler.LeaseDurationSeconds) leaseTransitions := int32(ler.LeaderTransitions) - return coordinationv1.LeaseSpec{ + spec := coordinationv1.LeaseSpec{ HolderIdentity: &ler.HolderIdentity, LeaseDurationSeconds: &leaseDurationSeconds, AcquireTime: &metav1.MicroTime{Time: ler.AcquireTime.Time}, RenewTime: &metav1.MicroTime{Time: ler.RenewTime.Time}, LeaseTransitions: &leaseTransitions, } + if ler.PreferredHolder != "" { + spec.PreferredHolder = &ler.PreferredHolder + } + if ler.Strategy != "" { + spec.Strategy = &ler.Strategy + } + return spec } diff --git a/test/integration/apiserver/coordinated_leader_election_test.go b/test/integration/apiserver/coordinated_leader_election_test.go new file mode 100644 index 00000000000..4ac7483c3be --- /dev/null +++ b/test/integration/apiserver/coordinated_leader_election_test.go @@ -0,0 +1,296 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package apiserver + +import ( + "context" + "fmt" + "testing" + "time" + + v1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + kubernetes "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" + "k8s.io/utils/clock" + + apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestSingleLeaseCandidate(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + + server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + + ctx, cancel := context.WithCancel(context.Background()) + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") + cletest.pollForLease("foo", "default", "foo1") +} + +func TestMultipleLeaseCandidate(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + + server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + + ctx, cancel := context.WithCancel(context.Background()) + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") + go cletest.createAndRunFakeController("foo2", "default", "foo", "1.20.0", "1.19.0") + go cletest.createAndRunFakeController("foo3", "default", "foo", "1.19.0", "1.19.0") + go cletest.createAndRunFakeController("foo4", "default", "foo", "1.2.0", "1.19.0") + go cletest.createAndRunFakeController("foo5", "default", "foo", "1.20.0", "1.19.0") + cletest.pollForLease("foo", "default", "foo3") +} + +func TestLeaderDisappear(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + + server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + + ctx, cancel := context.WithCancel(context.Background()) + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + + go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") + go cletest.createAndRunFakeController("foo2", "default", "foo", "1.20.0", "1.19.0") + cletest.pollForLease("foo", "default", "foo2") + cletest.cancelController("foo2", "default") + cletest.deleteLC("foo2", "default") + cletest.pollForLease("foo", "default", "foo1") +} + +func TestLeaseSwapIfBetterAvailable(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + + server, err := apiservertesting.StartTestServer(t, 
apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + + ctx, cancel := context.WithCancel(context.Background()) + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + + go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0") + cletest.pollForLease("bar", "default", "bar1") + go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0") + cletest.pollForLease("bar", "default", "bar2") +} + +// TestUpgradeSkew tests that a legacy client and a CLE aware client operating on the same lease do not cause errors +func TestUpgradeSkew(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + + server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + + ctx, cancel := context.WithCancel(context.Background()) + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + + go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foo") + cletest.pollForLease("foo", "default", "foo1-130") + go cletest.createAndRunFakeController("foo1-131", "default", "foo", "1.31.0", "1.31.0") + // running a new controller should not kick off old leader + cletest.pollForLease("foo", "default", "foo1-130") + cletest.cancelController("foo1-130", "default") + cletest.pollForLease("foo", "default", "foo1-131") +} + +type ctxCancelPair struct { + ctx context.Context + cancel func() +} +type cleTest struct { + config *rest.Config + clientset *kubernetes.Clientset + t *testing.T + ctxList map[string]ctxCancelPair +} + +func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, targetLease string) { + ctx, cancel := context.WithCancel(context.Background()) + t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel} + + electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) + go leaderElectAndRunUncoordinated(ctx, t.config, name, electionChecker, + namespace, + "leases", + targetLease, + leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Info("Elected leader, starting..") + }, + OnStoppedLeading: func() { + klog.Errorf("%s Lost leadership, stopping", name) + // klog.FlushAndExit(klog.ExitFlushTimeout, 1) + }, + }) + +} +func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { + identityLease, err := leaderelection.NewCandidate( + t.clientset, + name, + namespace, + targetLease, + clock.RealClock{}, + binaryVersion, + compatibilityVersion, + []v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"}, + ) + if err != nil { + t.t.Error(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel} + go identityLease.Run(ctx) + + electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) + go leaderElectAndRunCoordinated(ctx, t.config, name, electionChecker, + namespace, + "leases", + targetLease, + leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Info("Elected leader, starting..") + }, + OnStoppedLeading: func() { + klog.Errorf("%s Lost leadership, stopping", name) + // 
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+			},
+		})
+}
+
+func leaderElectAndRunUncoordinated(ctx context.Context, kubeconfig *rest.Config, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceNamespace, resourceLock, leaseName string, callbacks leaderelection.LeaderCallbacks) {
+	leaderElectAndRun(ctx, kubeconfig, lockIdentity, electionChecker, resourceNamespace, resourceLock, leaseName, callbacks, false)
+}
+
+func leaderElectAndRunCoordinated(ctx context.Context, kubeconfig *rest.Config, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceNamespace, resourceLock, leaseName string, callbacks leaderelection.LeaderCallbacks) {
+	leaderElectAndRun(ctx, kubeconfig, lockIdentity, electionChecker, resourceNamespace, resourceLock, leaseName, callbacks, true)
+}
+
+func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceNamespace, resourceLock, leaseName string, callbacks leaderelection.LeaderCallbacks, coordinated bool) {
+	logger := klog.FromContext(ctx)
+	rl, err := resourcelock.NewFromKubeconfig(resourceLock,
+		resourceNamespace,
+		leaseName,
+		resourcelock.ResourceLockConfig{
+			Identity: lockIdentity,
+		},
+		kubeconfig,
+		5*time.Second)
+	if err != nil {
+		logger.Error(err, "Error creating lock")
+		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+	}
+
+	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+		Lock:          rl,
+		LeaseDuration: 5 * time.Second,
+		RenewDeadline: 3 * time.Second,
+		RetryPeriod:   2 * time.Second,
+		Callbacks:     callbacks,
+		WatchDog:      electionChecker,
+		Name:          leaseName,
+		Coordinated:   coordinated,
+	})
+}
+
+func (t cleTest) pollForLease(name, namespace, holder string) {
+	err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 15*time.Second, true, func(ctx context.Context) (done bool, err error) {
+		lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
+		if err != nil {
+			fmt.Println(err)
+			return false, nil
+		}
+		return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == holder, nil
+	})
+	if err != nil {
+		t.t.Fatalf("timeout waiting for Lease %s %s err: %v", name, namespace, err)
+	}
+}
+
+func (t cleTest) cancelController(name, namespace string) {
+	t.ctxList[name+"/"+namespace].cancel()
+	delete(t.ctxList, name+"/"+namespace)
+}
+
+func (t cleTest) cleanup() {
+	err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{})
+	if err != nil {
+		t.t.Error(err)
+	}
+	for _, c := range t.ctxList {
+		c.cancel()
+	}
+}
+
+func (t cleTest) deleteLC(name, namespace string) {
+	err := t.clientset.CoordinationV1alpha1().LeaseCandidates(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
+	if err != nil {
+		t.t.Error(err)
+	}
+}
+
+func setupCLE(config *rest.Config, ctx context.Context, cancel func(), t *testing.T) cleTest {
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	a := ctxCancelPair{ctx, cancel}
+	return cleTest{
+		config:    config,
+		clientset: clientset,
+		ctxList:   map[string]ctxCancelPair{"main": a},
+		t:         t,
+	}
+}
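
For reviewers, a minimal usage sketch (not part of this patch) of how a component could wire up the client-go pieces added above: it publishes a LeaseCandidate and then runs the elector with Coordinated set to true. The namespace, the "my-component" lease name, the version strings, and the identity handling are placeholder assumptions, and it assumes the CoordinatedLeaderElection feature gate and the coordination.k8s.io/v1alpha1 API are enabled server-side.

package example

import (
	"context"
	"time"

	v1 "k8s.io/api/coordination/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
	"k8s.io/utils/clock"
)

// runCoordinated shows one possible wiring; names and versions are placeholders.
func runCoordinated(ctx context.Context, config *rest.Config, identity string) error {
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}

	// Publish a LeaseCandidate so the election controller can consider this process.
	candidate, err := leaderelection.NewCandidate(
		clientset,
		identity,       // LeaseCandidate name
		"kube-system",  // namespace (placeholder)
		"my-component", // target Lease name (placeholder)
		clock.RealClock{},
		"1.31.0", // binary version (placeholder)
		"1.31.0", // emulation version (placeholder)
		[]v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
	)
	if err != nil {
		return err
	}
	go candidate.Run(ctx)

	// Run the elector in coordinated mode: it only renews the Lease after the
	// election controller has written this identity as the holder.
	lock, err := resourcelock.NewFromKubeconfig(
		"leases", "kube-system", "my-component",
		resourcelock.ResourceLockConfig{Identity: identity},
		config, 10*time.Second,
	)
	if err != nil {
		return err
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Coordinated:   true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { klog.Info("started leading") },
			OnStoppedLeading: func() { klog.Info("stopped leading") },
		},
	})
	return nil
}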