Merge pull request #128505 from Jefftree/fix-cle-lock-acquisition

Fix CLE leader lock acquisition
Kubernetes Prow Robot 2024-11-06 00:35:42 +00:00 committed by GitHub
commit 67a4d20970
3 changed files with 244 additions and 47 deletions
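
At its core, the fix stops treating leader election as a one-shot attempt: RunWithLeaderElection now computes its lock identity once, then re-runs the acquire/renew cycle inside a wait.Until loop with ReleaseOnCancel enabled, so a replica that loses or releases the lease tries again after RetryPeriod instead of returning. As a reminder of the loop primitive involved, here is a minimal standalone sketch of wait.Until's behavior; the duration and messages are illustrative and not taken from the PR.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    go func() {
        // Simulate a shutdown signal after a few seconds.
        time.Sleep(5 * time.Second)
        close(stop)
    }()
    // wait.Until runs the function immediately, then again every period,
    // until the stop channel is closed. This is the same shape the fix uses
    // to re-enter leader election after the lease is lost or released.
    wait.Until(func() {
        fmt.Println("attempting to acquire the leader lease...")
    }, 2*time.Second, stop)
}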


@@ -22,12 +22,20 @@ import (
     "time"
     "k8s.io/apimachinery/pkg/util/uuid"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/leaderelection"
     "k8s.io/client-go/tools/leaderelection/resourcelock"
     "k8s.io/klog/v2"
 )
+var (
+    // TODO: Eventually these should be configurable
+    LeaseDuration = 15 * time.Second
+    RenewDeadline = 10 * time.Second
+    RetryPeriod   = 2 * time.Second
+)
 type NewRunner func() (func(ctx context.Context, workers int), error)
 // RunWithLeaderElection runs the provided runner function with leader election.
@@ -36,58 +44,56 @@ type NewRunner func() (func(ctx context.Context, workers int), error)
 // RunWithLeaderElection only returns when the context is done, or initial
 // leader election fails.
 func RunWithLeaderElection(ctx context.Context, config *rest.Config, newRunnerFn NewRunner) {
-    var cancel context.CancelFunc
-    callbacks := leaderelection.LeaderCallbacks{
-        OnStartedLeading: func(ctx context.Context) {
-            ctx, cancel = context.WithCancel(ctx)
-            var err error
-            run, err := newRunnerFn()
-            if err != nil {
-                klog.Infof("Error creating runner: %v", err)
-                return
-            }
-            run(ctx, 1)
-        },
-        OnStoppedLeading: func() {
-            if cancel != nil {
-                cancel()
-            }
-        },
-    }
     hostname, err := os.Hostname()
     if err != nil {
         klog.Infof("Error parsing hostname: %v", err)
         return
     }
+    identity := hostname + "_" + string(uuid.NewUUID())
-    rl, err := resourcelock.NewFromKubeconfig(
-        "leases",
-        "kube-system",
-        controllerName,
-        resourcelock.ResourceLockConfig{
-            Identity: hostname + "_" + string(uuid.NewUUID()),
-        },
-        config,
-        10,
-    )
-    if err != nil {
-        klog.Infof("Error creating resourcelock: %v", err)
-        return
-    }
+    wait.Until(func() {
+        callbacks := leaderelection.LeaderCallbacks{
+            OnStartedLeading: func(ctx context.Context) {
+                var err error
+                run, err := newRunnerFn()
+                if err != nil {
+                    klog.Infof("Error creating runner: %v", err)
+                    return
+                }
+                run(ctx, 1)
+            },
+            OnStoppedLeading: func() {
+            },
+        }
-    le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
-        Lock:          rl,
-        LeaseDuration: 15 * time.Second,
-        RenewDeadline: 10 * time.Second,
-        RetryPeriod:   2 * time.Second,
-        Callbacks:     callbacks,
-        Name:          controllerName,
-    })
-    if err != nil {
-        klog.Infof("Error creating leader elector: %v", err)
-        return
-    }
-    le.Run(ctx)
+        rl, err := resourcelock.NewFromKubeconfig(
+            "leases",
+            "kube-system",
+            controllerName,
+            resourcelock.ResourceLockConfig{
+                Identity: identity,
+            },
+            config,
+            10,
+        )
+        if err != nil {
+            klog.Infof("Error creating resourcelock: %v", err)
+            return
+        }
+        le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
+            Lock:            rl,
+            LeaseDuration:   LeaseDuration,
+            RenewDeadline:   RenewDeadline,
+            RetryPeriod:     RetryPeriod,
+            Callbacks:       callbacks,
+            Name:            controllerName,
+            ReleaseOnCancel: true,
+        })
+        if err != nil {
+            klog.Infof("Error creating leader elector: %v", err)
+            return
+        }
+        le.Run(ctx)
+    }, RetryPeriod, ctx.Done())
 }
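
For context on how the function above is consumed, a minimal hypothetical caller might look like the sketch below; the in-cluster config and the no-op runner are illustrative assumptions and are not part of this PR.

package main

import (
    "context"

    "k8s.io/client-go/rest"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/controlplane/controller/leaderelection"
)

func main() {
    // Assumes the process runs in-cluster; any *rest.Config works here.
    config, err := rest.InClusterConfig()
    if err != nil {
        klog.Fatalf("building rest config: %v", err)
    }
    // The runner factory is invoked each time this replica becomes leader; the
    // returned run function should block until its context is cancelled, which
    // happens when leadership is lost.
    leaderelection.RunWithLeaderElection(context.Background(), config, func() (func(ctx context.Context, workers int), error) {
        return func(ctx context.Context, workers int) {
            klog.Info("acquired the leader lease; working until the lease is lost")
            <-ctx.Done()
        }, nil
    })
}

Because le.Run is now wrapped in wait.Until, a replica whose runner context is cancelled simply re-enters the election loop after RetryPeriod rather than staying idle.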


@@ -0,0 +1,164 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package leaderelection

import (
    "context"
    "fmt"
    "testing"
    "time"

    admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    genericfeatures "k8s.io/apiserver/pkg/features"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    kubernetes "k8s.io/client-go/kubernetes"
    featuregatetesting "k8s.io/component-base/featuregate/testing"
    apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    "k8s.io/kubernetes/pkg/controlplane/controller/leaderelection"
    "k8s.io/kubernetes/test/integration/framework"
    "k8s.io/utils/ptr"
)

func TestCoordinatedLeaderElectionLeaseTransfer(t *testing.T) {
    // Reset the coordinated leader election variables after the test
    defaultLeaseDuration := leaderelection.LeaseDuration
    defaultRenewDeadline := leaderelection.RenewDeadline
    defaultRetryPeriod := leaderelection.RetryPeriod
    defer func() {
        leaderelection.LeaseDuration = defaultLeaseDuration
        leaderelection.RenewDeadline = defaultRenewDeadline
        leaderelection.RetryPeriod = defaultRetryPeriod
    }()

    // Use shorter intervals for the lease duration in the integration test
    leaderelection.LeaseDuration = 5 * time.Second
    leaderelection.RenewDeadline = 3 * time.Second
    leaderelection.RetryPeriod = 2 * time.Second

    featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
    etcd := framework.SharedEtcd()
    server := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), nil, etcd)
    defer server.TearDownFn()

    config := server.ClientConfig
    clientset := kubernetes.NewForConfigOrDie(config)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
        lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
        if err != nil {
            fmt.Println(err)
            return false, nil
        }
        return lease.Spec.HolderIdentity != nil, nil
    })
    if err != nil {
        t.Fatalf("timeout waiting for Lease %s %s err: %v", "leader-election-controller", "kube-system", err)
    }

    lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
    if err != nil {
        t.Fatal(err)
    }
    leaseName := *lease.Spec.HolderIdentity

    server2 := apiservertesting.StartTestServerOrDie(t, apiservertesting.NewDefaultTestServerOptions(), nil, etcd)

    vap := &admissionregistrationv1.ValidatingAdmissionPolicy{
        ObjectMeta: metav1.ObjectMeta{Name: "cle-block-renewal"},
        Spec: admissionregistrationv1.ValidatingAdmissionPolicySpec{
            FailurePolicy: ptr.To(admissionregistrationv1.Fail),
            MatchConstraints: &admissionregistrationv1.MatchResources{
                ResourceRules: []admissionregistrationv1.NamedRuleWithOperations{
                    {
                        RuleWithOperations: admissionregistrationv1.RuleWithOperations{
                            Operations: []admissionregistrationv1.OperationType{admissionregistrationv1.OperationAll},
                            Rule:       admissionregistrationv1.Rule{APIGroups: []string{"coordination.k8s.io"}, APIVersions: []string{"v1"}, Resources: []string{"leases"}},
                        },
                    },
                },
            },
            Validations: []admissionregistrationv1.Validation{{
                Expression: "object.spec.holderIdentity != '" + leaseName + "'",
            }},
        },
    }
    _, err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicies().Create(ctx, vap, metav1.CreateOptions{})
    if err != nil {
        t.Fatal(err)
    }

    vapBinding := &admissionregistrationv1.ValidatingAdmissionPolicyBinding{
        ObjectMeta: metav1.ObjectMeta{Name: "cle-block-renewal"},
        Spec: admissionregistrationv1.ValidatingAdmissionPolicyBindingSpec{
            PolicyName: "cle-block-renewal",
            ValidationActions: []admissionregistrationv1.ValidationAction{
                admissionregistrationv1.Deny,
            },
        },
    }
    _, err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicyBindings().Create(ctx, vapBinding, metav1.CreateOptions{})
    if err != nil {
        t.Fatal(err)
    }

    // Wait until the first apiserver releases the lease and the second apiserver takes over the lock
    err = wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
        lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
        if err != nil {
            fmt.Println(err)
            return false, nil
        }
        return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity != leaseName, nil
    })
    if err != nil {
        t.Error("Expected the CLE lease lock to transition to the second apiserver")
    }

    // Shut down the second apiserver
    server2.TearDownFn()

    // Allow writes again from the first apiserver
    err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicies().Delete(ctx, vap.Name, metav1.DeleteOptions{})
    if err != nil {
        t.Fatal(err)
    }
    err = clientset.AdmissionregistrationV1().ValidatingAdmissionPolicyBindings().Delete(ctx, vapBinding.Name, metav1.DeleteOptions{})
    if err != nil {
        t.Fatal(err)
    }

    // Ensure that the first apiserver is able to reacquire the CLE leader lease
    err = wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
        lease, err := clientset.CoordinationV1().Leases("kube-system").Get(ctx, "leader-election-controller", metav1.GetOptions{})
        if err != nil {
            fmt.Println(err)
            return false, nil
        }
        return *lease.Spec.HolderIdentity == leaseName, nil
    })
    if err != nil {
        t.Error("Expected the CLE lease lock to transition back to the first apiserver")
    }
}
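
When reproducing a handoff like this outside the integration framework, a small hypothetical watcher such as the sketch below can print the lease holder over time; the kubeconfig path and poll interval are placeholders, not values from the PR.

package main

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // The kubeconfig path is a placeholder; point it at the cluster under test.
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)
    for {
        lease, err := clientset.CoordinationV1().Leases("kube-system").Get(context.TODO(), "leader-election-controller", metav1.GetOptions{})
        if err != nil {
            fmt.Println("get lease:", err)
        } else if lease.Spec.HolderIdentity != nil {
            fmt.Println("holder:", *lease.Spec.HolderIdentity)
        }
        time.Sleep(2 * time.Second)
    }
}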


@@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package leaderelection

import (
    "testing"

    "k8s.io/kubernetes/test/integration/framework"
)

func TestMain(m *testing.M) {
    framework.EtcdMain(m.Run)
}