From 68226b0501996fc86c9c2bddb7d61e6a64c91304 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 23 Jul 2024 14:52:25 +0200 Subject: [PATCH] Review feedback Signed-off-by: Dr. Stefan Schimanski --- .../app/controllermanager.go | 23 +++++---- cmd/kube-scheduler/app/server.go | 14 +++--- hack/local-up-cluster.sh | 1 - pkg/apis/coordination/types.go | 15 +++--- .../coordination/validation/validation.go | 6 +-- .../k8s.io/api/coordination/v1alpha1/types.go | 14 +++--- .../tools/leaderelection/leasecandidate.go | 48 ++++++++++--------- .../leaderelection/leasecandidate_test.go | 7 +-- .../coordinated_leader_election_test.go | 5 +- 9 files changed, 66 insertions(+), 67 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0223bdfdd95..cbfa4baf82b 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -30,7 +30,7 @@ import ( "github.com/blang/semver/v4" "github.com/spf13/cobra" - v1 "k8s.io/api/coordination/v1" + coordinationv1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -81,7 +81,6 @@ import ( serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/serviceaccount" - "k8s.io/utils/clock" ) func init() { @@ -292,27 +291,27 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { return startSATokenControllerInit(ctx, controllerContext, controllerName) } } - ver, err := semver.ParseTolerant(version.Get().String()) - if err != nil { - return err - } if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CoordinatedLeaderElection) { - // Start component identity lease management - leaseCandidate, err := leaderelection.NewCandidate( + ver, err := semver.ParseTolerant(version.Get().String()) + if err != nil { + return err + } + + // Start lease candidate controller for coordinated leader election + leaseCandidate, waitForSync, err := leaderelection.NewCandidate( c.Client, id, "kube-system", "kube-controller-manager", - clock.RealClock{}, ver.FinalizeVersion(), - ver.FinalizeVersion(), // TODO: Use compatibility version when it's available - []v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"}, + ver.FinalizeVersion(), // TODO(Jefftree): Use compatibility version when it's available + []coordinationv1.CoordinatedLeaseStrategy{coordinationv1.OldestEmulationVersion}, ) if err != nil { return err } - healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory)) + healthzHandler.AddHealthChecker(healthz.NewInformerSyncHealthz(waitForSync)) go leaseCandidate.Run(ctx) } diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 08f59829d4c..46989b03e1b 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -27,6 +27,7 @@ import ( "github.com/blang/semver/v4" "github.com/spf13/cobra" coordinationv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/authentication/authenticator" @@ -57,8 +58,6 @@ import ( "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" "k8s.io/klog/v2" - "k8s.io/utils/clock" - schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" "k8s.io/kubernetes/cmd/kube-scheduler/app/options" kubefeatures "k8s.io/kubernetes/pkg/features" @@ -221,21 +220,20 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched * return err } - // Start component identity lease management - leaseCandidate, err := leaderelection.NewCandidate( + // Start lease candidate controller for coordinated leader election + leaseCandidate, waitForSync, err := leaderelection.NewCandidate( cc.Client, cc.LeaderElection.Lock.Identity(), - "kube-system", + metav1.NamespaceSystem, "kube-scheduler", - clock.RealClock{}, binaryVersion.FinalizeVersion(), emulationVersion.FinalizeVersion(), - []coordinationv1.CoordinatedLeaseStrategy{"OldestEmulationVersion"}, + []coordinationv1.CoordinatedLeaseStrategy{coordinationv1.OldestEmulationVersion}, ) if err != nil { return err } - readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(leaseCandidate.InformerFactory)) + readyzChecks = append(readyzChecks, healthz.NewInformerSyncHealthz(waitForSync)) go leaseCandidate.Run(ctx) } diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index f381eb442e8..31883ac147b 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -1027,7 +1027,6 @@ EOF --feature-gates="${FEATURE_GATES}" \ --authentication-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \ --authorization-kubeconfig "${CERT_DIR}"/scheduler.kubeconfig \ - --leader-elect=false \ --master="https://${API_HOST}:${API_SECURE_PORT}" >"${SCHEDULER_LOG}" 2>&1 & SCHEDULER_PID=$! } diff --git a/pkg/apis/coordination/types.go b/pkg/apis/coordination/types.go index 167cb727734..d67546ab940 100644 --- a/pkg/apis/coordination/types.go +++ b/pkg/apis/coordination/types.go @@ -96,7 +96,7 @@ type LeaseList struct { // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -// LeaseCandidate defines a candidate for a lease object. +// LeaseCandidate defines a candidate for a Lease object. // Candidates are created such that coordinated leader election will pick the best leader from the list of candidates. type LeaseCandidate struct { metav1.TypeMeta @@ -109,6 +109,7 @@ type LeaseCandidate struct { type LeaseCandidateSpec struct { // LeaseName is the name of the lease for which this candidate is contending. // This field is immutable. + // +required LeaseName string // PingTime is the last time that the server has requested the LeaseCandidate // to renew. It is only done during leader election to check if any @@ -120,16 +121,18 @@ type LeaseCandidateSpec struct { // Any time a Lease needs to do leader election, the PingTime field // is updated to signal to the LeaseCandidate that they should update // the RenewTime. - // Old LeaseCandidate objects are also garbage collected if it has been hours since the last renew. + // Old LeaseCandidate objects are also garbage collected if it has been hours + // since the last renew. The PingTime field is updated regularly to prevent + // garbage collection for still active LeaseCandidates. // +optional RenewTime *metav1.MicroTime - // BinaryVersion is the binary version. It must be in a semver format without leadig `v`. - // This field is required when Strategy is "OldestEmulationVersion" + // BinaryVersion is the binary version. It must be in a semver format without leading `v`. + // This field is required when strategy is "OldestEmulationVersion" // +optional BinaryVersion string // EmulationVersion is the emulation version. It must be in a semver format without leading `v`. // EmulationVersion must be less than or equal to BinaryVersion. - // This field is required when Strategy is "OldestEmulationVersion" + // This field is required when strategy is "OldestEmulationVersion" // +optional EmulationVersion string // PreferredStrategies indicates the list of strategies for picking the leader for coordinated leader election. @@ -137,7 +140,7 @@ type LeaseCandidateSpec struct { // leader election to make a decision about the final election strategy. This follows as // - If all clients have strategy X as the first element in this list, strategy X will be used. // - If a candidate has strategy [X] and another candidate has strategy [Y, X], Y supersedes X and strategy Y - // will be used + // will be used. // - If a candidate has strategy [X, Y] and another candidate has strategy [Y, X], this is a user error and leader // election will not operate the Lease until resolved. // (Alpha) Using this field requires the CoordinatedLeaderElection feature gate to be enabled. diff --git a/pkg/apis/coordination/validation/validation.go b/pkg/apis/coordination/validation/validation.go index 04ac81d4277..44bb45052c9 100644 --- a/pkg/apis/coordination/validation/validation.go +++ b/pkg/apis/coordination/validation/validation.go @@ -124,11 +124,11 @@ func ValidateLeaseCandidateSpec(spec *coordination.LeaseCandidateSpec, fldPath * allErrs = append(allErrs, field.Invalid(fld, spec.BinaryVersion, "must be greater than or equal to `emulationVersion`")) } - strategySeen := make(map[coordination.CoordinatedLeaseStrategy]bool) - if len(spec.PreferredStrategies) > 0 { for i, strategy := range spec.PreferredStrategies { fld := fldPath.Child("preferredStrategies").Index(i) + + strategySeen := make(map[coordination.CoordinatedLeaseStrategy]bool) if _, ok := strategySeen[strategy]; ok { allErrs = append(allErrs, field.Duplicate(fld, strategy)) } else { @@ -153,7 +153,7 @@ func ValidateLeaseCandidateSpec(spec *coordination.LeaseCandidateSpec, fldPath * return allErrs } -// ValidateLeaseStrategy validates the Strategy field in both the Lease and LeaseCandidate +// ValidateCoordinatedLeaseStrategy validates the Strategy field in both the Lease and LeaseCandidate func ValidateCoordinatedLeaseStrategy(strategy coordination.CoordinatedLeaseStrategy, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} diff --git a/staging/src/k8s.io/api/coordination/v1alpha1/types.go b/staging/src/k8s.io/api/coordination/v1alpha1/types.go index 6af5b51fc2f..14066600cf5 100644 --- a/staging/src/k8s.io/api/coordination/v1alpha1/types.go +++ b/staging/src/k8s.io/api/coordination/v1alpha1/types.go @@ -25,7 +25,7 @@ import ( // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +k8s:prerelease-lifecycle-gen:introduced=1.31 -// LeaseCandidate defines a candidate for a lease object. +// LeaseCandidate defines a candidate for a Lease object. // Candidates are created such that coordinated leader election will pick the best leader from the list of candidates. type LeaseCandidate struct { metav1.TypeMeta `json:",inline"` @@ -39,7 +39,7 @@ type LeaseCandidate struct { Spec LeaseCandidateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` } -// LeaseSpec is a specification of a Lease. +// LeaseCandidateSpec is a specification of a Lease. type LeaseCandidateSpec struct { // LeaseName is the name of the lease for which this candidate is contending. // This field is immutable. @@ -55,16 +55,18 @@ type LeaseCandidateSpec struct { // Any time a Lease needs to do leader election, the PingTime field // is updated to signal to the LeaseCandidate that they should update // the RenewTime. - // Old LeaseCandidate objects are also garbage collected if it has been hours since the last renew. + // Old LeaseCandidate objects are also garbage collected if it has been hours + // since the last renew. The PingTime field is updated regularly to prevent + // garbage collection for still active LeaseCandidates. // +optional RenewTime *metav1.MicroTime `json:"renewTime,omitempty" protobuf:"bytes,3,opt,name=renewTime"` // BinaryVersion is the binary version. It must be in a semver format without leading `v`. - // This field is required when Strategy is "OldestEmulationVersion" + // This field is required when strategy is "OldestEmulationVersion" // +optional BinaryVersion string `json:"binaryVersion,omitempty" protobuf:"bytes,4,opt,name=binaryVersion"` // EmulationVersion is the emulation version. It must be in a semver format without leading `v`. // EmulationVersion must be less than or equal to BinaryVersion. - // This field is required when Strategy is "OldestEmulationVersion" + // This field is required when strategy is "OldestEmulationVersion" // +optional EmulationVersion string `json:"emulationVersion,omitempty" protobuf:"bytes,5,opt,name=emulationVersion"` // PreferredStrategies indicates the list of strategies for picking the leader for coordinated leader election. @@ -72,7 +74,7 @@ type LeaseCandidateSpec struct { // leader election to make a decision about the final election strategy. This follows as // - If all clients have strategy X as the first element in this list, strategy X will be used. // - If a candidate has strategy [X] and another candidate has strategy [Y, X], Y supersedes X and strategy Y - // will be used + // will be used. // - If a candidate has strategy [X, Y] and another candidate has strategy [Y, X], this is a user error and leader // election will not operate the Lease until resolved. // (Alpha) Using this field requires the CoordinatedLeaderElection feature gate to be enabled. diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go index f071dd33acc..98871213c9f 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate.go @@ -18,6 +18,7 @@ package leaderelection import ( "context" + "reflect" "time" v1 "k8s.io/api/coordination/v1" @@ -37,11 +38,15 @@ import ( const requeueInterval = 5 * time.Minute +type CacheSyncWaiter interface { + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + type LeaseCandidate struct { - LeaseClient coordinationv1alpha1client.LeaseCandidateInterface - LeaseCandidateInformer cache.SharedIndexInformer - InformerFactory informers.SharedInformerFactory - HasSynced cache.InformerSynced + leaseClient coordinationv1alpha1client.LeaseCandidateInterface + leaseCandidateInformer cache.SharedIndexInformer + informerFactory informers.SharedInformerFactory + hasSynced cache.InformerSynced // At most there will be one item in this Queue (since we only watch one item) queue workqueue.TypedRateLimitingInterface[int] @@ -52,7 +57,7 @@ type LeaseCandidate struct { // controller lease leaseName string - Clock clock.Clock + clock clock.Clock binaryVersion, emulationVersion string preferredStrategies []v1.CoordinatedLeaseStrategy @@ -62,10 +67,9 @@ func NewCandidate(clientset kubernetes.Interface, candidateName string, candidateNamespace string, targetLease string, - clock clock.Clock, binaryVersion, emulationVersion string, preferredStrategies []v1.CoordinatedLeaseStrategy, -) (*LeaseCandidate, error) { +) (*LeaseCandidate, CacheSyncWaiter, error) { fieldSelector := fields.OneTermEqualSelector("metadata.name", candidateName).String() // A separate informer factory is required because this must start before informerFactories // are started for leader elected components @@ -78,20 +82,20 @@ func NewCandidate(clientset kubernetes.Interface, leaseCandidateInformer := informerFactory.Coordination().V1alpha1().LeaseCandidates().Informer() lc := &LeaseCandidate{ - LeaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace), - LeaseCandidateInformer: leaseCandidateInformer, - InformerFactory: informerFactory, + leaseClient: clientset.CoordinationV1alpha1().LeaseCandidates(candidateNamespace), + leaseCandidateInformer: leaseCandidateInformer, + informerFactory: informerFactory, name: candidateName, namespace: candidateNamespace, leaseName: targetLease, - Clock: clock, + clock: clock.RealClock{}, binaryVersion: binaryVersion, emulationVersion: emulationVersion, preferredStrategies: preferredStrategies, } lc.queue = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[int](), workqueue.TypedRateLimitingQueueConfig[int]{Name: "leasecandidate"}) - synced, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { if leasecandidate, ok := newObj.(*v1alpha1.LeaseCandidate); ok { if leasecandidate.Spec.PingTime != nil { @@ -101,18 +105,18 @@ func NewCandidate(clientset kubernetes.Interface, }, }) if err != nil { - return nil, err + return nil, nil, err } - lc.HasSynced = synced.HasSynced + lc.hasSynced = h.HasSynced - return lc, nil + return lc, informerFactory, nil } func (c *LeaseCandidate) Run(ctx context.Context) { defer c.queue.ShutDown() - go c.InformerFactory.Start(ctx.Done()) - if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.HasSynced) { + go c.informerFactory.Start(ctx.Done()) + if !cache.WaitForNamedCacheSync("leasecandidateclient", ctx.Done(), c.hasSynced) { return } @@ -153,12 +157,12 @@ func (c *LeaseCandidate) enqueueLease() { // ensureLease creates the lease if it does not exist and renew it if it exists. Returns the lease and // a bool (true if this call created the lease), or any error that occurs. func (c *LeaseCandidate) ensureLease(ctx context.Context) error { - lease, err := c.LeaseClient.Get(ctx, c.name, metav1.GetOptions{}) + lease, err := c.leaseClient.Get(ctx, c.name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { klog.V(2).Infof("Creating lease candidate") // lease does not exist, create it. leaseToCreate := c.newLease() - _, err := c.LeaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) + _, err := c.leaseClient.Create(ctx, leaseToCreate, metav1.CreateOptions{}) if err != nil { return err } @@ -169,9 +173,9 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error { } klog.V(2).Infof("lease candidate exists.. renewing") clone := lease.DeepCopy() - clone.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()} + clone.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} clone.Spec.PingTime = nil - _, err = c.LeaseClient.Update(ctx, clone, metav1.UpdateOptions{}) + _, err = c.leaseClient.Update(ctx, clone, metav1.UpdateOptions{}) if err != nil { return err } @@ -191,6 +195,6 @@ func (c *LeaseCandidate) newLease() *v1alpha1.LeaseCandidate { PreferredStrategies: c.preferredStrategies, }, } - lease.Spec.RenewTime = &metav1.MicroTime{Time: c.Clock.Now()} + lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()} return lease } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go index 5265abfc387..43e0d2d11ae 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leasecandidate_test.go @@ -26,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/fake" - "k8s.io/utils/clock" ) type testcase struct { @@ -47,12 +46,11 @@ func TestLeaseCandidateCreation(t *testing.T) { defer cancel() client := fake.NewSimpleClientset() - candidate, err := NewCandidate( + candidate, _, err := NewCandidate( client, tc.candidateName, tc.candidateNamespace, tc.leaseName, - clock.RealClock{}, tc.binaryVersion, tc.emulationVersion, []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, @@ -82,12 +80,11 @@ func TestLeaseCandidateAck(t *testing.T) { client := fake.NewSimpleClientset() - candidate, err := NewCandidate( + candidate, _, err := NewCandidate( client, tc.candidateName, tc.candidateNamespace, tc.leaseName, - clock.RealClock{}, tc.binaryVersion, tc.emulationVersion, []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, diff --git a/test/integration/apiserver/coordinated_leader_election_test.go b/test/integration/apiserver/coordinated_leader_election_test.go index d44474bcceb..8d2ce297b25 100644 --- a/test/integration/apiserver/coordinated_leader_election_test.go +++ b/test/integration/apiserver/coordinated_leader_election_test.go @@ -35,8 +35,6 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" - "k8s.io/utils/clock" - apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/test/integration/framework" ) @@ -199,12 +197,11 @@ func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, } func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { - identityLease, err := leaderelection.NewCandidate( + identityLease, _, err := leaderelection.NewCandidate( t.clientset, name, namespace, targetLease, - clock.RealClock{}, binaryVersion, compatibilityVersion, []v1.CoordinatedLeaseStrategy{"OldestEmulationVersion"},