feature: add scheduler queuesort plugins integration test

This commit is contained in:
googs1025 2025-01-22 11:54:04 +08:00
parent 25be4a4bd4
commit 8c80d384b2
4 changed files with 140 additions and 26 deletions

View File

@ -98,7 +98,7 @@ func TestUpdateNodeEvent(t *testing.T) {
}},
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)

View File

@ -26,6 +26,8 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -38,6 +40,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
@ -75,6 +78,11 @@ func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
}
}
type QueueSortPlugin struct {
// lessFunc is used to compare two queued pod infos.
lessFunc func(info1, info2 *framework.QueuedPodInfo) bool
}
type PreEnqueuePlugin struct {
called int
admit bool
@ -282,6 +290,7 @@ func (pp *PermitPlugin) deepCopy() *PermitPlugin {
}
const (
queuesortPluginName = "queuesort-plugin"
enqueuePluginName = "enqueue-plugin"
prefilterPluginName = "prefilter-plugin"
postfilterPluginName = "postfilter-plugin"
@ -308,6 +317,25 @@ var _ framework.PreBindPlugin = &PreBindPlugin{}
var _ framework.BindPlugin = &BindPlugin{}
var _ framework.PostBindPlugin = &PostBindPlugin{}
var _ framework.PermitPlugin = &PermitPlugin{}
var _ framework.QueueSortPlugin = &QueueSortPlugin{}
func (ep *QueueSortPlugin) Name() string {
return queuesortPluginName
}
func (ep *QueueSortPlugin) Less(info1, info2 *framework.QueuedPodInfo) bool {
if ep.lessFunc != nil {
return ep.lessFunc(info1, info2)
}
// If no custom less function is provided, default to return true.
return true
}
func NewQueueSortPlugin(lessFunc func(info1, info2 *framework.QueuedPodInfo) bool) *QueueSortPlugin {
return &QueueSortPlugin{
lessFunc: lessFunc,
}
}
func (ep *PreEnqueuePlugin) Name() string {
return enqueuePluginName
@ -668,7 +696,7 @@ func TestPreFilterPlugin(t *testing.T) {
preFilterPlugin := &PreFilterPlugin{}
registry, prof := initRegistryAndConfig(t, preFilterPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -705,6 +733,85 @@ func TestPreFilterPlugin(t *testing.T) {
}
}
// TestQueueSortPlugin tests invocation of queueSort plugins.
func TestQueueSortPlugin(t *testing.T) {
tests := []struct {
name string
podNames []string
expectedOrder []string
customLessFunc func(info1, info2 *framework.QueuedPodInfo) bool
}{
{
name: "timestamp_sort_order",
podNames: []string{"pod-1", "pod-2", "pod-3"},
expectedOrder: []string{"pod-1", "pod-2", "pod-3"},
customLessFunc: func(info1, info2 *framework.QueuedPodInfo) bool {
return info1.Timestamp.Before(info2.Timestamp)
},
},
{
name: "priority_sort_order",
podNames: []string{"pod-1", "pod-2", "pod-3"},
expectedOrder: []string{"pod-3", "pod-2", "pod-1"}, // depends on pod priority
customLessFunc: func(info1, info2 *framework.QueuedPodInfo) bool {
p1 := corev1helpers.PodPriority(info1.Pod)
p2 := corev1helpers.PodPriority(info2.Pod)
return (p1 > p2) || (p1 == p2 && info1.Timestamp.Before(info2.Timestamp))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queueSortPlugin := NewQueueSortPlugin(tt.customLessFunc)
registry, prof := initRegistryAndConfig(t, queueSortPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "queuesort-plugin", nil), 2, false,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
pods := make([]*v1.Pod, 0, len(tt.podNames))
for i, name := range tt.podNames {
// Create a pod with different priority.
priority := int32(i + 1)
pod, err := testutils.CreatePausePod(testCtx.ClientSet,
testutils.InitPausePod(&testutils.PausePodConfig{Name: name, Namespace: testCtx.NS.Name, Priority: &priority}))
if err != nil {
t.Fatalf("Error while creating %v: %v", name, err)
}
pods = append(pods, pod)
}
// Wait for all Pods to be in the scheduling queue.
err := wait.PollUntilContextTimeout(testCtx.Ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
if len(pendingPods) == len(pods) {
t.Logf("All Pods are in the pending queue.")
return true, nil
}
t.Logf("Waiting for all Pods to be in the scheduling queue. %d/%d", len(pendingPods), len(pods))
return false, nil
})
if err != nil {
t.Fatalf("Failed to observe all Pods in the scheduling queue: %v", err)
}
actualOrder := make([]string, len(tt.expectedOrder))
for i := 0; i < len(tt.expectedOrder); i++ {
queueInfo := testutils.NextPodOrDie(t, testCtx)
actualOrder[i] = queueInfo.Pod.Name
t.Logf("Popped Pod %q", queueInfo.Pod.Name)
}
if diff := cmp.Diff(tt.expectedOrder, actualOrder); diff != "" {
t.Errorf("Expected Pod order (-want,+got):\n%s", diff)
} else {
t.Logf("Pods were popped out in the expected order based on custom sorting logic.")
}
})
}
}
// TestPostFilterPlugin tests invocation of postFilter plugins.
func TestPostFilterPlugin(t *testing.T) {
numNodes := 1
@ -848,7 +955,7 @@ func TestPostFilterPlugin(t *testing.T) {
},
}}})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes),
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes), true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
@ -916,7 +1023,7 @@ func TestScorePlugin(t *testing.T) {
scorePlugin := &ScorePlugin{}
registry, prof := initRegistryAndConfig(t, scorePlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -960,7 +1067,7 @@ func TestNormalizeScorePlugin(t *testing.T) {
scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{}
registry, prof := initRegistryAndConfig(t, scoreWithNormalizePlugin)
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
@ -1008,7 +1115,7 @@ func TestReservePluginReserve(t *testing.T) {
reservePlugin := &ReservePlugin{}
registry, prof := initRegistryAndConfig(t, reservePlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1123,7 +1230,7 @@ func TestPrebindPlugin(t *testing.T) {
},
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum, true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1279,7 +1386,7 @@ func TestUnReserveReservePlugins(t *testing.T) {
}
registry, prof := initRegistryAndConfig(t, pls...)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1373,7 +1480,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
}
registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(profile),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1445,7 +1552,7 @@ func TestUnReservePreBindPlugins(t *testing.T) {
}
registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(profile),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1516,7 +1623,7 @@ func TestUnReserveBindPlugins(t *testing.T) {
test.plugin.client = testContext.ClientSet
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(profile),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1663,7 +1770,7 @@ func TestBindPlugin(t *testing.T) {
}},
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
@ -1789,7 +1896,7 @@ func TestPostBindPlugin(t *testing.T) {
}
registry, prof := initRegistryAndConfig(t, preBindPlugin, postBindPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1881,7 +1988,7 @@ func TestPermitPlugin(t *testing.T) {
perPlugin := &PermitPlugin{name: permitPluginName}
registry, prof := initRegistryAndConfig(t, perPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -1931,7 +2038,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
// Create the API server and the scheduler with the test plugin set.
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2,
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
@ -1983,7 +2090,7 @@ func TestPermitPluginsCancelled(t *testing.T) {
registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
// Create the API server and the scheduler with the test plugin set.
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2,
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
@ -2046,7 +2153,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
permitPlugin := &PermitPlugin{name: permitPluginName}
registry, prof := initRegistryAndConfig(t, permitPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -2128,7 +2235,7 @@ func TestFilterPlugin(t *testing.T) {
filterPlugin := &FilterPlugin{}
registry, prof := initRegistryAndConfig(t, filterPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -2185,7 +2292,7 @@ func TestPreScorePlugin(t *testing.T) {
preScorePlugin := &PreScorePlugin{}
registry, prof := initRegistryAndConfig(t, preScorePlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -2245,7 +2352,7 @@ func TestPreEnqueuePlugin(t *testing.T) {
preFilterPlugin := &PreFilterPlugin{}
registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
@ -2377,7 +2484,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
},
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)
@ -2557,7 +2664,7 @@ func TestActivatePods(t *testing.T) {
})
// Create the API server and the scheduler with the test plugin set.
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1, true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
@ -2730,7 +2837,7 @@ func TestPreEnqueuePluginEventsToRegister(t *testing.T) {
}},
})
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
)

View File

@ -205,7 +205,7 @@ func TestReScheduling(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry, prof := InitRegistryAndConfig(t, nil, test.plugins...)
testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2,
testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2, true,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()

View File

@ -31,6 +31,7 @@ import (
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
@ -44,10 +45,12 @@ import (
// This should only be called when you want to kill the scheduler alone, away from apiserver.
// For example, in scheduler integration tests, recreating apiserver is performance consuming,
// then shutdown the scheduler and recreate it between each test case is a better approach.
func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, runScheduler bool, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
testutils.SyncSchedulerInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
if runScheduler {
go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
}
if nodeCount > 0 {
if _, err := testutils.CreateAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
@ -105,6 +108,10 @@ func InitRegistryAndConfig(t *testing.T, factory func(plugin framework.Plugin) f
plugin := configv1.Plugin{Name: p.Name()}
switch p.(type) {
case framework.QueueSortPlugin:
pls.QueueSort.Enabled = append(pls.QueueSort.Enabled, plugin)
// It's intentional to disable the PrioritySort plugin.
pls.QueueSort.Disabled = []configv1.Plugin{{Name: queuesort.Name}}
case framework.PreEnqueuePlugin:
pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
case framework.PreFilterPlugin: