diff --git a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go index b33e2a8ec48..e606d085741 100644 --- a/pkg/controlplane/controller/leaderelection/leaderelection_controller.go +++ b/pkg/controlplane/controller/leaderelection/leaderelection_controller.go @@ -374,7 +374,7 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na orig := existing.DeepCopy() isExpired := isLeaseExpired(c.clock, existing) - noHolderIdentity := leaderLease.Spec.HolderIdentity != nil && existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity == "" + noHolderIdentity := leaderLease.Spec.HolderIdentity != nil && (existing.Spec.HolderIdentity == nil || *existing.Spec.HolderIdentity == "") expiredAndNewHolder := isExpired && leaderLease.Spec.HolderIdentity != nil && *existing.Spec.HolderIdentity != *leaderLease.Spec.HolderIdentity strategyChanged := existing.Spec.Strategy == nil || *existing.Spec.Strategy != strategy differentHolder := leaderLease.Spec.HolderIdentity != nil && *leaderLease.Spec.HolderIdentity != *existing.Spec.HolderIdentity @@ -402,7 +402,11 @@ func (c *Controller) reconcileElectionStep(ctx context.Context, leaseNN types.Na } if reflect.DeepEqual(existing, orig) { - klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, *existing.Spec.HolderIdentity) + if existing.Spec.HolderIdentity != nil { + klog.V(5).Infof("Lease %s is managed by a third party strategy", *existing.Spec.HolderIdentity) + } else { + klog.V(5).Infof("Lease %s already has the most optimal leader %q", leaseNN, "") + } // We need to requeue to ensure that we are aware of an expired lease return defaultRequeueInterval, nil } diff --git a/test/integration/apiserver/coordinated_leader_election_test.go b/test/integration/apiserver/coordinated_leader_election_test.go index 505b9ee2308..af992d783b9 100644 --- a/test/integration/apiserver/coordinated_leader_election_test.go +++ b/test/integration/apiserver/coordinated_leader_election_test.go @@ -39,9 +39,21 @@ import ( apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controlplane/apiserver" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/ptr" ) func TestSingleLeaseCandidate(t *testing.T) { + tests := []struct { + name string + preferredStrategy v1.CoordinatedLeaseStrategy + expectedHolderIdentity *string + }{ + { + name: "default strategy, has lease identity", + preferredStrategy: v1.OldestEmulationVersion, + expectedHolderIdentity: ptr.To("foo1"), + }, + } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)} @@ -51,16 +63,72 @@ func TestSingleLeaseCandidate(t *testing.T) { } defer server.TearDownFn() config := server.ClientConfig - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cletest := setupCLE(config, ctx, cancel, t) - defer cletest.cleanup() - go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") - cletest.pollForLease(ctx, "foo", "default", "foo1") + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0", tc.preferredStrategy) + cletest.pollForLease(ctx, "foo", "default", tc.expectedHolderIdentity) + }) + } +} + +func TestSingleLeaseCandidateUsingThirdPartyStrategy(t *testing.T) { + tests := []struct { + name string + preferredStrategy v1.CoordinatedLeaseStrategy + expectedHolderIdentity *string + }{ + { + name: "third party strategy, no holder identity", + preferredStrategy: v1.CoordinatedLeaseStrategy("foo.com/bar"), + // Because a third-party CoordinatedLeaseStrategy is in use, + // the coordinated leader election controller doesn't manage + // the lease's leader election and does not set the holder identity, + // and leave it to some other controller. The lease will be created + // with the strategy set but holderIdentity set to nil. + expectedHolderIdentity: nil, + }, + } + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)} + server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), flags, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0", tc.preferredStrategy) + cletest.pollForLease(ctx, "foo", "default", tc.expectedHolderIdentity) + }) + } } func TestMultipleLeaseCandidate(t *testing.T) { + tests := []struct { + name string + preferredStrategy v1.CoordinatedLeaseStrategy + expectedHolderIdentity *string + }{ + { + name: "default strategy, has lease identity", + preferredStrategy: v1.OldestEmulationVersion, + // Because the OldestEmulationVersion strategy is used here, the + // the coordinated leader election controller will pick the candidate + // with OldestEmulationVersion and OldestBinaryVersion. + expectedHolderIdentity: ptr.To("baz3"), + }, + } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)} @@ -73,14 +141,62 @@ func TestMultipleLeaseCandidate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cletest := setupCLE(config, ctx, cancel, t) - defer cletest.cleanup() - go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0") - go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0") - go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0") - go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0") - go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0") - cletest.pollForLease(ctx, "baz", "default", "baz3") + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy) + cletest.pollForLease(ctx, "baz", "default", tc.expectedHolderIdentity) + }) + } +} + +func TestMultipleLeaseCandidateUsingThirdPartyStrategy(t *testing.T) { + tests := []struct { + name string + preferredStrategy v1.CoordinatedLeaseStrategy + expectedHolderIdentity *string + }{ + { + name: "third party strategy, no holder identity", + preferredStrategy: v1.CoordinatedLeaseStrategy("foo.com/bar"), + // Because a third-party CoordinatedLeaseStrategy is in use, + // the coordinated leader election controller doesn't manage + // the lease's leader election and does not set the holder identity, + // and leave it to some other controller. The lease will be created + // with the strategy set but holderIdentity set to nil. + expectedHolderIdentity: nil, + }, + } + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + flags := []string{fmt.Sprintf("--runtime-config=%s=true", v1alpha2.SchemeGroupVersion)} + + server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), flags, framework.SharedEtcd()) + if err != nil { + t.Fatal(err) + } + defer server.TearDownFn() + config := server.ClientConfig + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cletest := setupCLE(config, ctx, cancel, t) + defer cletest.cleanup() + go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0", tc.preferredStrategy) + go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0", tc.preferredStrategy) + cletest.pollForLease(ctx, "baz", "default", tc.expectedHolderIdentity) + }) + } } func TestLeaseSwapIfBetterAvailable(t *testing.T) { @@ -98,10 +214,10 @@ func TestLeaseSwapIfBetterAvailable(t *testing.T) { cletest := setupCLE(config, ctx, cancel, t) defer cletest.cleanup() - go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0") - cletest.pollForLease(ctx, "bar", "default", "bar1") - go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0") - cletest.pollForLease(ctx, "bar", "default", "bar2") + go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0", v1.OldestEmulationVersion) + cletest.pollForLease(ctx, "bar", "default", ptr.To("bar1")) + go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0", v1.OldestEmulationVersion) + cletest.pollForLease(ctx, "bar", "default", ptr.To("bar2")) } // TestUpgradeSkew tests that a legacy client and a CLE aware client operating on the same lease do not cause errors @@ -122,12 +238,12 @@ func TestUpgradeSkew(t *testing.T) { defer cletest.cleanup() go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foobar") - cletest.pollForLease(ctx, "foobar", "default", "foo1-130") - go cletest.createAndRunFakeController("foo1-131", "default", "foobar", "1.31.0", "1.31.0") + cletest.pollForLease(ctx, "foobar", "default", ptr.To("foo1-130")) + go cletest.createAndRunFakeController("foo1-131", "default", "foobar", "1.31.0", "1.31.0", v1.OldestEmulationVersion) // running a new controller should not kick off old leader - cletest.pollForLease(ctx, "foobar", "default", "foo1-130") + cletest.pollForLease(ctx, "foobar", "default", ptr.To("foo1-130")) cletest.cancelController("foo1-130", "default") - cletest.pollForLease(ctx, "foobar", "default", "foo1-131") + cletest.pollForLease(ctx, "foobar", "default", ptr.To("foo1-131")) } func TestLeaseCandidateCleanup(t *testing.T) { @@ -215,7 +331,7 @@ func (t *cleTest) createAndRunFakeLegacyController(name string, namespace string }) } -func (t *cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { +func (t *cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string, preferredStrategy v1.CoordinatedLeaseStrategy) { identityLease, _, err := leaderelection.NewCandidate( t.clientset, namespace, @@ -223,7 +339,7 @@ func (t *cleTest) createAndRunFakeController(name string, namespace string, targ targetLease, binaryVersion, compatibilityVersion, - v1.OldestEmulationVersion, + preferredStrategy, ) if err != nil { t.t.Error(err) @@ -286,14 +402,17 @@ func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentit }) } -func (t *cleTest) pollForLease(ctx context.Context, name, namespace, holder string) { +func (t *cleTest) pollForLease(ctx context.Context, name, namespace string, holder *string) { err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) { lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { fmt.Println(err) return false, nil } - return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == holder, nil + if holder == nil { + return lease.Spec.HolderIdentity == nil, nil + } + return lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == *holder, nil }) if err != nil { t.t.Fatalf("timeout awiting for Lease %s %s err: %v", name, namespace, err)