Clean shutdown of volumescheduling integration tests

Wojciech Tyczyński 2022-05-27 20:31:32 +02:00
parent 3b61f4ac20
commit 8a959396b8
10 changed files with 52 additions and 150 deletions
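In short: `testutils.InitTestSchedulerWithOptions` (test/integration/util) gains an explicit `resyncPeriod time.Duration` parameter instead of hard-coding `0` when it builds the informer factory; every scheduler test call site now passes `0`; and the volumescheduling tests drop their private bootstrap helpers in favor of the shared ones, so the scheduler and PV controller run off the shared test context and stop when it is canceled. A minimal before/after sketch of a call site, taken from the hunks below:

```go
// Before: the helper always created its informer factory with resync 0.
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, opts...)

// After: the resync period is an explicit argument. Scheduler tests pass 0;
// the volume-scheduling tests forward their own resyncPeriod.
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
```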

View File

@@ -353,7 +353,7 @@ func TestSchedulerExtender(t *testing.T) {
},
}
-testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, scheduler.WithExtenders(extenders...))
+testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithExtenders(extenders...))
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
defer testutils.CleanupTest(t, testCtx)

View File

@@ -1489,7 +1489,7 @@ func TestBindPlugin(t *testing.T) {
})
// Create the scheduler with the test plugin set.
-testCtx := testutils.InitTestSchedulerWithOptions(t, testContext,
+testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, 0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
testutils.SyncInformerFactory(testCtx)
@@ -2421,7 +2421,7 @@ func TestActivatePods(t *testing.T) {
}
func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
-testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, opts...)
+testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)

View File

@@ -171,6 +171,7 @@ func TestPreemption(t *testing.T) {
testCtx := testutils.InitTestSchedulerWithOptions(t,
testutils.InitTestAPIServer(t, "preemption", nil),
+0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
testutils.SyncInformerFactory(testCtx)
@@ -1436,7 +1437,7 @@ func TestPDBInPreemption(t *testing.T) {
}
func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
-testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), opts...)
+testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
testutils.SyncInformerFactory(testCtx)
// wraps the NextPod() method to make it appear the preemption has been done already and the nominated node has been set.
f := testCtx.Scheduler.NextPod

View File

@@ -56,6 +56,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
+0,
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
)
@@ -236,6 +237,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
testCtx = testutils.InitTestSchedulerWithOptions(
t,
testCtx,
+0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
scheduler.WithPodInitialBackoffSeconds(0),

View File

@@ -267,7 +267,7 @@ func TestMultipleSchedulers(t *testing.T) {
}},
},
})
-testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, scheduler.WithProfiles(cfg.Profiles...))
+testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithProfiles(cfg.Profiles...))
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)

View File

@@ -92,6 +92,7 @@ func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *testutils.TestContext {
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, strings.ToLower(scorePluginName), nil),
+0,
scheduler.WithProfiles(cfg.Profiles...),
)
testutils.SyncInformerFactory(testCtx)
@@ -127,6 +128,7 @@ func initTestSchedulerForNodeResourcesTest(t *testing.T) *testutils.TestContext
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, strings.ToLower(noderesources.Name), nil),
+0,
scheduler.WithProfiles(cfg.Profiles...),
)
testutils.SyncInformerFactory(testCtx)

View File

@@ -342,7 +342,7 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(options *options.ServerRunOptions) {
-options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}
+options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"}
},
ModifyServerConfig: func(config *controlplane.Config) {
if admission != nil {
@@ -380,7 +380,7 @@ func InitTestScheduler(
testCtx *TestContext,
) *TestContext {
// Pod preemption is enabled by default scheduler configuration.
-return InitTestSchedulerWithOptions(t, testCtx)
+return InitTestSchedulerWithOptions(t, testCtx, 0)
}
// InitTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@@ -388,10 +388,11 @@ func InitTestScheduler(
func InitTestSchedulerWithOptions(
t *testing.T,
testCtx *TestContext,
+resyncPeriod time.Duration,
opts ...scheduler.Option,
) *TestContext {
// 1. Create scheduler
-testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, 0)
+testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod)
if testCtx.KubeConfig != nil {
dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig)
testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
@@ -489,7 +490,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
// InitTestSchedulerWithNS initializes a test environment and creates API server and scheduler with default
// configuration.
func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext {
-testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), opts...)
+testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
@@ -512,6 +513,7 @@ func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
})
testCtx := InitTestSchedulerWithOptions(
t, InitTestAPIServer(t, nsPrefix, nil),
+0,
scheduler.WithProfiles(cfg.Profiles...))
SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
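Taken together, the util.go changes make the shared helpers the single bootstrap path for scheduler integration tests. A hedged sketch of the resulting setup/teardown flow, assembled from calls visible in this commit (imports and test body omitted; not a verbatim test):

```go
func TestExample(t *testing.T) {
	// API server first, then the scheduler; 0 disables informer resync.
	testCtx := testutils.InitTestSchedulerWithOptions(
		t,
		testutils.InitTestAPIServer(t, "example", nil),
		0,
	)
	testutils.SyncInformerFactory(testCtx)

	// The scheduler runs until testCtx.Ctx is canceled...
	go testCtx.Scheduler.Run(testCtx.Ctx)

	// ...which CleanupTest does before deleting test resources and
	// shutting down the API server.
	defer testutils.CleanupTest(t, testCtx)
}
```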

View File

@@ -18,125 +18,15 @@ package volumescheduling
import (
"context"
"net/http/httptest"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
-restclient "k8s.io/client-go/rest"
-"k8s.io/client-go/tools/events"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/test/integration/framework"
)
-type testContext struct {
-closeFn framework.CloseFunc
-httpServer *httptest.Server
-ns *v1.Namespace
-clientSet *clientset.Clientset
-informerFactory informers.SharedInformerFactory
-scheduler *scheduler.Scheduler
-ctx context.Context
-cancelFn context.CancelFunc
-}
-// initTestAPIServer initializes a test environment and creates an API server with default
-// configuration. Alpha resources are enabled automatically if the corresponding feature
-// is enabled.
-func initTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *testContext {
-ctx, cancelFunc := context.WithCancel(context.Background())
-testCtx := testContext{
-ctx: ctx,
-cancelFn: cancelFunc,
-}
-// 1. Create API server
-controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
-resourceConfig := controlplane.DefaultAPIResourceConfigSource()
-controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
-if admission != nil {
-controlPlaneConfig.GenericConfig.AdmissionControl = admission
-}
-_, testCtx.httpServer, testCtx.closeFn = framework.RunAnAPIServer(controlPlaneConfig)
-s := testCtx.httpServer
-if nsPrefix != "default" {
-testCtx.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), t)
-} else {
-testCtx.ns = framework.CreateTestingNamespace("default", t)
-}
-// 2. Create kubeclient
-testCtx.clientSet = clientset.NewForConfigOrDie(
-&restclient.Config{
-QPS: -1, Host: s.URL,
-ContentConfig: restclient.ContentConfig{
-GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"},
-},
-},
-)
-return &testCtx
-}
-// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
-// configuration and other options.
-func initTestSchedulerWithOptions(
-t *testing.T,
-testCtx *testContext,
-resyncPeriod time.Duration,
-) *testContext {
-// 1. Create scheduler
-testCtx.informerFactory = informers.NewSharedInformerFactory(testCtx.clientSet, resyncPeriod)
-eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
-Interface: testCtx.clientSet.EventsV1(),
-})
-var err error
-testCtx.scheduler, err = scheduler.New(
-testCtx.clientSet,
-testCtx.informerFactory,
-nil,
-profile.NewRecorderFactory(eventBroadcaster),
-testCtx.ctx.Done())
-if err != nil {
-t.Fatalf("Couldn't create scheduler: %v", err)
-}
-eventBroadcaster.StartRecordingToSink(testCtx.ctx.Done())
-testCtx.informerFactory.Start(testCtx.scheduler.StopEverything)
-testCtx.informerFactory.WaitForCacheSync(testCtx.scheduler.StopEverything)
-go testCtx.scheduler.Run(testCtx.ctx)
-return testCtx
-}
-// cleanupTest deletes the scheduler and the test namespace. It should be called
-// at the end of a test.
-func cleanupTest(t *testing.T, testCtx *testContext) {
-// Kill the scheduler.
-testCtx.cancelFn()
-// Cleanup nodes.
-testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
-framework.DeleteTestingNamespace(testCtx.ns, t)
-testCtx.closeFn()
-}
// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
// an error if it does not get scheduled within the given timeout.
func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
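The helpers deleted above were near-duplicates of test/integration/util, with each test wiring its own context and teardown order by hand. The "clean shutdown" of the commit title reduces to the pattern below, shown as a self-contained, stdlib-only sketch; all names here are illustrative, not Kubernetes APIs:

```go
package main

import (
	"context"
	"fmt"
)

// run stands in for the scheduler's and the PV controller's Run methods:
// it works until the shared test context is canceled, then signals done.
func run(ctx context.Context, done chan<- struct{}) {
	defer close(done)
	<-ctx.Done()
}

func main() {
	// One context owns every background goroutine in the test.
	ctx, cancel := context.WithCancel(context.Background())

	schedDone := make(chan struct{})
	ctrlDone := make(chan struct{})
	go run(ctx, schedDone) // cf. go testCtx.Scheduler.Run(testCtx.Ctx)
	go run(ctx, ctrlDone)  // cf. go ctrl.Run(testCtx.Ctx)

	// Cleanup mirrors testutil.CleanupTest: cancel the shared context and
	// wait for background work to stop before tearing anything else down.
	cancel()
	<-schedDone
	<-ctrlDone
	fmt.Println("clean shutdown")
}
```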

View File

@@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
testutil "k8s.io/kubernetes/test/integration/util"
imageutils "k8s.io/kubernetes/test/utils/image"
)
@@ -990,19 +991,16 @@ func TestCapacity(t *testing.T) {
// selectedNode annotation from a claim to reschedule volume provision
// on provision failure.
func TestRescheduleProvisioning(t *testing.T) {
-// Set feature gates
-ctx, cancel := context.WithCancel(context.Background())
-testCtx := initTestAPIServer(t, "reschedule-volume-provision", nil)
-clientset := testCtx.clientSet
-ns := testCtx.ns.Name
+testCtx := testutil.InitTestAPIServer(t, "reschedule-volume-provision", nil)
+clientset := testCtx.ClientSet
+ns := testCtx.NS.Name
defer func() {
-cancel()
+testCtx.CancelFn()
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
-testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
-testCtx.closeFn()
+testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
+testCtx.CloseFn()
}()
ctrl, informerFactory, err := initPVController(t, testCtx, 0)
@@ -1038,9 +1036,9 @@ func TestRescheduleProvisioning(t *testing.T) {
}
// Start controller.
-go ctrl.Run(ctx)
-informerFactory.Start(ctx.Done())
-informerFactory.WaitForCacheSync(ctx.Done())
+go ctrl.Run(testCtx.Ctx)
+informerFactory.Start(testCtx.Ctx.Done())
+informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
// Validate that the annotation is removed by controller for provision reschedule.
if err := waitForProvisionAnn(clientset, pvc, false); err != nil {
@@ -1049,18 +1047,21 @@
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
-testCtx := initTestSchedulerWithOptions(t, initTestAPIServer(t, nsName, nil), resyncPeriod)
-clientset := testCtx.clientSet
-ns := testCtx.ns.Name
+testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod)
+testutil.SyncInformerFactory(testCtx)
+go testCtx.Scheduler.Run(testCtx.Ctx)
+clientset := testCtx.ClientSet
+ns := testCtx.NS.Name
ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
-go ctrl.Run(testCtx.ctx)
+go ctrl.Run(testCtx.Ctx)
// Start informer factory after all controllers are configured and running.
-informerFactory.Start(testCtx.ctx.Done())
-informerFactory.WaitForCacheSync(testCtx.ctx.Done())
+informerFactory.Start(testCtx.Ctx.Done())
+informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
// Create shared objects
// Create nodes
@@ -1081,17 +1082,17 @@ return &testConfig{
return &testConfig{
client: clientset,
ns: ns,
-stop: testCtx.ctx.Done(),
+stop: testCtx.Ctx.Done(),
teardown: func() {
klog.Infof("test cluster %q start to tear down", ns)
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
-cleanupTest(t, testCtx)
+testutil.CleanupTest(t, testCtx)
},
}
}
-func initPVController(t *testing.T, testCtx *testContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
-clientset := testCtx.clientSet
+func initPVController(t *testing.T, testCtx *testutil.TestContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
+clientset := testCtx.ClientSet
// Informers factory for controllers
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
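A plausible reading of the "Start informer factory after all controllers are configured and running" comment above: a shared informer factory only starts informers that have already been requested from it, so the PV controller must be constructed first, and WaitForCacheSync then blocks until those caches are primed. Restated with only the calls that appear in this diff:

```go
// Ordering sketch; error handling abbreviated.
ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
if err != nil {
	t.Fatalf("Failed to create PV controller: %v", err)
}
go ctrl.Run(testCtx.Ctx)                             // consume work items
informerFactory.Start(testCtx.Ctx.Done())            // start the requested informers
informerFactory.WaitForCacheSync(testCtx.Ctx.Done()) // block until caches are primed
```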

View File

@@ -31,6 +31,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
testutil "k8s.io/kubernetes/test/integration/util"
)
var (
@@ -46,28 +47,31 @@ }
}
func setupClusterForVolumeCapacityPriority(t *testing.T, nsName string, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
-textCtx := initTestSchedulerWithOptions(t, initTestAPIServer(t, nsName, nil), resyncPeriod)
-clientset := textCtx.clientSet
-ns := textCtx.ns.Name
-ctrl, informerFactory, err := initPVController(t, textCtx, provisionDelaySeconds)
+testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod)
+testutil.SyncInformerFactory(testCtx)
+go testCtx.Scheduler.Run(testCtx.Ctx)
+clientset := testCtx.ClientSet
+ns := testCtx.NS.Name
+ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
-go ctrl.Run(textCtx.ctx)
+go ctrl.Run(testCtx.Ctx)
// Start informer factory after all controllers are configured and running.
-informerFactory.Start(textCtx.ctx.Done())
-informerFactory.WaitForCacheSync(textCtx.ctx.Done())
+informerFactory.Start(testCtx.Ctx.Done())
+informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
return &testConfig{
client: clientset,
ns: ns,
-stop: textCtx.ctx.Done(),
+stop: testCtx.Ctx.Done(),
teardown: func() {
klog.Infof("test cluster %q start to tear down", ns)
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
-cleanupTest(t, textCtx)
+testutil.CleanupTest(t, testCtx)
},
}
}
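For completeness, a hedged usage sketch of the testConfig returned above; the function and field names come from this diff, while the argument values and the calling test are illustrative:

```go
// Hypothetical caller; the namespace prefix, resync period, and provision
// delay are placeholder values.
config := setupClusterForVolumeCapacityPriority(t, "volume-capacity-priority", 0, 0)
defer config.teardown() // deletes test objects, then testutil.CleanupTest

// The test then drives config.client inside config.ns and selects on
// config.stop, which is testCtx.Ctx.Done(), so all goroutines exit when
// the shared context is canceled.
```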