diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index c12a565c462..83909f4e9fd 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -60,6 +60,10 @@ var ( // IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds // IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value. IdentityLeaseRenewIntervalPeriod = 10 * time.Second + + // LeaseCandidateGCPeriod is the interval which the leasecandidate GC controller checks for expired leases + // This is exposed so integration tests can tune this value. + LeaseCandidateGCPeriod = 30 * time.Minute ) const ( @@ -164,7 +168,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele ) gccontroller := leaderelection.NewLeaseCandidateGC( client, - 1*time.Hour, + LeaseCandidateGCPeriod, lcInformer, ) return func(ctx context.Context, workers int) { diff --git a/test/integration/apiserver/coordinated_leader_election_test.go b/test/integration/apiserver/coordinated_leader_election_test.go index e66f11b96f9..55e496d63b6 100644 --- a/test/integration/apiserver/coordinated_leader_election_test.go +++ b/test/integration/apiserver/coordinated_leader_election_test.go @@ -19,6 +19,7 @@ package apiserver import ( "context" "fmt" + "sync" "testing" "time" @@ -36,6 +37,7 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controlplane/apiserver" "k8s.io/kubernetes/test/integration/framework" ) @@ -50,10 +52,11 @@ func TestSingleLeaseCandidate(t *testing.T) { config := server.ClientConfig ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cletest := setupCLE(config, ctx, cancel, t) defer cletest.cleanup() go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") - cletest.pollForLease("foo", "default", "foo1") + cletest.pollForLease(ctx, "foo", "default", "foo1") } func TestMultipleLeaseCandidate(t *testing.T) { @@ -67,14 +70,15 @@ func TestMultipleLeaseCandidate(t *testing.T) { config := server.ClientConfig ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cletest := setupCLE(config, ctx, cancel, t) defer cletest.cleanup() - go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") - go cletest.createAndRunFakeController("foo2", "default", "foo", "1.20.0", "1.19.0") - go cletest.createAndRunFakeController("foo3", "default", "foo", "1.19.0", "1.19.0") - go cletest.createAndRunFakeController("foo4", "default", "foo", "1.2.0", "1.19.0") - go cletest.createAndRunFakeController("foo5", "default", "foo", "1.20.0", "1.19.0") - cletest.pollForLease("foo", "default", "foo3") + go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0") + go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0") + go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0") + go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0") + go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0") + cletest.pollForLease(ctx, "baz", "default", "baz3") } func TestLeaseSwapIfBetterAvailable(t *testing.T) { @@ -92,9 +96,9 @@ func TestLeaseSwapIfBetterAvailable(t *testing.T) { defer cletest.cleanup() go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0") - cletest.pollForLease("bar", "default", "bar1") + cletest.pollForLease(ctx, "bar", "default", "bar1") go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0") - cletest.pollForLease("bar", "default", "bar2") + cletest.pollForLease(ctx, "bar", "default", "bar2") } // TestUpgradeSkew tests that a legacy client and a CLE aware client operating on the same lease do not cause errors @@ -109,20 +113,26 @@ func TestUpgradeSkew(t *testing.T) { config := server.ClientConfig ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cletest := setupCLE(config, ctx, cancel, t) defer cletest.cleanup() - go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foo") - cletest.pollForLease("foo", "default", "foo1-130") - go cletest.createAndRunFakeController("foo1-131", "default", "foo", "1.31.0", "1.31.0") + go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foobar") + cletest.pollForLease(ctx, "foobar", "default", "foo1-130") + go cletest.createAndRunFakeController("foo1-131", "default", "foobar", "1.31.0", "1.31.0") // running a new controller should not kick off old leader - cletest.pollForLease("foo", "default", "foo1-130") + cletest.pollForLease(ctx, "foobar", "default", "foo1-130") cletest.cancelController("foo1-130", "default") - cletest.pollForLease("foo", "default", "foo1-131") + cletest.pollForLease(ctx, "foobar", "default", "foo1-131") } func TestLeaseCandidateCleanup(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) + apiserver.LeaseCandidateGCPeriod = 1 * time.Second + + defer func() { + apiserver.LeaseCandidateGCPeriod = 30 * time.Minute + }() server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) if err != nil { @@ -140,7 +150,7 @@ func TestLeaseCandidateCleanup(t *testing.T) { Namespace: "default", }, Spec: v1alpha1.LeaseCandidateSpec{ - LeaseName: "foobar", + LeaseName: "foobaz", BinaryVersion: "0.1.0", EmulationVersion: "0.1.0", PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, @@ -175,11 +185,14 @@ type cleTest struct { config *rest.Config clientset *kubernetes.Clientset t *testing.T + mu sync.Mutex ctxList map[string]ctxCancelPair } -func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, targetLease string) { +func (t *cleTest) createAndRunFakeLegacyController(name string, namespace string, targetLease string) { ctx, cancel := context.WithCancel(context.Background()) + t.mu.Lock() + defer t.mu.Unlock() t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel} electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) @@ -197,7 +210,7 @@ func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, }) } -func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { +func (t *cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { identityLease, _, err := leaderelection.NewCandidate( t.clientset, namespace, @@ -212,7 +225,9 @@ func (t cleTest) createAndRunFakeController(name string, namespace string, targe } ctx, cancel := context.WithCancel(context.Background()) + t.mu.Lock() t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel} + t.mu.Unlock() go identityLease.Run(ctx) electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) @@ -266,8 +281,8 @@ func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentit }) } -func (t cleTest) pollForLease(name, namespace, holder string) { - err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) { +func (t *cleTest) pollForLease(ctx context.Context, name, namespace, holder string) { + err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) { lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { fmt.Println(err) @@ -280,29 +295,33 @@ func (t cleTest) pollForLease(name, namespace, holder string) { } } -func (t cleTest) cancelController(name, namespace string) { +func (t *cleTest) cancelController(name, namespace string) { + t.mu.Lock() + defer t.mu.Unlock() t.ctxList[name+"/"+namespace].cancel() delete(t.ctxList, name+"/"+namespace) } -func (t cleTest) cleanup() { +func (t *cleTest) cleanup() { + t.mu.Lock() + defer t.mu.Unlock() + for _, c := range t.ctxList { + c.cancel() + } err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { t.t.Error(err) } - for _, c := range t.ctxList { - c.cancel() - } } -func setupCLE(config *rest.Config, ctx context.Context, cancel func(), t *testing.T) cleTest { +func setupCLE(config *rest.Config, ctx context.Context, cancel func(), t *testing.T) *cleTest { clientset, err := kubernetes.NewForConfig(config) if err != nil { t.Fatal(err) } a := ctxCancelPair{ctx, cancel} - return cleTest{ + return &cleTest{ config: config, clientset: clientset, ctxList: map[string]ctxCancelPair{"main": a},