fix flake in TestLeaseCandidateCleanup

This commit is contained in:
Jefftree 2024-07-24 03:42:48 +00:00
parent 919e7abe0f
commit 56b278d5d2
2 changed files with 50 additions and 27 deletions

View File

@ -60,6 +60,10 @@ var (
// IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds // IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds
// IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value. // IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value.
IdentityLeaseRenewIntervalPeriod = 10 * time.Second IdentityLeaseRenewIntervalPeriod = 10 * time.Second
// LeaseCandidateGCPeriod is the interval which the leasecandidate GC controller checks for expired leases
// This is exposed so integration tests can tune this value.
LeaseCandidateGCPeriod = 30 * time.Minute
) )
const ( const (
@ -164,7 +168,7 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
) )
gccontroller := leaderelection.NewLeaseCandidateGC( gccontroller := leaderelection.NewLeaseCandidateGC(
client, client,
1*time.Hour, LeaseCandidateGCPeriod,
lcInformer, lcInformer,
) )
return func(ctx context.Context, workers int) { return func(ctx context.Context, workers int) {

View File

@ -19,6 +19,7 @@ package apiserver
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"testing" "testing"
"time" "time"
@ -36,6 +37,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
@ -50,10 +52,11 @@ func TestSingleLeaseCandidate(t *testing.T) {
config := server.ClientConfig config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cletest := setupCLE(config, ctx, cancel, t) cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup() defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0")
cletest.pollForLease("foo", "default", "foo1") cletest.pollForLease(ctx, "foo", "default", "foo1")
} }
func TestMultipleLeaseCandidate(t *testing.T) { func TestMultipleLeaseCandidate(t *testing.T) {
@ -67,14 +70,15 @@ func TestMultipleLeaseCandidate(t *testing.T) {
config := server.ClientConfig config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cletest := setupCLE(config, ctx, cancel, t) cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup() defer cletest.cleanup()
go cletest.createAndRunFakeController("foo1", "default", "foo", "1.20.0", "1.20.0") go cletest.createAndRunFakeController("baz1", "default", "baz", "1.20.0", "1.20.0")
go cletest.createAndRunFakeController("foo2", "default", "foo", "1.20.0", "1.19.0") go cletest.createAndRunFakeController("baz2", "default", "baz", "1.20.0", "1.19.0")
go cletest.createAndRunFakeController("foo3", "default", "foo", "1.19.0", "1.19.0") go cletest.createAndRunFakeController("baz3", "default", "baz", "1.19.0", "1.19.0")
go cletest.createAndRunFakeController("foo4", "default", "foo", "1.2.0", "1.19.0") go cletest.createAndRunFakeController("baz4", "default", "baz", "1.2.0", "1.19.0")
go cletest.createAndRunFakeController("foo5", "default", "foo", "1.20.0", "1.19.0") go cletest.createAndRunFakeController("baz5", "default", "baz", "1.20.0", "1.19.0")
cletest.pollForLease("foo", "default", "foo3") cletest.pollForLease(ctx, "baz", "default", "baz3")
} }
func TestLeaseSwapIfBetterAvailable(t *testing.T) { func TestLeaseSwapIfBetterAvailable(t *testing.T) {
@ -92,9 +96,9 @@ func TestLeaseSwapIfBetterAvailable(t *testing.T) {
defer cletest.cleanup() defer cletest.cleanup()
go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0") go cletest.createAndRunFakeController("bar1", "default", "bar", "1.20.0", "1.20.0")
cletest.pollForLease("bar", "default", "bar1") cletest.pollForLease(ctx, "bar", "default", "bar1")
go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0") go cletest.createAndRunFakeController("bar2", "default", "bar", "1.19.0", "1.19.0")
cletest.pollForLease("bar", "default", "bar2") cletest.pollForLease(ctx, "bar", "default", "bar2")
} }
// TestUpgradeSkew tests that a legacy client and a CLE aware client operating on the same lease do not cause errors // TestUpgradeSkew tests that a legacy client and a CLE aware client operating on the same lease do not cause errors
@ -109,20 +113,26 @@ func TestUpgradeSkew(t *testing.T) {
config := server.ClientConfig config := server.ClientConfig
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cletest := setupCLE(config, ctx, cancel, t) cletest := setupCLE(config, ctx, cancel, t)
defer cletest.cleanup() defer cletest.cleanup()
go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foo") go cletest.createAndRunFakeLegacyController("foo1-130", "default", "foobar")
cletest.pollForLease("foo", "default", "foo1-130") cletest.pollForLease(ctx, "foobar", "default", "foo1-130")
go cletest.createAndRunFakeController("foo1-131", "default", "foo", "1.31.0", "1.31.0") go cletest.createAndRunFakeController("foo1-131", "default", "foobar", "1.31.0", "1.31.0")
// running a new controller should not kick off old leader // running a new controller should not kick off old leader
cletest.pollForLease("foo", "default", "foo1-130") cletest.pollForLease(ctx, "foobar", "default", "foo1-130")
cletest.cancelController("foo1-130", "default") cletest.cancelController("foo1-130", "default")
cletest.pollForLease("foo", "default", "foo1-131") cletest.pollForLease(ctx, "foobar", "default", "foo1-131")
} }
func TestLeaseCandidateCleanup(t *testing.T) { func TestLeaseCandidateCleanup(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.CoordinatedLeaderElection, true)
apiserver.LeaseCandidateGCPeriod = 1 * time.Second
defer func() {
apiserver.LeaseCandidateGCPeriod = 30 * time.Minute
}()
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd()) server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil { if err != nil {
@ -140,7 +150,7 @@ func TestLeaseCandidateCleanup(t *testing.T) {
Namespace: "default", Namespace: "default",
}, },
Spec: v1alpha1.LeaseCandidateSpec{ Spec: v1alpha1.LeaseCandidateSpec{
LeaseName: "foobar", LeaseName: "foobaz",
BinaryVersion: "0.1.0", BinaryVersion: "0.1.0",
EmulationVersion: "0.1.0", EmulationVersion: "0.1.0",
PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion}, PreferredStrategies: []v1.CoordinatedLeaseStrategy{v1.OldestEmulationVersion},
@ -175,11 +185,14 @@ type cleTest struct {
config *rest.Config config *rest.Config
clientset *kubernetes.Clientset clientset *kubernetes.Clientset
t *testing.T t *testing.T
mu sync.Mutex
ctxList map[string]ctxCancelPair ctxList map[string]ctxCancelPair
} }
func (t cleTest) createAndRunFakeLegacyController(name string, namespace string, targetLease string) { func (t *cleTest) createAndRunFakeLegacyController(name string, namespace string, targetLease string) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.mu.Lock()
defer t.mu.Unlock()
t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel} t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel}
electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
@ -197,7 +210,7 @@ func (t cleTest) createAndRunFakeLegacyController(name string, namespace string,
}) })
} }
func (t cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) { func (t *cleTest) createAndRunFakeController(name string, namespace string, targetLease string, binaryVersion string, compatibilityVersion string) {
identityLease, _, err := leaderelection.NewCandidate( identityLease, _, err := leaderelection.NewCandidate(
t.clientset, t.clientset,
namespace, namespace,
@ -212,7 +225,9 @@ func (t cleTest) createAndRunFakeController(name string, namespace string, targe
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.mu.Lock()
t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel} t.ctxList[name+"/"+namespace] = ctxCancelPair{ctx, cancel}
t.mu.Unlock()
go identityLease.Run(ctx) go identityLease.Run(ctx)
electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) electionChecker := leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
@ -266,8 +281,8 @@ func leaderElectAndRun(ctx context.Context, kubeconfig *rest.Config, lockIdentit
}) })
} }
func (t cleTest) pollForLease(name, namespace, holder string) { func (t *cleTest) pollForLease(ctx context.Context, name, namespace, holder string) {
err := wait.PollUntilContextTimeout(t.ctxList["main"].ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) { err := wait.PollUntilContextTimeout(ctx, 1000*time.Millisecond, 25*time.Second, true, func(ctx context.Context) (done bool, err error) {
lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{}) lease, err := t.clientset.CoordinationV1().Leases(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -280,29 +295,33 @@ func (t cleTest) pollForLease(name, namespace, holder string) {
} }
} }
func (t cleTest) cancelController(name, namespace string) { func (t *cleTest) cancelController(name, namespace string) {
t.mu.Lock()
defer t.mu.Unlock()
t.ctxList[name+"/"+namespace].cancel() t.ctxList[name+"/"+namespace].cancel()
delete(t.ctxList, name+"/"+namespace) delete(t.ctxList, name+"/"+namespace)
} }
func (t cleTest) cleanup() { func (t *cleTest) cleanup() {
t.mu.Lock()
defer t.mu.Unlock()
for _, c := range t.ctxList {
c.cancel()
}
err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{}) err := t.clientset.CoordinationV1().Leases("kube-system").Delete(context.TODO(), "leader-election-controller", metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) { if err != nil && !apierrors.IsNotFound(err) {
t.t.Error(err) t.t.Error(err)
} }
for _, c := range t.ctxList {
c.cancel()
}
} }
func setupCLE(config *rest.Config, ctx context.Context, cancel func(), t *testing.T) cleTest { func setupCLE(config *rest.Config, ctx context.Context, cancel func(), t *testing.T) *cleTest {
clientset, err := kubernetes.NewForConfig(config) clientset, err := kubernetes.NewForConfig(config)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
a := ctxCancelPair{ctx, cancel} a := ctxCancelPair{ctx, cancel}
return cleTest{ return &cleTest{
config: config, config: config,
clientset: clientset, clientset: clientset,
ctxList: map[string]ctxCancelPair{"main": a}, ctxList: map[string]ctxCancelPair{"main": a},