Merge pull request #129973 from Henrywu573/myfeature

Add third party strategy to the coordinate leader election integratio…
This commit is contained in:
Kubernetes Prow Robot 2025-03-04 14:49:44 -08:00 committed by GitHub
commit dc1199a9fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 150 additions and 27 deletions

View File

@ -374,7 +374,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
orig := existing.DeepCopy()
isExpired := isLeaseExpired(c.clock, existing)
noHolderIdentity := leaderLease.Spec.HolderIdentity != nil && existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity == ""
noHolderIdentity := leaderLease.Spec.HolderIdentity != nil && (existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity == "")
expiredAndNewHolder := isExpired && leaderLease.Spec.HolderIdentity != nil && *existing.Spec.HolderIdentity != *leaderLease.Spec.HolderIdentity
strategyChanged := existing.Spec.Strategy == nil || *existing.Spec.Strategy != strategy
differentHolder := leaderLease.Spec.HolderIdentity != nil && *leaderLease.Spec.HolderIdentity != *existing.Spec.HolderIdentity
@ -402,7 +402,11 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na
}
if reflect.DeepEqual(existing, orig) {
klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, *existing.Spec.HolderIdentity)
if existing.Spec.HolderIdentity != nil {
klog.V(5).Infof("Lease %s is managed by a third party strategy", *existing.Spec.HolderIdentity)
} else {
klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, "")
}
// We need to requeue to ensure that we are aware of an expired lease
return defaultRequeueInterval, nil
}

View File

@ -39,9 +39,21 @@ import (
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/ptr"
)
func TestSingleLeaseCandidate(t *testing.T) {
tests := []struct {
name string
preferredStrategy v1.CoordinatedLeaseStrategy
expectedHolderIdentity *string
}{
{
name: "default strategy, has lease identity",
preferredStrategy: v1.OldestEmulationVersion,
expectedHolderIdentity: ptr.To("foo1"),
},
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)}
@ -51,16 +63,72 @@ func TestSingleLeaseCandidate(t *testing.T) {
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0")
cletest.pollForLease(ctx, "foo", "default", "foo1")
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0", tc.preferredStrategy)
cletest.pollForLease(ctx, "foo", "default", tc.expectedHolderIdentity)
})
}
}
func TestSingleLeaseCandidateUsingThirdPartyStrategy(t *testing.T) {
tests := []struct {
name string
preferredStrategy v1.CoordinatedLeaseStrategy
expectedHolderIdentity *string
}{
{
name: "third party strategy, no holder identity",
preferredStrategy: v1.CoordinatedLeaseStrategy("foo.com/bar"),
// Because a third-party CoordinatedLeaseStrategy is in use,
// the coordinated leader election controller doesn't manage
// the lease's leader election and does not set the holder identity,
// and leave it to some other controller. The lease will be created
// with the strategy set but holderIdentity set to nil.
expectedHolderIdentity: nil,
},
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)}
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), flags, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0", tc.preferredStrategy)
cletest.pollForLease(ctx, "foo", "default", tc.expectedHolderIdentity)
})
}
}
func TestMultipleLeaseCandidate(t *testing.T) {
tests := []struct {
name string
preferredStrategy v1.CoordinatedLeaseStrategy
expectedHolderIdentity *string
}{
{
name: "default strategy, has lease identity",
preferredStrategy: v1.OldestEmulationVersion,
// Because the OldestEmulationVersion strategy is used here, the
// the coordinated leader election controller will pick the candidate
// with OldestEmulationVersion and OldestBinaryVersion.
expectedHolderIdentity: ptr.To("baz3"),
},
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)}
@ -73,14 +141,62 @@ func TestMultipleLeaseCandidate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0")
go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0")
go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0")
go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0")
go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0")
cletest.pollForLease(ctx, "baz", "default", "baz3")
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy)
cletest.pollForLease(ctx, "baz", "default", tc.expectedHolderIdentity)
})
}
}
func TestMultipleLeaseCandidateUsingThirdPartyStrategy(t *testing.T) {
tests := []struct {
name string
preferredStrategy v1.CoordinatedLeaseStrategy
expectedHolderIdentity *string
}{
{
name: "third party strategy, no holder identity",
preferredStrategy: v1.CoordinatedLeaseStrategy("foo.com/bar"),
// Because a third-party CoordinatedLeaseStrategy is in use,
// the coordinated leader election controller doesn't manage
// the lease's leader election and does not set the holder identity,
// and leave it to some other controller. The lease will be created
// with the strategy set but holderIdentity set to nil.
expectedHolderIdentity: nil,
},
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)}
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), flags, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
defer server.TearDownFn()
config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0", tc.preferredStrategy)
go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy)
cletest.pollForLease(ctx, "baz", "default", tc.expectedHolderIdentity)
})
}
}
func TestLeaseSwapIfBetterAvailable(t *testing.T) {
@ -98,10 +214,10 @@ func TestLeaseSwapIfBetterAvailable(t *testing.T) {
cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup()
go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0")
cletest.pollForLease(ctx, "bar", "default", "bar1")
go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0")
cletest.pollForLease(ctx, "bar", "default", "bar2")
go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0", v1.OldestEmulationVersion)
cletest.pollForLease(ctx, "bar", "default", ptr.To("bar1"))
go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0", v1.OldestEmulationVersion)
cletest.pollForLease(ctx, "bar", "default", ptr.To("bar2"))
}
// TestUpgradeSkew tests that a legacy client and a CLE aware client operating on the same lease do not cause errors
@ -122,12 +238,12 @@ func TestUpgradeSkew(t *testing.T) {
defer cletest.cleanup()
go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foobar")
cletest.pollForLease(ctx, "foobar", "default", "foo1-130")
go cletest.createAndRunFakeController("foo1-131", "default", "foobar", "1.31.0", "1.31.0")
cletest.pollForLease(ctx, "foobar", "default", ptr.To("foo1-130"))
go cletest.createAndRunFakeController("foo1-131", "default", "foobar", "1.31.0", "1.31.0", v1.OldestEmulationVersion)
// running a new controller should not kick off old leader
cletest.pollForLease(ctx, "foobar", "default", "foo1-130")
cletest.pollForLease(ctx, "foobar", "default", ptr.To("foo1-130"))
cletest.cancelController("foo1-130", "default")
cletest.pollForLease(ctx, "foobar", "default", "foo1-131")
cletest.pollForLease(ctx, "foobar", "default", ptr.To("foo1-131"))
}
func TestLeaseCandidateCleanup(t *testing.T) {
@ -215,7 +331,7 @@ func (t *cleTest) createAndRunFakeLegacyController(name string, namespace string
})
}
func (t *cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) {
func (t *cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string, preferredStrategy v1.CoordinatedLeaseStrategy) {
identityLease, _, err := leaderelection.NewCandidate(
t.clientset,
namespace,
@ -223,7 +339,7 @@ func (t *cleTest) createAndRunFakeController(name string, namespace string, targ
targetLease,
binaryVersion,
compatibilityVersion,
v1.OldestEmulationVersion,
preferredStrategy,
)
if err != nil {
t.t.Error(err)
@ -286,14 +402,17 @@ func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentit
})
}
func (t *cleTest) pollForLease(ctx context.Context, name, namespace, holder string) {
func (t *cleTest) pollForLease(ctx context.Context, name, namespace string, holder *string) {
err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) {
lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
fmt.Println(err)
return false, nil
}
return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == holder, nil
if holder == nil {
return lease.Spec.HolderIdentity == nil, nil
}
return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == *holder, nil
})
if err != nil {
t.t.Fatalf("timeout awiting for Lease %s %s err: %v", name, namespace, err)