From 1ede4d8f15358fcab0f0f136449139d39a3fe504 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Fri, 25 Oct 2024 16:20:13 +0000 Subject: [PATCH] Allow cle mutual exclusion lock to be reacquired --- .../leaderelection/run_with_leaderelection.go | 100 ++++++----- .../leaderelection_test.go | 164 ++++++++++++++++++ .../coordinatedleaderelection/main_test.go | 27 +++ 3 files changed, 244 insertions(+), 47 deletions(-) create mode 100644 test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go create mode 100644 test/integration/apiserver/coordinatedleaderelection/main_test.go diff --git a/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go b/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go index 0b73170d1f7..399fdb3faf0 100644 --- a/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go +++ b/pkg/controlplane/controller/leaderelection/run_with_leaderelection.go @@ -22,12 +22,20 @@ import ( "time" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" ) +var ( + // TODO: Eventually these should be configurable + LeaseDuration = 15 * time.Second + RenewDeadline = 10 * time.Second + RetryPeriod = 2 * time.Second +) + type NewRunner func() (func(ctx context.Context, workers int), error) // RunWithLeaderElection runs the provided runner function with leader election. @@ -36,58 +44,56 @@ type NewRunner func() (func(ctx context.Context, workers int), error) // RunWithLeaderElection only returns when the context is done, or initial // leader election fails. func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) { - var cancel context.CancelFunc - - callbacks := leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - ctx, cancel = context.WithCancel(ctx) - var err error - run, err := newRunnerFn() - if err != nil { - klog.Infof("Error creating runner: %v", err) - return - } - run(ctx, 1) - }, - OnStoppedLeading: func() { - if cancel != nil { - cancel() - } - }, - } - hostname, err := os.Hostname() if err != nil { klog.Infof("Error parsing hostname: %v", err) return } + identity := hostname + "_" + string(uuid.NewUUID()) - rl, err := resourcelock.NewFromKubeconfig( - "leases", - "kube-system", - controllerName, - resourcelock.ResourceLockConfig{ - Identity: hostname + "_" + string(uuid.NewUUID()), - }, - config, - 10, - ) - if err != nil { - klog.Infof("Error creating resourcelock: %v", err) - return - } + wait.Until(func() { + callbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + var err error + run, err := newRunnerFn() + if err != nil { + klog.Infof("Error creating runner: %v", err) + return + } + run(ctx, 1) + }, + OnStoppedLeading: func() { + }, + } - le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - Callbacks: callbacks, - Name: controllerName, - }) - if err != nil { - klog.Infof("Error creating leader elector: %v", err) - return - } - le.Run(ctx) + rl, err := resourcelock.NewFromKubeconfig( + "leases", + "kube-system", + controllerName, + resourcelock.ResourceLockConfig{ + Identity: identity, + }, + config, + 10, + ) + if err != nil { + klog.Infof("Error creating resourcelock: %v", err) + return + } + + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: LeaseDuration, + RenewDeadline: RenewDeadline, + RetryPeriod: RetryPeriod, + Callbacks: callbacks, + Name: controllerName, + ReleaseOnCancel: true, + }) + if err != nil { + klog.Infof("Error creating leader elector: %v", err) + return + } + le.Run(ctx) + }, RetryPeriod, ctx.Done()) } diff --git a/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go b/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go new file mode 100644 index 00000000000..59f4ea2ef58 --- /dev/null +++ b/test/integration/apiserver/coordinatedleaderelection/leaderelection_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "context" + "fmt" + "testing" + "time" + + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + kubernetes "k8s.io/client-go/kubernetes" + featuregatetesting "k8s.io/component-base/featuregate/testing" + apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controlplane/controller/leaderelection" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" +) + +func TestCoordinatedLeaderElectionLeaseTransfer(t *testing.T) { + // Reset the coordinated leader election variables after the test + defaultLeaseDuration := leaderelection.LeaseDuration + defaultRenewDeadline := leaderelection.RenewDeadline + defaultRetryPeriod := leaderelection.RetryPeriod + defer func() { + leaderelection.LeaseDuration = defaultLeaseDuration + leaderelection.RenewDeadline = defaultRenewDeadline + leaderelection.RetryPeriod = defaultRetryPeriod + }() + + // Use shorter interval for lease duration in integration test + leaderelection.LeaseDuration = 5 * time.Second + leaderelection.RenewDeadline = 3 * time.Second + leaderelection.RetryPeriod = 2 * time.Second + + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + etcd := framework.SharedEtcd() + + server := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), nil, etcd) + defer server.TearDownFn() + + config := server.ClientConfig + clientset := kubernetes.NewForConfigOrDie(config) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{}) + if err != nil { + fmt.Println(err) + return false, nil + } + return lease.Spec.HolderIdentity != nil, nil + }) + + if err != nil { + t.Fatalf("timeout waiting for Lease %s %s err: %v", "leader-election-controller", "kube-system", err) + } + + lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + leaseName := *lease.Spec.HolderIdentity + + server2 := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), nil, etcd) + vap := &admissionregistrationv1.ValidatingAdmissionPolicy{ + ObjectMeta: metav1.ObjectMeta{Name: "cle-block-renewal"}, + Spec: admissionregistrationv1.ValidatingAdmissionPolicySpec{ + FailurePolicy: ptr.To(admissionregistrationv1.Fail), + MatchConstraints: &admissionregistrationv1.MatchResources{ + ResourceRules: []admissionregistrationv1.NamedRuleWithOperations{ + { + RuleWithOperations: admissionregistrationv1.RuleWithOperations{ + Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.OperationAll}, + Rule: admissionregistrationv1.Rule{APIGroups: []string{"coordination.k8s.io"}, APIVersions: []string{"v1"}, Resources: []string{"leases"}}, + }, + }, + }, + }, + Validations: []admissionregistrationv1.Validation{{ + Expression: "object.spec.holderIdentity != '" + leaseName + "'", + }}, + }, + } + _, err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicies().Create(ctx, vap, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + vapBinding := &admissionregistrationv1.ValidatingAdmissionPolicyBinding{ + ObjectMeta: metav1.ObjectMeta{Name: "cle-block-renewal"}, + Spec: admissionregistrationv1.ValidatingAdmissionPolicyBindingSpec{ + PolicyName: "cle-block-renewal", + ValidationActions: []admissionregistrationv1.ValidationAction{ + admissionregistrationv1.Deny, + }, + }, + } + + _, err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicyBindings().Create(ctx, vapBinding, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + + // Wait until the first apiserver releases the lease and second apiserver takes over the lock + err = wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{}) + if err != nil { + fmt.Println(err) + return false, nil + } + return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != leaseName, nil + }) + if err != nil { + t.Error("Expected the cle lease lock to transition to the second apiserver") + } + + // Shutdown the second apiserver + server2.TearDownFn() + + // Allow writes again from the first apiserver + err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicies().Delete(ctx, vap.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } + err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicyBindings().Delete(ctx, vapBinding.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatal(err) + } + + // Ensure that the first apiserver is able to reacquire the CLE leader lease + err = wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{}) + if err != nil { + fmt.Println(err) + return false, nil + } + return *lease.Spec.HolderIdentity == leaseName, nil + }) + if err != nil { + t.Error("Expected the cle lease lock to transition to the first apiserver") + } +} diff --git a/test/integration/apiserver/coordinatedleaderelection/main_test.go b/test/integration/apiserver/coordinatedleaderelection/main_test.go new file mode 100644 index 00000000000..cdf70cb7277 --- /dev/null +++ b/test/integration/apiserver/coordinatedleaderelection/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +}