From 8c80d384b2b893c7cd5b0c255393d40c7dccd75d Mon Sep 17 00:00:00 2001 From: googs1025 Date: Wed, 22 Jan 2025 11:54:04 +0800 Subject: [PATCH] feature: add scheduler queuesort plugins integration test --- .../eventhandler/eventhandler_test.go | 2 +- .../scheduler/plugins/plugins_test.go | 151 +++++++++++++++--- .../scheduler/rescheduling_test.go | 2 +- test/integration/scheduler/util.go | 11 +- 4 files changed, 140 insertions(+), 26 deletions(-) diff --git a/test/integration/scheduler/eventhandler/eventhandler_test.go b/test/integration/scheduler/eventhandler/eventhandler_test.go index 495d357d6f9..abaa572abe9 100644 --- a/test/integration/scheduler/eventhandler/eventhandler_test.go +++ b/test/integration/scheduler/eventhandler/eventhandler_test.go @@ -98,7 +98,7 @@ func TestUpdateNodeEvent(t *testing.T) { }}, }) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 71e4b68d1cc..bac4f6cd910 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +40,7 @@ import ( clientset "k8s.io/client-go/kubernetes" listersv1 "k8s.io/client-go/listers/core/v1" featuregatetesting "k8s.io/component-base/featuregate/testing" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" configv1 "k8s.io/kube-scheduler/config/v1" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" @@ -75,6 +78,11 @@ func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { } } +type QueueSortPlugin struct { + // lessFunc is used to compare two queued pod infos. + lessFunc func(info1, info2 *framework.QueuedPodInfo) bool +} + type PreEnqueuePlugin struct { called int admit bool @@ -282,6 +290,7 @@ func (pp *PermitPlugin) deepCopy() *PermitPlugin { } const ( + queuesortPluginName = "queuesort-plugin" enqueuePluginName = "enqueue-plugin" prefilterPluginName = "prefilter-plugin" postfilterPluginName = "postfilter-plugin" @@ -308,6 +317,25 @@ var _ framework.PreBindPlugin = &PreBindPlugin{} var _ framework.BindPlugin = &BindPlugin{} var _ framework.PostBindPlugin = &PostBindPlugin{} var _ framework.PermitPlugin = &PermitPlugin{} +var _ framework.QueueSortPlugin = &QueueSortPlugin{} + +func (ep *QueueSortPlugin) Name() string { + return queuesortPluginName +} + +func (ep *QueueSortPlugin) Less(info1, info2 *framework.QueuedPodInfo) bool { + if ep.lessFunc != nil { + return ep.lessFunc(info1, info2) + } + // If no custom less function is provided, default to return true. + return true +} + +func NewQueueSortPlugin(lessFunc func(info1, info2 *framework.QueuedPodInfo) bool) *QueueSortPlugin { + return &QueueSortPlugin{ + lessFunc: lessFunc, + } +} func (ep *PreEnqueuePlugin) Name() string { return enqueuePluginName @@ -668,7 +696,7 @@ func TestPreFilterPlugin(t *testing.T) { preFilterPlugin := &PreFilterPlugin{} registry, prof := initRegistryAndConfig(t, preFilterPlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -705,6 +733,85 @@ func TestPreFilterPlugin(t *testing.T) { } } +// TestQueueSortPlugin tests invocation of queueSort plugins. +func TestQueueSortPlugin(t *testing.T) { + tests := []struct { + name string + podNames []string + expectedOrder []string + customLessFunc func(info1, info2 *framework.QueuedPodInfo) bool + }{ + { + name: "timestamp_sort_order", + podNames: []string{"pod-1", "pod-2", "pod-3"}, + expectedOrder: []string{"pod-1", "pod-2", "pod-3"}, + customLessFunc: func(info1, info2 *framework.QueuedPodInfo) bool { + return info1.Timestamp.Before(info2.Timestamp) + }, + }, + { + name: "priority_sort_order", + podNames: []string{"pod-1", "pod-2", "pod-3"}, + expectedOrder: []string{"pod-3", "pod-2", "pod-1"}, // depends on pod priority + customLessFunc: func(info1, info2 *framework.QueuedPodInfo) bool { + p1 := corev1helpers.PodPriority(info1.Pod) + p2 := corev1helpers.PodPriority(info2.Pod) + return (p1 > p2) || (p1 == p2 && info1.Timestamp.Before(info2.Timestamp)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queueSortPlugin := NewQueueSortPlugin(tt.customLessFunc) + registry, prof := initRegistryAndConfig(t, queueSortPlugin) + + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "queuesort-plugin", nil), 2, false, + scheduler.WithProfiles(prof), + scheduler.WithFrameworkOutOfTreeRegistry(registry)) + defer teardown() + + pods := make([]*v1.Pod, 0, len(tt.podNames)) + for i, name := range tt.podNames { + // Create a pod with different priority. + priority := int32(i + 1) + pod, err := testutils.CreatePausePod(testCtx.ClientSet, + testutils.InitPausePod(&testutils.PausePodConfig{Name: name, Namespace: testCtx.NS.Name, Priority: &priority})) + if err != nil { + t.Fatalf("Error while creating %v: %v", name, err) + } + pods = append(pods, pod) + } + + // Wait for all Pods to be in the scheduling queue. + err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods() + if len(pendingPods) == len(pods) { + t.Logf("All Pods are in the pending queue.") + return true, nil + } + t.Logf("Waiting for all Pods to be in the scheduling queue. %d/%d", len(pendingPods), len(pods)) + return false, nil + }) + if err != nil { + t.Fatalf("Failed to observe all Pods in the scheduling queue: %v", err) + } + + actualOrder := make([]string, len(tt.expectedOrder)) + for i := 0; i < len(tt.expectedOrder); i++ { + queueInfo := testutils.NextPodOrDie(t, testCtx) + actualOrder[i] = queueInfo.Pod.Name + t.Logf("Popped Pod %q", queueInfo.Pod.Name) + } + if diff := cmp.Diff(tt.expectedOrder, actualOrder); diff != "" { + t.Errorf("Expected Pod order (-want,+got):\n%s", diff) + } else { + t.Logf("Pods were popped out in the expected order based on custom sorting logic.") + } + }) + } +} + // TestPostFilterPlugin tests invocation of postFilter plugins. func TestPostFilterPlugin(t *testing.T) { numNodes := 1 @@ -848,7 +955,7 @@ func TestPostFilterPlugin(t *testing.T) { }, }}}) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes), + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes), true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) @@ -916,7 +1023,7 @@ func TestScorePlugin(t *testing.T) { scorePlugin := &ScorePlugin{} registry, prof := initRegistryAndConfig(t, scorePlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -960,7 +1067,7 @@ func TestNormalizeScorePlugin(t *testing.T) { scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{} registry, prof := initRegistryAndConfig(t, scoreWithNormalizePlugin) - testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -1008,7 +1115,7 @@ func TestReservePluginReserve(t *testing.T) { reservePlugin := &ReservePlugin{} registry, prof := initRegistryAndConfig(t, reservePlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1123,7 +1230,7 @@ func TestPrebindPlugin(t *testing.T) { }, }) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum, true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1279,7 +1386,7 @@ func TestUnReserveReservePlugins(t *testing.T) { } registry, prof := initRegistryAndConfig(t, pls...) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1373,7 +1480,7 @@ func TestUnReservePermitPlugins(t *testing.T) { } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1445,7 +1552,7 @@ func TestUnReservePreBindPlugins(t *testing.T) { } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1516,7 +1623,7 @@ func TestUnReserveBindPlugins(t *testing.T) { test.plugin.client = testContext.ClientSet - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1663,7 +1770,7 @@ func TestBindPlugin(t *testing.T) { }}, }) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) @@ -1789,7 +1896,7 @@ func TestPostBindPlugin(t *testing.T) { } registry, prof := initRegistryAndConfig(t, preBindPlugin, postBindPlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1881,7 +1988,7 @@ func TestPermitPlugin(t *testing.T) { perPlugin := &PermitPlugin{name: permitPluginName} registry, prof := initRegistryAndConfig(t, perPlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -1931,7 +2038,7 @@ func TestMultiplePermitPlugins(t *testing.T) { registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) // Create the API server and the scheduler with the test plugin set. - testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -1983,7 +2090,7 @@ func TestPermitPluginsCancelled(t *testing.T) { registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) // Create the API server and the scheduler with the test plugin set. - testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -2046,7 +2153,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { permitPlugin := &PermitPlugin{name: permitPluginName} registry, prof := initRegistryAndConfig(t, permitPlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2128,7 +2235,7 @@ func TestFilterPlugin(t *testing.T) { filterPlugin := &FilterPlugin{} registry, prof := initRegistryAndConfig(t, filterPlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2185,7 +2292,7 @@ func TestPreScorePlugin(t *testing.T) { preScorePlugin := &PreScorePlugin{} registry, prof := initRegistryAndConfig(t, preScorePlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2245,7 +2352,7 @@ func TestPreEnqueuePlugin(t *testing.T) { preFilterPlugin := &PreFilterPlugin{} registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() @@ -2377,7 +2484,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) { }, }) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) @@ -2557,7 +2664,7 @@ func TestActivatePods(t *testing.T) { }) // Create the API server and the scheduler with the test plugin set. - testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1, + testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1, true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) @@ -2730,7 +2837,7 @@ func TestPreEnqueuePluginEventsToRegister(t *testing.T) { }}, }) - testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) diff --git a/test/integration/scheduler/rescheduling_test.go b/test/integration/scheduler/rescheduling_test.go index be09574293e..51a943dbc40 100644 --- a/test/integration/scheduler/rescheduling_test.go +++ b/test/integration/scheduler/rescheduling_test.go @@ -205,7 +205,7 @@ func TestReScheduling(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. registry, prof := InitRegistryAndConfig(t, nil, test.plugins...) - testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2, + testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2, true, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer teardown() diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 3b8ff05c62e..4581d3678e2 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -31,6 +31,7 @@ import ( configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" testutils "k8s.io/kubernetes/test/integration/util" @@ -44,10 +45,12 @@ import ( // This should only be called when you want to kill the scheduler alone, away from apiserver. // For example, in scheduler integration tests, recreating apiserver is performance consuming, // then shutdown the scheduler and recreate it between each test case is a better approach. -func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) { +func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, runScheduler bool, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) { testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...) testutils.SyncSchedulerInformerFactory(testCtx) - go testCtx.Scheduler.Run(testCtx.SchedulerCtx) + if runScheduler { + go testCtx.Scheduler.Run(testCtx.SchedulerCtx) + } if nodeCount > 0 { if _, err := testutils.CreateAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil { @@ -105,6 +108,10 @@ func InitRegistryAndConfig(t *testing.T, factory func(plugin framework.Plugin) f plugin := configv1.Plugin{Name: p.Name()} switch p.(type) { + case framework.QueueSortPlugin: + pls.QueueSort.Enabled = append(pls.QueueSort.Enabled, plugin) + // It's intentional to disable the PrioritySort plugin. + pls.QueueSort.Disabled = []configv1.Plugin{{Name: queuesort.Name}} case framework.PreEnqueuePlugin: pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin) case framework.PreFilterPlugin: