From c712230ac1c2e8f1010261b57a75fee0045ebf8a Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Wed, 15 Jan 2020 12:26:22 -0800 Subject: [PATCH] Implement default queue sort logic as a scheduler plugin --- pkg/scheduler/BUILD | 2 + pkg/scheduler/algorithmprovider/BUILD | 2 + pkg/scheduler/algorithmprovider/registry.go | 6 + .../algorithmprovider/registry_test.go | 16 + .../apis/config/testing/compatibility_test.go | 23 ++ pkg/scheduler/core/BUILD | 1 + pkg/scheduler/core/extender_test.go | 15 +- pkg/scheduler/core/generic_scheduler_test.go | 288 ++++++++++++------ pkg/scheduler/factory.go | 7 + pkg/scheduler/factory_test.go | 5 + pkg/scheduler/framework/plugins/BUILD | 2 + .../framework/plugins/queuesort/BUILD | 37 +++ .../plugins/queuesort/priority_sort.go | 50 +++ .../plugins/queuesort/priority_sort_test.go | 121 ++++++++ pkg/scheduler/framework/plugins/registry.go | 2 + pkg/scheduler/framework/v1alpha1/framework.go | 12 +- .../framework/v1alpha1/framework_test.go | 53 +++- pkg/scheduler/internal/queue/BUILD | 8 +- .../internal/queue/scheduling_queue.go | 26 +- .../internal/queue/scheduling_queue_test.go | 173 +++-------- pkg/scheduler/scheduler_test.go | 56 +++- pkg/scheduler/testing/framework_helpers.go | 7 + test/integration/scheduler/scheduler_test.go | 6 + 23 files changed, 648 insertions(+), 270 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/queuesort/BUILD create mode 100644 pkg/scheduler/framework/plugins/queuesort/priority_sort.go create mode 100644 pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 02da292df09..7ec0a0c0fea 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -21,6 +21,7 @@ go_library( "//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library", + 
"//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library", @@ -71,6 +72,7 @@ go_test( "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", "//pkg/scheduler/framework/plugins/nodeports:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library", + "//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", diff --git a/pkg/scheduler/algorithmprovider/BUILD b/pkg/scheduler/algorithmprovider/BUILD index f583539cac8..bb6cdf6141e 100644 --- a/pkg/scheduler/algorithmprovider/BUILD +++ b/pkg/scheduler/algorithmprovider/BUILD @@ -24,6 +24,7 @@ go_library( "//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library", "//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", + "//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library", @@ -52,6 +53,7 @@ go_test( "//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library", "//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", + "//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", 
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library", diff --git a/pkg/scheduler/algorithmprovider/registry.go b/pkg/scheduler/algorithmprovider/registry.go index 509ee97e36b..8aba8dd09dc 100644 --- a/pkg/scheduler/algorithmprovider/registry.go +++ b/pkg/scheduler/algorithmprovider/registry.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" @@ -83,6 +84,11 @@ func ListAlgorithmProviders() string { func getDefaultConfig(hardPodAffinityWeight int64) *Config { return &Config{ FrameworkPlugins: &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: queuesort.Name}, + }, + }, PreFilter: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: noderesources.FitName}, diff --git a/pkg/scheduler/algorithmprovider/registry_test.go b/pkg/scheduler/algorithmprovider/registry_test.go index 1ce7a379e8b..c397dacf584 100644 --- a/pkg/scheduler/algorithmprovider/registry_test.go +++ b/pkg/scheduler/algorithmprovider/registry_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -48,6 +49,11 @@ func TestClusterAutoscalerProvider(t *testing.T) { hardPodAffinityWeight := int64(1) wantConfig := &Config{ FrameworkPlugins: &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: queuesort.Name}, + }, + }, PreFilter: &schedulerapi.PluginSet{ Enabled: 
[]schedulerapi.Plugin{ {Name: noderesources.FitName}, @@ -120,6 +126,11 @@ func TestApplyFeatureGates(t *testing.T) { featuresEnabled: false, wantConfig: &Config{ FrameworkPlugins: &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: queuesort.Name}, + }, + }, PreFilter: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: noderesources.FitName}, @@ -178,6 +189,11 @@ func TestApplyFeatureGates(t *testing.T) { featuresEnabled: true, wantConfig: &Config{ FrameworkPlugins: &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: queuesort.Name}, + }, + }, PreFilter: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: noderesources.FitName}, diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index f7bf79b97d4..cfb10635403 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -61,6 +61,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodeResourcesFit"}, {Name: "NodePorts"}, @@ -87,6 +88,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "TaintToleration"}, @@ -115,6 +117,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -167,6 +170,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, 
@@ -224,6 +228,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -290,6 +295,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -364,6 +370,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { ] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -449,6 +456,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -545,6 +553,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -642,6 +651,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -743,6 +753,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -856,6 +867,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -971,6 +983,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: 
map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -1086,6 +1099,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -1206,6 +1220,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }] }`, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodePorts"}, {Name: "NodeResourcesFit"}, @@ -1279,6 +1294,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { features.EvenPodsSpread: true, }, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "PodTopologySpread"}, }, @@ -1309,6 +1325,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { features.ResourceLimitsPriorityFunction: true, }, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PostFilterPlugin": { {Name: "NodeResourceLimits"}, }, @@ -1382,6 +1399,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { func TestAlgorithmProviderCompatibility(t *testing.T) { // Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases defaultPlugins := map[string][]config.Plugin{ + "QueueSortPlugin": { + {Name: "PrioritySort"}, + }, "PreFilterPlugin": { {Name: "NodeResourcesFit"}, {Name: "NodePorts"}, @@ -1438,6 +1458,9 @@ func TestAlgorithmProviderCompatibility(t *testing.T) { name: "ClusterAutoscalerProvider", provider: algorithmprovider.ClusterAutoscalerProvider, wantPlugins: map[string][]config.Plugin{ + "QueueSortPlugin": { + {Name: "PrioritySort"}, + }, "PreFilterPlugin": { {Name: "NodeResourcesFit"}, {Name: "NodePorts"}, diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index b8d252cf3b0..4dd2633e627 100644 --- 
a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -57,6 +57,7 @@ go_test( "//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", + "//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library", diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 333333c6a75..2e9db9142ef 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -366,6 +367,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -382,6 +384,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -398,6 +401,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + 
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -418,6 +422,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -434,6 +439,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -453,6 +459,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -478,6 +485,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 20), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -505,6 +513,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 1), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -530,6 +539,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { // because of the errors from errorPredicateExtender. 
registerPlugins: []st.RegisterPluginFunc{ st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), }, extenders: []FakeExtender{ { @@ -568,8 +578,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { registry := framework.Registry{} plugins := &schedulerapi.Plugins{ - Filter: &schedulerapi.PluginSet{}, - Score: &schedulerapi.PluginSet{}, + QueueSort: &schedulerapi.PluginSet{}, + Filter: &schedulerapi.PluginSet{}, + Score: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig for _, f := range test.registerPlugins { diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index ec1a94463ac..5e8f8cd8ab1 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" @@ -385,6 +386,7 @@ func TestGenericScheduler(t *testing.T) { }{ { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -401,6 +403,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -412,6 +415,7 @@ func TestGenericScheduler(t *testing.T) { { // Fits on a machine where 
the pod ID matches the machine name registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -422,6 +426,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), }, @@ -433,6 +438,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), }, @@ -444,6 +450,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2), @@ -456,6 +463,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), @@ -475,6 +483,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), @@ -505,6 +514,7 @@ func TestGenericScheduler(t *testing.T) { { // Pod 
with existing PVC registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -530,6 +540,7 @@ func TestGenericScheduler(t *testing.T) { { // Pod with non existing PVC registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -553,6 +564,7 @@ func TestGenericScheduler(t *testing.T) { { // Pod with deleting PVC registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -576,6 +588,7 @@ func TestGenericScheduler(t *testing.T) { }, { registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterScorePlugin("FalseMap", newFalseMapPlugin(), 1), st.RegisterScorePlugin("TrueMap", newTrueMapPlugin(), 2), @@ -588,6 +601,7 @@ func TestGenericScheduler(t *testing.T) { { name: "test even pods spread predicate - 2 nodes with maxskew=1", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterPluginAsExtensions( podtopologyspread.Name, 1, @@ -634,6 +648,7 @@ func TestGenericScheduler(t *testing.T) { { name: "test even pods spread predicate - 3 nodes with maxskew=2", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterPluginAsExtensions( podtopologyspread.Name, 1, @@ -698,6 +713,7 @@ func TestGenericScheduler(t *testing.T) { { name: "test with filter plugin returning Unschedulable status", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), 
st.RegisterFilterPlugin( "FakeFilter", NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}), @@ -718,6 +734,7 @@ func TestGenericScheduler(t *testing.T) { { name: "test with filter plugin returning UnschedulableAndUnresolvable status", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin( "FakeFilter", NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}), @@ -738,6 +755,7 @@ func TestGenericScheduler(t *testing.T) { { name: "test with partial failed filter plugin", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin( "FakeFilter", NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}), @@ -767,7 +785,9 @@ func TestGenericScheduler(t *testing.T) { } registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{}, PreFilter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{}, Score: &schedulerapi.PluginSet{}, @@ -779,7 +799,7 @@ func TestGenericScheduler(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes)) fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot)) - pvcs := []v1.PersistentVolumeClaim{} + var pvcs []v1.PersistentVolumeClaim pvcs = append(pvcs, test.pvcs...) pvcLister := fakelisters.PersistentVolumeClaimLister(pvcs) @@ -817,8 +837,10 @@ func makeScheduler(nodes []*v1.Node, fns ...st.RegisterPluginFunc) *genericSched } registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. 
plugins := &schedulerapi.Plugins{ - Filter: &schedulerapi.PluginSet{}, + QueueSort: &schedulerapi.PluginSet{}, + Filter: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig for _, f := range fns { @@ -842,6 +864,7 @@ func TestFindFitAllError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler( nodes, + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), ) @@ -874,6 +897,7 @@ func TestFindFitSomeError(t *testing.T) { nodes := makeNodeList([]string{"3", "2", "1"}) scheduler := makeScheduler( nodes, + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), ) @@ -934,15 +958,20 @@ func TestFindFitPredicateCallCounts(t *testing.T) { registry := framework.Registry{} plugin := fakeFilterPlugin{} + registry.Register(queuesort.Name, queuesort.New) registry.Register("FakeFilter", func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) { return &plugin, nil }) plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}}, + }, Filter: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{{Name: "FakeFilter"}}, }, } pluginConfigs := []schedulerapi.PluginConfig{ + {Name: queuesort.Name}, {Name: "FakeFilter"}, } fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) @@ -1089,13 +1118,16 @@ func TestZeroRequest(t *testing.T) { snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. 
plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{}, PostFilter: &schedulerapi.PluginSet{}, Score: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig pluginRegistrations := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterScorePlugin(noderesources.LeastAllocatedName, noderesources.NewLeastAllocated, 1), st.RegisterScorePlugin(noderesources.BalancedAllocationName, noderesources.NewBalancedAllocation, 1), st.RegisterScorePlugin(defaultpodtopologyspread.Name, defaultpodtopologyspread.New, 1), @@ -1270,6 +1302,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "a pod that does not fit on any machine", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -1283,6 +1316,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "a pod that fits with no preemption", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -1296,6 +1330,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "a pod that fits on one machine with no preemption", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), }, nodes: []string{"machine1", "machine2"}, @@ -1309,6 +1344,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "a pod that fits on both machines when lower priority pods are preempted", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), }, nodes: []string{"machine1", "machine2"}, @@ 
-1322,6 +1358,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "a pod that would fit on the machines, but other pods running are higher priority", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), }, nodes: []string{"machine1", "machine2"}, @@ -1335,6 +1372,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "medium priority pod is preempted, but lower priority one stays as it is small", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), }, nodes: []string{"machine1", "machine2"}, @@ -1349,6 +1387,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "mixed priority pods are preempted", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), }, nodes: []string{"machine1", "machine2"}, @@ -1365,6 +1404,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "mixed priority pods are preempted, pick later StartTime one when priorities are equal", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), }, nodes: []string{"machine1", "machine2"}, @@ -1381,6 +1421,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "pod with anti-affinity is preempted", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), st.RegisterFilterPlugin(interpodaffinity.Name, interpodaffinity.New), }, @@ -1415,6 +1456,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "preemption to resolve even pods spread FitError", registerPlugins: 
[]st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterPluginAsExtensions( podtopologyspread.Name, 1, @@ -1497,6 +1539,7 @@ func TestSelectNodesForPreemption(t *testing.T) { { name: "get Unschedulable in the preemption phase when the filter plugins filtering the nodes", registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), }, nodes: []string{"machine1", "machine2"}, @@ -1539,7 +1582,9 @@ func TestSelectNodesForPreemption(t *testing.T) { } registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{}, PreFilter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{}, } @@ -1607,27 +1652,33 @@ func TestSelectNodesForPreemption(t *testing.T) { // TestPickOneNodeForPreemption tests pickOneNodeForPreemption. func TestPickOneNodeForPreemption(t *testing.T) { tests := []struct { - name string - registerFilterPlugin st.RegisterPluginFunc - nodes []string - pod *v1.Pod - pods []*v1.Pod - expected []string // any of the items is valid + name string + registerPlugins []st.RegisterPluginFunc + nodes []string + pod *v1.Pod + pods []*v1.Pod + expected []string // any of the items is valid }{ { - name: "No node needs preemption", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, + name: "No node needs preemption", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: 
types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}}, expected: []string{"machine1"}, }, { - name: "a pod that fits on both machines when lower priority pods are preempted", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, + name: "a pod that fits on both machines when lower priority pods are preempted", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, @@ -1635,10 +1686,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine1", "machine2"}, }, { - name: "a pod that fits on a machine with no preemption", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, + name: "a pod that fits on a machine with no preemption", 
+ registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, @@ -1646,10 +1700,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine3"}, }, { - name: "machine with min highest priority pod is picked", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + name: "machine with min highest priority pod is picked", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: 
v1.PodStatus{StartTime: &startTime}}, @@ -1663,10 +1720,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine3"}, }, { - name: "when highest priorities are the same, minimum sum of priorities is picked", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + name: "when highest priorities are the same, minimum sum of priorities is picked", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, @@ -1680,10 +1740,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine2"}, }, { - name: "when highest priority and sum are the same, minimum number of pods is picked", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + 
name: "when highest priority and sum are the same, minimum number of pods is picked", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, @@ -1702,10 +1765,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { { // pickOneNodeForPreemption adjusts pod priorities when finding the sum of the victims. This // test ensures that the logic works correctly. 
- name: "sum of adjusted priorities is considered", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + name: "sum of adjusted priorities is considered", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, @@ -1721,10 +1787,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine2"}, }, { - name: "non-overlapping lowest high priority, sum priorities, and number of pods", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3", "machine4"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: types.UID("pod1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &veryHighPriority}}, + name: "non-overlapping lowest high priority, sum priorities, and number of pods", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + 
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3", "machine4"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: types.UID("pod1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &veryHighPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}, @@ -1745,10 +1814,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine1"}, }, { - name: "same priority, same number of victims, different start time for each machine's pod", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + name: "same priority, same number of victims, different start time for each machine's pod", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}}, 
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}}, @@ -1762,10 +1834,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine2"}, }, { - name: "same priority, same number of victims, different start time for all pods", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", "machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + name: "same priority, same number of victims, different start time for all pods", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190105}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}}, @@ -1779,10 +1854,13 @@ func TestPickOneNodeForPreemption(t *testing.T) { expected: []string{"machine3"}, }, { - name: "different priority, same number of victims, different start time for all pods", - registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - nodes: []string{"machine1", 
"machine2", "machine3"}, - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, + name: "different priority, same number of victims, different start time for all pods", + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + nodes: []string{"machine1", "machine2", "machine3"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}}, pods: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190105}}, {ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}}, @@ -1804,11 +1882,15 @@ func TestPickOneNodeForPreemption(t *testing.T) { } snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes)) registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. 
plugins := &schedulerapi.Plugins{ - Filter: &schedulerapi.PluginSet{}, + QueueSort: &schedulerapi.PluginSet{}, + Filter: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig - test.registerFilterPlugin(&registry, plugins, pluginConfigs) + for _, f := range test.registerPlugins { + f(&registry, plugins, pluginConfigs) + } fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot)) g := &genericScheduler{ @@ -1976,7 +2058,7 @@ func TestPreempt(t *testing.T) { extenders []*FakeExtender failedNodeToStatusMap framework.NodeToStatusMap nodeNames []string - registerPlugin st.RegisterPluginFunc + registerPlugins []st.RegisterPluginFunc expectedNode string expectedPods []string // list of preempted pods }{ @@ -1993,9 +2075,12 @@ func TestPreempt(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, {ObjectMeta: metav1.ObjectMeta{Name: "m3.1", UID: types.UID("m3.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, }, - registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "machine1", - expectedPods: []string{"m1.1", "m1.2"}, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "machine1", + expectedPods: []string{"m1.1", "m1.2"}, }, { name: "One node doesn't need any preemption", @@ -2010,9 +2095,12 @@ func TestPreempt(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, }, - registerPlugin:
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "machine3", - expectedPods: []string{}, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "machine3", + expectedPods: []string{}, }, { name: "preemption for topology spread constraints", @@ -2086,13 +2174,16 @@ func TestPreempt(t *testing.T) { "node-x": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch), }, nodeNames: []string{"node-a/zone1", "node-b/zone1", "node-x/zone2"}, - registerPlugin: st.RegisterPluginAsExtensions( - podtopologyspread.Name, - 1, - podtopologyspread.New, - "PreFilter", - "Filter", - ), + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPluginAsExtensions( + podtopologyspread.Name, + 1, + podtopologyspread.New, + "PreFilter", + "Filter", + ), + }, expectedNode: "node-b", expectedPods: []string{"pod-b1"}, }, @@ -2117,9 +2208,12 @@ func TestPreempt(t *testing.T) { predicates: []fitPredicate{machine1PredicateExtender}, }, }, - registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "machine1", - expectedPods: []string{"m1.1", "m1.2"}, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "machine1", + expectedPods: []string{"m1.1", "m1.2"}, }, { name: "Scheduler extenders do not allow any preemption", @@ -2139,9 +2233,12 @@ func TestPreempt(t *testing.T) { predicates: []fitPredicate{falsePredicateExtender}, }, }, - registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "", - expectedPods: []string{}, + registerPlugins: []st.RegisterPluginFunc{ + 
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "", + expectedPods: []string{}, }, { name: "One scheduler extender allows only machine1, the other returns error but ignorable. Only machine1 would be chosen", @@ -2165,9 +2262,12 @@ func TestPreempt(t *testing.T) { predicates: []fitPredicate{machine1PredicateExtender}, }, }, - registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "machine1", - expectedPods: []string{"m1.1", "m1.2"}, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "machine1", + expectedPods: []string{"m1.1", "m1.2"}, }, { name: "One scheduler extender allows only machine1, but it is not interested in given pod, otherwise machine1 would have been chosen", @@ -2191,9 +2291,12 @@ func TestPreempt(t *testing.T) { predicates: []fitPredicate{truePredicateExtender}, }, }, - registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "machine3", - expectedPods: []string{}, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "machine3", + expectedPods: []string{}, }, { name: "no preempting in pod", @@ -2208,9 +2311,12 @@ func TestPreempt(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, {ObjectMeta: metav1.ObjectMeta{Name: "m3.1", UID: types.UID("m3.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, }, - 
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "", - expectedPods: nil, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "", + expectedPods: nil, }, { name: "PreemptionPolicy is nil", @@ -2225,9 +2331,12 @@ func TestPreempt(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, {ObjectMeta: metav1.ObjectMeta{Name: "m3.1", UID: types.UID("m3.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}}, }, - registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), - expectedNode: "machine1", - expectedPods: []string{"m1.1", "m1.2"}, + registerPlugins: []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit), + }, + expectedNode: "machine1", + expectedPods: []string{"m1.1", "m1.2"}, }, } @@ -2274,12 +2383,16 @@ func TestPreempt(t *testing.T) { } registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. 
plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{}, PreFilter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig - test.registerPlugin(&registry, plugins, pluginConfigs) + for _, f := range test.registerPlugins { + f(&registry, plugins, pluginConfigs) + } snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes)) fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot)) @@ -2420,6 +2533,7 @@ func TestFairEvaluationForNodes(t *testing.T) { nodes := makeNodeList(nodeNames) g := makeScheduler( nodes, + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), ) // To make numAllNodes % nodesToFind != 0 diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 61faf9c01b7..a84f8aac987 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -45,6 +45,7 @@ import ( frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" @@ -289,6 +290,12 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, // Combine all framework configurations. If this results in any duplication, framework // instantiation should fail. var defaultPlugins schedulerapi.Plugins + // "PrioritySort" is neither a predicate nor priority before. We make it as the default QueueSort plugin.
+ defaultPlugins.Append(&schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}}, + }, + }) defaultPlugins.Append(pluginsForPredicates) defaultPlugins.Append(pluginsForPriorities) defaultPlugins.Apply(c.plugins) diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index d166d7d2523..060d4901d1d 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -44,6 +44,7 @@ import ( extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -110,6 +111,10 @@ func TestCreateFromConfig(t *testing.T) { if hpa != v1.DefaultHardPodAffinitySymmetricWeight { t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa) } + queueSortPls := sched.Framework.ListPlugins()["QueueSortPlugin"] + if len(queueSortPls) == 0 || queueSortPls[0].Name != queuesort.Name { + t.Errorf("QueueSort plugin %q not registered.", queuesort.Name) + } // Verify that node label predicate/priority are converted to framework plugins. 
wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}` diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD index 8302323e6ff..1bd66d44321 100644 --- a/pkg/scheduler/framework/plugins/BUILD +++ b/pkg/scheduler/framework/plugins/BUILD @@ -23,6 +23,7 @@ go_library( "//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library", "//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", + "//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library", "//pkg/scheduler/framework/plugins/tainttoleration:go_default_library", "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", @@ -62,6 +63,7 @@ filegroup( "//pkg/scheduler/framework/plugins/nodeunschedulable:all-srcs", "//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs", "//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs", + "//pkg/scheduler/framework/plugins/queuesort:all-srcs", "//pkg/scheduler/framework/plugins/serviceaffinity:all-srcs", "//pkg/scheduler/framework/plugins/tainttoleration:all-srcs", "//pkg/scheduler/framework/plugins/volumebinding:all-srcs", diff --git a/pkg/scheduler/framework/plugins/queuesort/BUILD b/pkg/scheduler/framework/plugins/queuesort/BUILD new file mode 100644 index 00000000000..14b56231e46 --- /dev/null +++ b/pkg/scheduler/framework/plugins/queuesort/BUILD @@ -0,0 +1,37 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["priority_sort.go"], + importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort", + visibility = ["//visibility:public"], + deps = [ + "//pkg/api/v1/pod:go_default_library", + "//pkg/scheduler/framework/v1alpha1:go_default_library", + 
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["priority_sort_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/scheduler/framework/plugins/queuesort/priority_sort.go b/pkg/scheduler/framework/plugins/queuesort/priority_sort.go new file mode 100644 index 00000000000..fe126b710a9 --- /dev/null +++ b/pkg/scheduler/framework/plugins/queuesort/priority_sort.go @@ -0,0 +1,50 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queuesort + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/api/v1/pod" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +// Name is the name of the plugin used in the plugin registry and configurations. +const Name = "PrioritySort" + +// PrioritySort is a plugin that implements Priority based sorting. +type PrioritySort struct{} + +var _ framework.QueueSortPlugin = &PrioritySort{} + +// Name returns name of the plugin. 
+func (pl *PrioritySort) Name() string { + return Name +} + +// Less is the function used by the activeQ heap algorithm to sort pods. +// It sorts pods based on their priority. When priorities are equal, it uses +// PodInfo.timestamp. +func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.PodInfo) bool { + p1 := pod.GetPodPriority(pInfo1.Pod) + p2 := pod.GetPodPriority(pInfo2.Pod) + return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp)) +} + +// New initializes a new plugin and returns it. +func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) { + return &PrioritySort{}, nil +} diff --git a/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go b/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go new file mode 100644 index 00000000000..0420efb6738 --- /dev/null +++ b/pkg/scheduler/framework/plugins/queuesort/priority_sort_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package queuesort + +import ( + "testing" + "time" + + v1 "k8s.io/api/core/v1" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" +) + +func TestLess(t *testing.T) { + prioritySort := &PrioritySort{} + var lowPriority, highPriority = int32(10), int32(100) + t1 := time.Now() + t2 := t1.Add(time.Second) + for _, tt := range []struct { + name string + p1 *framework.PodInfo + p2 *framework.PodInfo + expected bool + }{ + { + name: "p1.priority less than p2.priority", + p1: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &lowPriority, + }, + }, + }, + p2: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + }, + }, + expected: false, // p2 should be ahead of p1 in the queue + }, + { + name: "p1.priority greater than p2.priority", + p1: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + }, + }, + p2: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &lowPriority, + }, + }, + }, + expected: true, // p1 should be ahead of p2 in the queue + }, + { + name: "equal priority. p1 is added to schedulingQ earlier than p2", + p1: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + }, + Timestamp: t1, + }, + p2: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + }, + Timestamp: t2, + }, + expected: true, // p1 should be ahead of p2 in the queue + }, + { + name: "equal priority. 
p2 is added to schedulingQ earlier than p1", + p1: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + }, + Timestamp: t2, + }, + p2: &framework.PodInfo{ + Pod: &v1.Pod{ + Spec: v1.PodSpec{ + Priority: &highPriority, + }, + }, + Timestamp: t1, + }, + expected: false, // p2 should be ahead of p1 in the queue + }, + } { + t.Run(tt.name, func(t *testing.T) { + if got := prioritySort.Less(tt.p1, tt.p2); got != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, got) + } + }) + } +} diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 088940763a7..38906da993b 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" @@ -77,5 +78,6 @@ func NewInTreeRegistry(args *RegistryArgs) framework.Registry { interpodaffinity.Name: interpodaffinity.New, nodelabel.Name: nodelabel.New, serviceaffinity.Name: serviceaffinity.New, + queuesort.Name: queuesort.New, } } diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 2da48880c37..f7a182d8896 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -248,6 +248,10 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi } } + if len(f.queueSortPlugins) == 0 { + return nil, fmt.Errorf("no queue sort plugin is enabled") + } + if len(f.queueSortPlugins) > 1 
{ return nil, fmt.Errorf("only one queue sort plugin can be enabled") } @@ -287,8 +291,14 @@ func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, plugi // QueueSortFunc returns the function to sort pods in scheduling queue func (f *framework) QueueSortFunc() LessFunc { + if f == nil { + // If framework is nil, simply keep their order unchanged. + // NOTE: this is primarily for tests. + return func(_, _ *PodInfo) bool { return false } + } + if len(f.queueSortPlugins) == 0 { - return nil + panic("No QueueSort plugin is registered in the framework.") } // Only one QueueSort plugin can be enabled. diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index 429394c913d..7f508cdde4f 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -26,7 +26,6 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -37,6 +36,7 @@ import ( ) const ( + queueSortPlugin = "no-op-queue-sort-plugin" scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" scorePlugin1 = "score-plugin-1" @@ -275,7 +275,24 @@ func (pp *TestPermitPlugin) Permit(ctx context.Context, state *CycleState, p *v1 return NewStatus(Wait, ""), time.Duration(10 * time.Second) } -var registry Registry = func() Registry { +var _ QueueSortPlugin = &TestQueueSortPlugin{} + +func newQueueSortPlugin(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) { + return &TestQueueSortPlugin{}, nil +} + +// TestQueueSortPlugin is a no-op implementation for QueueSort extension point. 
+type TestQueueSortPlugin struct{} + +func (pl *TestQueueSortPlugin) Name() string { + return queueSortPlugin +} + +func (pl *TestQueueSortPlugin) Less(_, _ *PodInfo) bool { + return false +} + +var registry = func() Registry { r := make(Registry) r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1) r.Register(scoreWithNormalizePlugin2, newScoreWithNormalizePlugin2) @@ -291,7 +308,7 @@ var defaultWeights = map[string]int32{ scorePlugin1: 1, } -var emptyArgs []config.PluginConfig = make([]config.PluginConfig, 0) +var emptyArgs = make([]config.PluginConfig, 0) var state = &CycleState{} // Pod is only used for logging errors. @@ -301,6 +318,22 @@ var nodes = []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, } +func newFrameworkWithQueueSortEnabled(r Registry, pl *config.Plugins, plc []config.PluginConfig, opts ...Option) (Framework, error) { + if _, ok := r[queueSortPlugin]; !ok { + r[queueSortPlugin] = newQueueSortPlugin + } + plugins := &config.Plugins{} + plugins.Append(&config.Plugins{ + QueueSort: &config.PluginSet{ + Enabled: []config.Plugin{ + {Name: queueSortPlugin}, + }, + }, + }) + plugins.Append(pl) + return NewFramework(r, plugins, plc, opts...) +} + func TestInitFrameworkWithScorePlugins(t *testing.T) { tests := []struct { name string @@ -338,7 +371,7 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := NewFramework(registry, tt.plugins, emptyArgs) + _, err := newFrameworkWithQueueSortEnabled(registry, tt.plugins, emptyArgs) if tt.initErr && err == nil { t.Fatal("Framework initialization should fail") } @@ -534,7 +567,7 @@ func TestRunScorePlugins(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Inject the results via Args in PluginConfig. 
- f, err := NewFramework(registry, tt.plugins, tt.pluginConfigs) + f, err := newFrameworkWithQueueSortEnabled(registry, tt.plugins, tt.pluginConfigs) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -572,7 +605,7 @@ func TestPreFilterPlugins(t *testing.T) { }) plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}} t.Run("TestPreFilterPlugin", func(t *testing.T) { - f, err := NewFramework(r, plugins, emptyArgs) + f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -798,7 +831,7 @@ func TestFilterPlugins(t *testing.T) { config.Plugin{Name: pl.name}) } - f, err := NewFramework(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters)) + f, err := newFrameworkWithQueueSortEnabled(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters)) if err != nil { t.Fatalf("fail to create framework: %s", err) } @@ -1003,7 +1036,7 @@ func TestRecordingMetrics(t *testing.T) { Unreserve: pluginSet, } recorder := newMetricsRecorder(100, time.Nanosecond) - f, err := NewFramework(r, plugins, emptyArgs, withMetricsRecorder(recorder)) + f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs, withMetricsRecorder(recorder)) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -1052,7 +1085,7 @@ func TestPermitWaitingMetric(t *testing.T) { plugins := &config.Plugins{ Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}}, } - f, err := NewFramework(r, plugins, emptyArgs) + f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } @@ -1082,7 +1115,7 @@ func TestRejectWaitingPod(t *testing.T) { Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}}, } - f, err := 
NewFramework(r, plugins, emptyArgs) + f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs) if err != nil { t.Fatalf("Failed to create framework for testing: %v", err) } diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index 86ebbc60561..e5b150cde41 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -10,7 +10,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue", visibility = ["//pkg/scheduler:__subpackages__"], deps = [ - "//pkg/api/v1/pod:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/heap:go_default_library", "//pkg/scheduler/metrics:go_default_library", @@ -33,20 +32,19 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/api/v1/pod:go_default_library", + "//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/apis/config:go_default_library", + "//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/metrics:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/util:go_default_library", - "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", ], ) diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go 
b/pkg/scheduler/internal/queue/scheduling_queue.go index 52cbdfd7445..7130c42dcb6 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -36,7 +36,6 @@ import ( ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api/v1/pod" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/internal/heap" "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -196,17 +195,6 @@ func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo { } } -// activeQComp is the function used by the activeQ heap algorithm to sort pods. -// It sorts pods based on their priority. When priorities are equal, it uses -// PodInfo.timestamp. -func activeQComp(podInfo1, podInfo2 interface{}) bool { - pInfo1 := podInfo1.(*framework.PodInfo) - pInfo2 := podInfo2.(*framework.PodInfo) - prio1 := pod.GetPodPriority(pInfo1.Pod) - prio2 := pod.GetPodPriority(pInfo2.Pod) - return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp)) -} - // NewPriorityQueue creates a PriorityQueue object. 
func NewPriorityQueue( fwk framework.Framework, @@ -217,16 +205,10 @@ func NewPriorityQueue( opt(&options) } - comp := activeQComp - if fwk != nil { - if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil { - comp = func(podInfo1, podInfo2 interface{}) bool { - pInfo1 := podInfo1.(*framework.PodInfo) - pInfo2 := podInfo2.(*framework.PodInfo) - - return queueSortFunc(pInfo1, pInfo2) - } - } + comp := func(podInfo1, podInfo2 interface{}) bool { + pInfo1 := podInfo1.(*framework.PodInfo) + pInfo2 := podInfo2.(*framework.PodInfo) + return fwk.QueueSortFunc()(pInfo1, pInfo2) } pq := &PriorityQueue{ diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 6c424ac251c..2242a9e9629 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -17,7 +17,6 @@ limitations under the License. package queue import ( - "context" "fmt" "reflect" "strings" @@ -30,17 +29,16 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/client-go/informers" - clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/testutil" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" + schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" + frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/metrics" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) const queueMetricMetadata = ` @@ -130,7 +128,7 @@ func getUnschedulablePod(p 
*PriorityQueue, pod *v1.Pod) *v1.Pod { } func TestPriorityQueue_Add(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -166,121 +164,42 @@ func TestPriorityQueue_Add(t *testing.T) { } } -type fakeFramework struct{} - -func (*fakeFramework) QueueSortFunc() framework.LessFunc { - return func(podInfo1, podInfo2 *framework.PodInfo) bool { - prio1 := podutil.GetPodPriority(podInfo1.Pod) - prio2 := podutil.GetPodPriority(podInfo2.Pod) - return prio1 < prio2 +func newDefaultFramework() framework.Framework { + defaultCfg := algorithmprovider.NewRegistry(1)[schedulerapi.SchedulerDefaultProviderName] + pl, pls := defaultCfg.FrameworkPlugins, defaultCfg.FrameworkPluginConfig + fakeClient := fake.NewSimpleClientset() + fwk, err := framework.NewFramework( + frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}), + pl, + pls, + framework.WithClientSet(fakeClient), + framework.WithInformerFactory(informers.NewSharedInformerFactory(fakeClient, 0)), + framework.WithSnapshotSharedLister(nodeinfosnapshot.NewEmptySnapshot()), + ) + if err != nil { + panic(err) } -} - -func (f *fakeFramework) HasFilterPlugins() bool { - return true -} - -func (f *fakeFramework) HasScorePlugins() bool { - return true -} - -func (f *fakeFramework) ListPlugins() map[string][]config.Plugin { - return nil -} - -func (*fakeFramework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot { - return nil -} - -func (*fakeFramework) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { - return nil -} - -func (*fakeFramework) RunFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) framework.PluginToStatus { - return nil -} - -func (*fakeFramework) RunPreFilterExtensionAddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd 
*v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { - return nil -} - -func (*fakeFramework) RunPreFilterExtensionRemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { - return nil -} - -func (*fakeFramework) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) { - return nil, nil -} - -func (*fakeFramework) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { - return nil -} - -func (*fakeFramework) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { - return nil -} - -func (*fakeFramework) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { -} - -func (*fakeFramework) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status { - return nil -} - -func (*fakeFramework) RunReservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { - return nil -} - -func (*fakeFramework) RunUnreservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { -} - -func (*fakeFramework) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { - return nil -} - -func (*fakeFramework) IterateOverWaitingPods(callback func(framework.WaitingPod)) {} - -func (*fakeFramework) GetWaitingPod(uid types.UID) framework.WaitingPod { - return nil -} - -func (*fakeFramework) RejectWaitingPod(uid types.UID) { -} - -func (*fakeFramework) ClientSet() clientset.Interface { - return nil -} - -func (*fakeFramework) SharedInformerFactory() 
informers.SharedInformerFactory { - return nil -} - -func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister { - return nil -} - -func (*fakeFramework) VolumeBinder() *volumebinder.VolumeBinder { - return nil + return fwk } func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { - q := createAndRunPriorityQueue(&fakeFramework{}) + q := createAndRunPriorityQueue(newDefaultFramework()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } if err := q.Add(&highPriorityPod); err != nil { t.Errorf("add failed: %v", err) } - if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { - t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) - } if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) } + if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod { + t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name) + } } func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Add(&highPriNominatedPod) q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything. q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) @@ -312,7 +231,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { // Pods in and before current scheduling cycle will be put back to activeQueue // if we were trying to schedule them when we received move request. 
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { - q := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(time.Now()))) + q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(time.Now()))) totalNum := 10 expectedPods := make([]v1.Pod, 0, totalNum) for i := 0; i < totalNum; i++ { @@ -379,7 +298,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) { } func TestPriorityQueue_Pop(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -396,7 +315,7 @@ func TestPriorityQueue_Pop(t *testing.T) { } func TestPriorityQueue_Update(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Update(nil, &highPriorityPod) if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists { t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name) @@ -432,7 +351,7 @@ func TestPriorityQueue_Update(t *testing.T) { } func TestPriorityQueue_Delete(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Update(&highPriorityPod, &highPriNominatedPod) q.Add(&unschedulablePod) if err := q.Delete(&highPriNominatedPod); err != nil { @@ -456,7 +375,7 @@ func TestPriorityQueue_Delete(t *testing.T) { } func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Add(&medPriorityPod) addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) @@ -502,7 +421,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { Spec: v1.PodSpec{NodeName: "machine1"}, } - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Add(&medPriorityPod) // Add a couple of pods to the 
unschedulableQ. addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) @@ -523,7 +442,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Add(&medPriorityPod) q.Add(&unschedulablePod) q.Add(&highPriorityPod) @@ -548,7 +467,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { return pendingSet } - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) q.Add(&medPriorityPod) addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) @@ -564,7 +483,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) { } func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) if err := q.Add(&medPriorityPod); err != nil { t.Errorf("add failed: %v", err) } @@ -634,7 +553,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { func TestPriorityQueue_NewWithOptions(t *testing.T) { q := createAndRunPriorityQueue( - nil, + newDefaultFramework(), WithPodInitialBackoffDuration(2*time.Second), WithPodMaxBackoffDuration(20*time.Second), ) @@ -806,7 +725,7 @@ func TestSchedulingQueue_Close(t *testing.T) { }{ { name: "PriorityQueue close", - q: createAndRunPriorityQueue(nil), + q: createAndRunPriorityQueue(newDefaultFramework()), expectedErr: fmt.Errorf(queueClosed), }, } @@ -835,7 +754,7 @@ func TestSchedulingQueue_Close(t *testing.T) { // ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestRecentlyTriedPodsGoBack(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) // Add a few pods to priority queue. 
for i := 0; i < 5; i++ { p := v1.Pod{ @@ -889,7 +808,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { // This behavior ensures that an unschedulable pod does not block head of the queue when there // are frequent events that move pods to the active queue. func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) // Add an unschedulable pod to a priority queue. // This makes a situation that the pod was tried to schedule @@ -980,7 +899,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // TestHighPriorityBackoff tests that a high priority pod does not block // other pods if it is unschedulable func TestHighPriorityBackoff(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1043,7 +962,7 @@ func TestHighPriorityBackoff(t *testing.T) { // TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to // activeQ after one minutes if it is in unschedulableQ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { - q := createAndRunPriorityQueue(nil) + q := createAndRunPriorityQueue(newDefaultFramework()) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1240,7 +1159,7 @@ func TestPodTimestamp(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp))) + queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp))) var podInfoList []*framework.PodInfo for i, op := range test.operations { @@ -1407,7 +1326,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { resetMetrics() - queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp))) + 
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp))) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1436,7 +1355,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 1: A pod is created and scheduled after 1 attempt. The queue operations are // Add -> Pop. c := clock.NewFakeClock(timestamp) - queue := createAndRunPriorityQueue(nil, WithClock(c)) + queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) queue.Add(pod) pInfo, err := queue.Pop() if err != nil { @@ -1447,7 +1366,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 2: A pod is created and scheduled after 2 attempts. The queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop. c = clock.NewFakeClock(timestamp) - queue = createAndRunPriorityQueue(nil, WithClock(c)) + queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1467,7 +1386,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) { // Case 3: Similar to case 2, but before the second pop, call update, the queue operations are // Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop. 
c = clock.NewFakeClock(timestamp) - queue = createAndRunPriorityQueue(nil, WithClock(c)) + queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c)) queue.Add(pod) pInfo, err = queue.Pop() if err != nil { @@ -1565,7 +1484,7 @@ func TestIncomingPodsMetrics(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { metrics.SchedulerQueueIncomingPods.Reset() - queue := NewPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp))) + queue := NewPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp))) queue.Close() queue.Run() for _, op := range test.operations { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index b051b2d37ac..8c79f5a1e2d 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -49,6 +49,7 @@ import ( frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -355,8 +356,11 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { client := clientsetfake.NewSimpleClientset(&node) informerFactory := informers.NewSharedInformerFactory(client, 0) - f := st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter") - scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, pod, &node) + fns := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"), + } + scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, 
informerFactory, stop, pod, &node, fns...) waitPodExpireChan := make(chan struct{}) timeout := make(chan struct{}) @@ -418,8 +422,11 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { scache.AddNode(&node) client := clientsetfake.NewSimpleClientset(&node) informerFactory := informers.NewSharedInformerFactory(client, 0) - f := st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter") - scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, firstPod, &node) + fns := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"), + } + scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...) // We use conflicted pod ports to incur fit predicate failure. secondPod := podWithPort("bar", "", 8080) @@ -512,10 +519,13 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { client := clientsetfake.NewSimpleClientset(&node) informerFactory := informers.NewSharedInformerFactory(client, 0) - f := st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter") + fns := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"), + } scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry( - queuedPodStore, scache, informerFactory, f, stop, test.BindingDuration) + queuedPodStore, scache, informerFactory, stop, test.BindingDuration, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -547,9 +557,9 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. 
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, - informerFactory informers.SharedInformerFactory, stop chan struct{}, f st.RegisterPluginFunc, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) { + informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { - scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil) + scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -629,8 +639,11 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { fmt.Sprintf("Insufficient %v", v1.ResourceMemory), ) } - f := st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit) - scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil) + fns := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit), + } + scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -657,14 +670,18 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. 
-func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterPluginFunc, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, recorder events.EventRecorder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{}, PreFilter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig - f(&registry, plugins, pluginConfigs) + for _, f := range fns { + f(&registry, plugins, pluginConfigs) + } fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) algo := core.NewGenericScheduler( scache, @@ -711,14 +728,18 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C return sched, bindingChan, errChan } -func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterPluginFunc, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { +func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, stop chan struct{}, bindingTime time.Duration, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding) { registry := framework.Registry{} + // TODO: instantiate the plugins dynamically. 
plugins := &schedulerapi.Plugins{ + QueueSort: &schedulerapi.PluginSet{}, PreFilter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{}, } var pluginConfigs []schedulerapi.PluginConfig - f(&registry, plugins, pluginConfigs) + for _, f := range fns { + f(&registry, plugins, pluginConfigs) + } fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) queue := internalqueue.NewSchedulingQueue(nil) algo := core.NewGenericScheduler( @@ -785,8 +806,11 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi volumeBindingNewFunc := func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { return volumebinding.NewFromVolumeBinder(fakeVolumeBinder), nil } - f := st.RegisterFilterPlugin(volumebinding.Name, volumeBindingNewFunc) - s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, recorder) + fns := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterFilterPlugin(volumebinding.Name, volumeBindingNewFunc), + } + s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, recorder, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) s.VolumeBinder = fakeVolumeBinder diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go index 5865535db56..3c08ddf5906 100644 --- a/pkg/scheduler/testing/framework_helpers.go +++ b/pkg/scheduler/testing/framework_helpers.go @@ -25,6 +25,11 @@ import ( // to register a Filter Plugin to a given registry. type RegisterPluginFunc func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig) +// RegisterQueueSortPlugin returns a function to register a QueueSort Plugin to a given registry. 
+func RegisterQueueSortPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, 1, pluginNewFunc, "QueueSort") +} + // RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry. func RegisterFilterPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterPluginFunc { return RegisterPluginAsExtensions(pluginName, 1, pluginNewFunc, "Filter") @@ -59,6 +64,8 @@ func RegisterPluginAsExtensions(pluginName string, weight int32, pluginNewFunc f func getPluginSetByExtension(plugins *schedulerapi.Plugins, extension string) *schedulerapi.PluginSet { switch extension { + case "QueueSort": + return plugins.QueueSort case "Filter": return plugins.Filter case "PreFilter": diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index dbb09cb84c5..75b2060b20e 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -79,6 +79,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { ] }`, expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodeResourcesFit"}, }, @@ -98,6 +99,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { "apiVersion" : "v1" }`, expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodeResourcesFit"}, {Name: "NodePorts"}, @@ -144,6 +146,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { "priorities" : [] }`, expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "TaintToleration"}, @@ -160,6 +163,7 @@ priorities: weight: 1 `, expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: 
"NodeResourcesFit"}, }, @@ -178,6 +182,7 @@ priorities: kind: Policy `, expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "PreFilterPlugin": { {Name: "NodeResourcesFit"}, {Name: "NodePorts"}, @@ -223,6 +228,7 @@ predicates: [] priorities: [] `, expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": {{Name: "PrioritySort"}}, "FilterPlugin": { {Name: "NodeUnschedulable"}, {Name: "TaintToleration"},