Merge pull request #87261 from Huang-Wei/sched-q-sort

Implement default queue sort logic as a scheduler plugin
commit 05b6c32886
Kubernetes Prow Robot authored on 2020-01-17 01:40:56 -08:00; committed by GitHub
23 changed files with 648 additions and 270 deletions
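
For orientation: the extension point this change targets is the framework's QueueSortPlugin, whose definition lives in pkg/scheduler/framework/v1alpha1 and is not part of this diff. A minimal sketch consistent with the Less signature implemented below (an assumption for illustration, not the verbatim definition):

// QueueSortPlugin is the interface a plugin implements to order pods in the
// scheduling queue; the framework permits exactly one enabled QueueSort plugin.
type QueueSortPlugin interface {
	Plugin
	// Less reports whether pInfo1 should be dequeued before pInfo2.
	Less(pInfo1, pInfo2 *PodInfo) bool
}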


@ -21,6 +21,7 @@ go_library(
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
@ -71,6 +72,7 @@ go_test(
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",


@ -24,6 +24,7 @@ go_library(
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
@ -52,6 +53,7 @@ go_test(
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",


@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
@ -83,6 +84,11 @@ func ListAlgorithmProviders() string {
func getDefaultConfig(hardPodAffinityWeight int64) *Config {
return &Config{
FrameworkPlugins: &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
},
},
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},


@ -21,6 +21,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -48,6 +49,11 @@ func TestClusterAutoscalerProvider(t *testing.T) {
hardPodAffinityWeight := int64(1)
wantConfig := &Config{
FrameworkPlugins: &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
},
},
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
@ -120,6 +126,11 @@ func TestApplyFeatureGates(t *testing.T) {
featuresEnabled: false,
wantConfig: &Config{
FrameworkPlugins: &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
},
},
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
@ -178,6 +189,11 @@ func TestApplyFeatureGates(t *testing.T) {
featuresEnabled: true,
wantConfig: &Config{
FrameworkPlugins: &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: queuesort.Name},
},
},
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},


@ -61,6 +61,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
@ -87,6 +88,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "TaintToleration"},
@ -115,6 +117,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -167,6 +170,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -224,6 +228,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -290,6 +295,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -364,6 +370,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -449,6 +456,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -545,6 +553,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -642,6 +651,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -743,6 +753,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -856,6 +867,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -971,6 +983,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -1086,6 +1099,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -1206,6 +1220,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}]
}`,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodePorts"},
{Name: "NodeResourcesFit"},
@ -1279,6 +1294,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
features.EvenPodsSpread: true,
},
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "PodTopologySpread"},
},
@ -1309,6 +1325,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
features.ResourceLimitsPriorityFunction: true,
},
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PostFilterPlugin": {
{Name: "NodeResourceLimits"},
},
@ -1382,6 +1399,9 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
func TestAlgorithmProviderCompatibility(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
defaultPlugins := map[string][]config.Plugin{
"QueueSortPlugin": {
{Name: "PrioritySort"},
},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
@ -1438,6 +1458,9 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
name: "ClusterAutoscalerProvider",
provider: algorithmprovider.ClusterAutoscalerProvider,
wantPlugins: map[string][]config.Plugin{
"QueueSortPlugin": {
{Name: "PrioritySort"},
},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},


@ -57,6 +57,7 @@ go_test(
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",


@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -366,6 +367,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -382,6 +384,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -398,6 +401,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -418,6 +422,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -434,6 +439,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -453,6 +459,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -478,6 +485,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 20),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -505,6 +513,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 1),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -530,6 +539,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender.
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
},
extenders: []FakeExtender{
{
@ -568,8 +578,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
QueueSort: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
for _, f := range test.registerPlugins {


@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
@ -385,6 +386,7 @@ func TestGenericScheduler(t *testing.T) {
}{
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -401,6 +403,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -412,6 +415,7 @@ func TestGenericScheduler(t *testing.T) {
{
// Fits on a machine where the pod ID matches the machine name
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -422,6 +426,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
@ -433,6 +438,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
@ -444,6 +450,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
@ -456,6 +463,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin),
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
@ -475,6 +483,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
@ -505,6 +514,7 @@ func TestGenericScheduler(t *testing.T) {
{
// Pod with existing PVC
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -530,6 +540,7 @@ func TestGenericScheduler(t *testing.T) {
{
// Pod with non existing PVC
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -553,6 +564,7 @@ func TestGenericScheduler(t *testing.T) {
{
// Pod with deleting PVC
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -576,6 +588,7 @@ func TestGenericScheduler(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("FalseMap", newFalseMapPlugin(), 1),
st.RegisterScorePlugin("TrueMap", newTrueMapPlugin(), 2),
@ -588,6 +601,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test even pods spread predicate - 2 nodes with maxskew=1",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(
podtopologyspread.Name,
1,
@ -634,6 +648,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test even pods spread predicate - 3 nodes with maxskew=2",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(
podtopologyspread.Name,
1,
@ -698,6 +713,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with filter plugin returning Unschedulable status",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(
"FakeFilter",
NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
@ -718,6 +734,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(
"FakeFilter",
NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}),
@ -738,6 +755,7 @@ func TestGenericScheduler(t *testing.T) {
{
name: "test with partial failed filter plugin",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(
"FakeFilter",
NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}),
@ -767,7 +785,9 @@ func TestGenericScheduler(t *testing.T) {
}
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
PreFilter: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
@ -779,7 +799,7 @@ func TestGenericScheduler(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot))
pvcs := []v1.PersistentVolumeClaim{}
var pvcs []v1.PersistentVolumeClaim
pvcs = append(pvcs, test.pvcs...)
pvcLister := fakelisters.PersistentVolumeClaimLister(pvcs)
@ -817,8 +837,10 @@ func makeScheduler(nodes []*v1.Node, fns ...st.RegisterPluginFunc) *genericSched
}
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
QueueSort: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
for _, f := range fns {
@ -842,6 +864,7 @@ func TestFindFitAllError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(
nodes,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
)
@ -874,6 +897,7 @@ func TestFindFitSomeError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(
nodes,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
)
@ -934,15 +958,20 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
registry := framework.Registry{}
plugin := fakeFilterPlugin{}
registry.Register(queuesort.Name, queuesort.New)
registry.Register("FakeFilter", func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
return &plugin, nil
})
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}},
},
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: "FakeFilter"}},
},
}
pluginConfigs := []schedulerapi.PluginConfig{
{Name: queuesort.Name},
{Name: "FakeFilter"},
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
@ -1089,13 +1118,16 @@ func TestZeroRequest(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
PostFilter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
pluginRegistrations := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterScorePlugin(noderesources.LeastAllocatedName, noderesources.NewLeastAllocated, 1),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, noderesources.NewBalancedAllocation, 1),
st.RegisterScorePlugin(defaultpodtopologyspread.Name, defaultpodtopologyspread.New, 1),
@ -1270,6 +1302,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "a pod that does not fit on any machine",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -1283,6 +1316,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "a pod that fits with no preemption",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -1296,6 +1330,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "a pod that fits on one machine with no preemption",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
},
nodes: []string{"machine1", "machine2"},
@ -1309,6 +1344,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "a pod that fits on both machines when lower priority pods are preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
@ -1322,6 +1358,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "a pod that would fit on the machines, but other pods running are higher priority",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
@ -1335,6 +1372,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "medium priority pod is preempted, but lower priority one stays as it is small",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
@ -1349,6 +1387,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "mixed priority pods are preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
@ -1365,6 +1404,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "mixed priority pods are preempted, pick later StartTime one when priorities are equal",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
@ -1381,6 +1421,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "pod with anti-affinity is preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
st.RegisterFilterPlugin(interpodaffinity.Name, interpodaffinity.New),
},
@ -1415,6 +1456,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "preemption to resolve even pods spread FitError",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(
podtopologyspread.Name,
1,
@ -1497,6 +1539,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
{
name: "get Unschedulable in the preemption phase when the filter plugins filtering the nodes",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
@ -1539,7 +1582,9 @@ func TestSelectNodesForPreemption(t *testing.T) {
}
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
PreFilter: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
@ -1607,27 +1652,33 @@ func TestSelectNodesForPreemption(t *testing.T) {
// TestPickOneNodeForPreemption tests pickOneNodeForPreemption.
func TestPickOneNodeForPreemption(t *testing.T) {
tests := []struct {
name string
registerFilterPlugin st.RegisterPluginFunc
nodes []string
pod *v1.Pod
pods []*v1.Pod
expected []string // any of the items is valid
name string
registerPlugins []st.RegisterPluginFunc
nodes []string
pod *v1.Pod
pods []*v1.Pod
expected []string // any of the items is valid
}{
{
name: "No node needs preemption",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
name: "No node needs preemption",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}}},
expected: []string{"machine1"},
},
{
name: "a pod that fits on both machines when lower priority pods are preempted",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
name: "a pod that fits on both machines when lower priority pods are preempted",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1635,10 +1686,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine1", "machine2"},
},
{
name: "a pod that fits on a machine with no preemption",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
name: "a pod that fits on a machine with no preemption",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1646,10 +1700,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine3"},
},
{
name: "machine with min highest priority pod is picked",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "machine with min highest priority pod is picked",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1663,10 +1720,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine3"},
},
{
name: "when highest priorities are the same, minimum sum of priorities is picked",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "when highest priorities are the same, minimum sum of priorities is picked",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1680,10 +1740,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine2"},
},
{
name: "when highest priority and sum are the same, minimum number of pods is picked",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "when highest priority and sum are the same, minimum number of pods is picked",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1702,10 +1765,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
{
// pickOneNodeForPreemption adjusts pod priorities when finding the sum of the victims. This
// test ensures that the logic works correctly.
name: "sum of adjusted priorities is considered",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "sum of adjusted priorities is considered",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &negPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1721,10 +1787,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine2"},
},
{
name: "non-overlapping lowest high priority, sum priorities, and number of pods",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3", "machine4"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: types.UID("pod1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &veryHighPriority}},
name: "non-overlapping lowest high priority, sum priorities, and number of pods",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3", "machine4"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: types.UID("pod1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &veryHighPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime}},
@ -1745,10 +1814,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine1"},
},
{
name: "same priority, same number of victims, different start time for each machine's pod",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "same priority, same number of victims, different start time for each machine's pod",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}},
@ -1762,10 +1834,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine2"},
},
{
name: "same priority, same number of victims, different start time for all pods",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "same priority, same number of victims, different start time for all pods",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190105}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}},
@ -1779,10 +1854,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
expected: []string{"machine3"},
},
{
name: "different priority, same number of victims, different start time for all pods",
registerFilterPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
name: "different priority, same number of victims, different start time for all pods",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
nodes: []string{"machine1", "machine2", "machine3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: veryLargeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "m1.1", UID: types.UID("m1.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &lowPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190105}},
{ObjectMeta: metav1.ObjectMeta{Name: "m1.2", UID: types.UID("m1.2")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190103}},
@ -1804,11 +1882,15 @@ func TestPickOneNodeForPreemption(t *testing.T) {
}
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
QueueSort: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
test.registerFilterPlugin(&registry, plugins, pluginConfigs)
for _, f := range test.registerPlugins {
f(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot))
g := &genericScheduler{
@ -1976,7 +2058,7 @@ func TestPreempt(t *testing.T) {
extenders []*FakeExtender
failedNodeToStatusMap framework.NodeToStatusMap
nodeNames []string
registerPlugin st.RegisterPluginFunc
registerPlugins []st.RegisterPluginFunc
expectedNode string
expectedPods []string // list of preempted pods
}{
@ -1993,9 +2075,12 @@ func TestPreempt(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1", UID: types.UID("m3.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "One node doesn't need any preemption",
@ -2010,9 +2095,12 @@ func TestPreempt(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "machine3",
expectedPods: []string{},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "machine3",
expectedPods: []string{},
},
{
name: "preemption for topology spread constraints",
@ -2086,13 +2174,16 @@ func TestPreempt(t *testing.T) {
"node-x": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
},
nodeNames: []string{"node-a/zone1", "node-b/zone1", "node-x/zone2"},
registerPlugin: st.RegisterPluginAsExtensions(
podtopologyspread.Name,
1,
podtopologyspread.New,
"PreFilter",
"Filter",
),
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(
podtopologyspread.Name,
1,
podtopologyspread.New,
"PreFilter",
"Filter",
),
},
expectedNode: "node-b",
expectedPods: []string{"pod-b1"},
},
@ -2117,9 +2208,12 @@ func TestPreempt(t *testing.T) {
predicates: []fitPredicate{machine1PredicateExtender},
},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "Scheduler extenders do not allow any preemption",
@ -2139,9 +2233,12 @@ func TestPreempt(t *testing.T) {
predicates: []fitPredicate{falsePredicateExtender},
},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "",
expectedPods: []string{},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "",
expectedPods: []string{},
},
{
name: "One scheduler extender allows only machine1, the other returns error but ignorable. Only machine1 would be chosen",
@ -2165,9 +2262,12 @@ func TestPreempt(t *testing.T) {
predicates: []fitPredicate{machine1PredicateExtender},
},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
{
name: "One scheduler extender allows only machine1, but it is not interested in given pod, otherwise machine1 would have been chosen",
@ -2191,9 +2291,12 @@ func TestPreempt(t *testing.T) {
predicates: []fitPredicate{truePredicateExtender},
},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "machine3",
expectedPods: []string{},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "machine3",
expectedPods: []string{},
},
{
name: "no preempting in pod",
@ -2208,9 +2311,12 @@ func TestPreempt(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1", UID: types.UID("m3.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "",
expectedPods: nil,
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "",
expectedPods: nil,
},
{
name: "PreemptionPolicy is nil",
@ -2225,9 +2331,12 @@ func TestPreempt(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "m2.1", UID: types.UID("m2.1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
{ObjectMeta: metav1.ObjectMeta{Name: "m3.1", UID: types.UID("m3.1")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine3"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
},
registerPlugin: st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(noderesources.FitName, noderesources.NewFit),
},
expectedNode: "machine1",
expectedPods: []string{"m1.1", "m1.2"},
},
}
@ -2274,12 +2383,16 @@ func TestPreempt(t *testing.T) {
}
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
PreFilter: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
test.registerPlugin(&registry, plugins, pluginConfigs)
for _, f := range test.registerPlugins {
f(&registry, plugins, pluginConfigs)
}
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot))
@ -2420,6 +2533,7 @@ func TestFairEvaluationForNodes(t *testing.T) {
nodes := makeNodeList(nodeNames)
g := makeScheduler(
nodes,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
)
// To make numAllNodes % nodesToFind != 0


@ -45,6 +45,7 @@ import (
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
@ -289,6 +290,12 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
var defaultPlugins schedulerapi.Plugins
// "PrioritySort" is neither a predicate nor priority before. We make it as the default QueueSort plugin.
defaultPlugins.Append(&schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}},
},
})
defaultPlugins.Append(pluginsForPredicates)
defaultPlugins.Append(pluginsForPriorities)
defaultPlugins.Apply(c.plugins)


@ -44,6 +44,7 @@ import (
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -110,6 +111,10 @@ func TestCreateFromConfig(t *testing.T) {
if hpa != v1.DefaultHardPodAffinitySymmetricWeight {
t.Errorf("Wrong hardPodAffinitySymmetricWeight, ecpected: %d, got: %d", v1.DefaultHardPodAffinitySymmetricWeight, hpa)
}
queueSortPls := sched.Framework.ListPlugins()["QueueSortPlugin"]
if len(queueSortPls) == 0 || queueSortPls[0].Name != queuesort.Name {
t.Errorf("QueueSort plugin %q not registered.", queuesort.Name)
}
// Verify that node label predicate/priority are converted to framework plugins.
wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`


@ -23,6 +23,7 @@ go_library(
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
@ -62,6 +63,7 @@ filegroup(
"//pkg/scheduler/framework/plugins/nodeunschedulable:all-srcs",
"//pkg/scheduler/framework/plugins/nodevolumelimits:all-srcs",
"//pkg/scheduler/framework/plugins/podtopologyspread:all-srcs",
"//pkg/scheduler/framework/plugins/queuesort:all-srcs",
"//pkg/scheduler/framework/plugins/serviceaffinity:all-srcs",
"//pkg/scheduler/framework/plugins/tainttoleration:all-srcs",
"//pkg/scheduler/framework/plugins/volumebinding:all-srcs",


@ -0,0 +1,37 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["priority_sort.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["priority_sort_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@ -0,0 +1,50 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queuesort
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "PrioritySort"
// PrioritySort is a plugin that implements Priority based sorting.
type PrioritySort struct{}
var _ framework.QueueSortPlugin = &PrioritySort{}
// Name returns the name of the plugin.
func (pl *PrioritySort) Name() string {
return Name
}
// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.PodInfo) bool {
p1 := pod.GetPodPriority(pInfo1.Pod)
p2 := pod.GetPodPriority(pInfo2.Pod)
return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
// New initializes a new plugin and returns it.
func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
return &PrioritySort{}, nil
}
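
As a wiring illustration, assembled and simplified from the test code elsewhere in this diff (a sketch using internal packages, so it is not buildable outside the Kubernetes repo): the plugin's factory must be registered and the plugin enabled in the QueueSort set, because NewFramework now rejects configurations with zero or more than one enabled QueueSort plugin.

package main

import (
	"fmt"

	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

func main() {
	// Register the PrioritySort factory under its canonical name.
	registry := framework.Registry{}
	if err := registry.Register(queuesort.Name, queuesort.New); err != nil {
		panic(err)
	}
	// Enable it as the single QueueSort plugin.
	plugins := &schedulerapi.Plugins{
		QueueSort: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}},
		},
	}
	pluginConfigs := []schedulerapi.PluginConfig{{Name: queuesort.Name}}
	fwk, err := framework.NewFramework(registry, plugins, pluginConfigs)
	if err != nil {
		panic(err) // e.g. "no queue sort plugin is enabled" when QueueSort is left empty
	}
	fmt.Println(fwk.ListPlugins()["QueueSortPlugin"]) // expect the PrioritySort entry
}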


@ -0,0 +1,121 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queuesort
import (
"testing"
"time"
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
func TestLess(t *testing.T) {
prioritySort := &PrioritySort{}
var lowPriority, highPriority = int32(10), int32(100)
t1 := time.Now()
t2 := t1.Add(time.Second)
for _, tt := range []struct {
name string
p1 *framework.PodInfo
p2 *framework.PodInfo
expected bool
}{
{
name: "p1.priority less than p2.priority",
p1: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &lowPriority,
},
},
},
p2: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
},
expected: false, // p2 should be ahead of p1 in the queue
},
{
name: "p1.priority greater than p2.priority",
p1: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
},
p2: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &lowPriority,
},
},
},
expected: true, // p1 should be ahead of p2 in the queue
},
{
name: "equal priority. p1 is added to schedulingQ earlier than p2",
p1: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
Timestamp: t1,
},
p2: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
Timestamp: t2,
},
expected: true, // p1 should be ahead of p2 in the queue
},
{
name: "equal priority. p2 is added to schedulingQ earlier than p1",
p1: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
Timestamp: t2,
},
p2: &framework.PodInfo{
Pod: &v1.Pod{
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
Timestamp: t1,
},
expected: false, // p2 should be ahead of p1 in the queue
},
} {
t.Run(tt.name, func(t *testing.T) {
if got := prioritySort.Less(tt.p1, tt.p2); got != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, got)
}
})
}
}

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
@ -77,5 +78,6 @@ func NewInTreeRegistry(args *RegistryArgs) framework.Registry {
interpodaffinity.Name: interpodaffinity.New,
nodelabel.Name: nodelabel.New,
serviceaffinity.Name: serviceaffinity.New,
queuesort.Name: queuesort.New,
}
}

View File

@ -248,6 +248,10 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if len(f.queueSortPlugins) == 0 {
return nil, fmt.Errorf("no queue sort plugin is enabled")
}
if len(f.queueSortPlugins) > 1 {
return nil, fmt.Errorf("only one queue sort plugin can be enabled")
}
@ -287,8 +291,14 @@ func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, plugi
// QueueSortFunc returns the function to sort pods in the scheduling queue.
func (f *framework) QueueSortFunc() LessFunc {
if f == nil {
// If the framework is nil, simply keep the pods' order unchanged.
// NOTE: this is primarily for tests.
return func(_, _ *PodInfo) bool { return false }
}
if len(f.queueSortPlugins) == 0 {
return nil
panic("No QueueSort plugin is registered in the framework.")
}
// Only one QueueSort plugin can be enabled.
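
Put differently, every set of plugins handed to NewFramework must now enable exactly one QueueSort plugin; zero or more than one is rejected. A minimal sketch of a config.Plugins value that passes the check, reusing the PrioritySort name introduced in this commit (illustrative only):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/apis/config"
)

func main() {
	// Exactly one QueueSort plugin enabled; NewFramework errors on zero or on more than one.
	plugins := &config.Plugins{
		QueueSort: &config.PluginSet{
			Enabled: []config.Plugin{
				{Name: "PrioritySort"},
			},
		},
	}
	fmt.Printf("%+v\n", plugins.QueueSort.Enabled)
}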

View File

@ -26,7 +26,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -37,6 +36,7 @@ import (
)
const (
queueSortPlugin = "no-op-queue-sort-plugin"
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
@ -275,7 +275,24 @@ func (pp *TestPermitPlugin) Permit(ctx context.Context, state *CycleState, p *v1
return NewStatus(Wait, ""), time.Duration(10 * time.Second)
}
var registry Registry = func() Registry {
var _ QueueSortPlugin = &TestQueueSortPlugin{}
func newQueueSortPlugin(_ *runtime.Unknown, _ FrameworkHandle) (Plugin, error) {
return &TestQueueSortPlugin{}, nil
}
// TestQueueSortPlugin is a no-op implementation of the QueueSort extension point.
type TestQueueSortPlugin struct{}
func (pl *TestQueueSortPlugin) Name() string {
return queueSortPlugin
}
func (pl *TestQueueSortPlugin) Less(_, _ *PodInfo) bool {
return false
}
var registry = func() Registry {
r := make(Registry)
r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1)
r.Register(scoreWithNormalizePlugin2, newScoreWithNormalizePlugin2)
@ -291,7 +308,7 @@ var defaultWeights = map[string]int32{
scorePlugin1: 1,
}
var emptyArgs []config.PluginConfig = make([]config.PluginConfig, 0)
var emptyArgs = make([]config.PluginConfig, 0)
var state = &CycleState{}
// Pod is only used for logging errors.
@ -301,6 +318,22 @@ var nodes = []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
}
func newFrameworkWithQueueSortEnabled(r Registry, pl *config.Plugins, plc []config.PluginConfig, opts ...Option) (Framework, error) {
if _, ok := r[queueSortPlugin]; !ok {
r[queueSortPlugin] = newQueueSortPlugin
}
plugins := &config.Plugins{}
plugins.Append(&config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: queueSortPlugin},
},
},
})
plugins.Append(pl)
return NewFramework(r, plugins, plc, opts...)
}
func TestInitFrameworkWithScorePlugins(t *testing.T) {
tests := []struct {
name string
@ -338,7 +371,7 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewFramework(registry, tt.plugins, emptyArgs)
_, err := newFrameworkWithQueueSortEnabled(registry, tt.plugins, emptyArgs)
if tt.initErr && err == nil {
t.Fatal("Framework initialization should fail")
}
@ -534,7 +567,7 @@ func TestRunScorePlugins(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Inject the results via Args in PluginConfig.
f, err := NewFramework(registry, tt.plugins, tt.pluginConfigs)
f, err := newFrameworkWithQueueSortEnabled(registry, tt.plugins, tt.pluginConfigs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
@ -572,7 +605,7 @@ func TestPreFilterPlugins(t *testing.T) {
})
plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}}
t.Run("TestPreFilterPlugin", func(t *testing.T) {
f, err := NewFramework(r, plugins, emptyArgs)
f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
@ -798,7 +831,7 @@ func TestFilterPlugins(t *testing.T) {
config.Plugin{Name: pl.name})
}
f, err := NewFramework(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters))
f, err := newFrameworkWithQueueSortEnabled(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters))
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
@ -1003,7 +1036,7 @@ func TestRecordingMetrics(t *testing.T) {
Unreserve: pluginSet,
}
recorder := newMetricsRecorder(100, time.Nanosecond)
f, err := NewFramework(r, plugins, emptyArgs, withMetricsRecorder(recorder))
f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs, withMetricsRecorder(recorder))
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
@ -1052,7 +1085,7 @@ func TestPermitWaitingMetric(t *testing.T) {
plugins := &config.Plugins{
Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}},
}
f, err := NewFramework(r, plugins, emptyArgs)
f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
@ -1082,7 +1115,7 @@ func TestRejectWaitingPod(t *testing.T) {
Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}},
}
f, err := NewFramework(r, plugins, emptyArgs)
f, err := newFrameworkWithQueueSortEnabled(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}

View File

@ -10,7 +10,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/heap:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
@ -33,20 +32,19 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
],
)

View File

@ -36,7 +36,6 @@ import (
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics"
@ -196,17 +195,6 @@ func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
}
}
// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
prio1 := pod.GetPodPriority(pInfo1.Pod)
prio2 := pod.GetPodPriority(pInfo2.Pod)
return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
fwk framework.Framework,
@ -217,16 +205,10 @@ func NewPriorityQueue(
opt(&options)
}
comp := activeQComp
if fwk != nil {
if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil {
comp = func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
return queueSortFunc(pInfo1, pInfo2)
}
}
comp := func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
return fwk.QueueSortFunc()(pInfo1, pInfo2)
}
pq := &PriorityQueue{
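
Since the comparator is now taken straight from fwk.QueueSortFunc(), callers of NewPriorityQueue are expected to pass a real (or test) framework rather than nil. A sketch of such a construction, mirroring the newDefaultFramework helper added to the queue tests later in this diff; the newDefaultQueue name is illustrative, and the code is assumed to live under pkg/scheduler so the internal/queue package is importable.

package example

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

// newDefaultQueue builds a framework from the default algorithm provider and
// hands it to NewPriorityQueue, which derives its activeQ comparator from the
// framework's QueueSort plugin.
func newDefaultQueue() (*internalqueue.PriorityQueue, error) {
	cfg := algorithmprovider.NewRegistry(1)[schedulerapi.SchedulerDefaultProviderName]
	client := fake.NewSimpleClientset()
	fwk, err := framework.NewFramework(
		frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}),
		cfg.FrameworkPlugins,
		cfg.FrameworkPluginConfig,
		framework.WithClientSet(client),
		framework.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)),
		framework.WithSnapshotSharedLister(nodeinfosnapshot.NewEmptySnapshot()),
	)
	if err != nil {
		return nil, err
	}
	return internalqueue.NewPriorityQueue(fwk), nil
}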

View File

@ -17,7 +17,6 @@ limitations under the License.
package queue
import (
"context"
"fmt"
"reflect"
"strings"
@ -30,17 +29,16 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
const queueMetricMetadata = `
@ -130,7 +128,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
}
func TestPriorityQueue_Add(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -166,121 +164,42 @@ func TestPriorityQueue_Add(t *testing.T) {
}
}
type fakeFramework struct{}
func (*fakeFramework) QueueSortFunc() framework.LessFunc {
return func(podInfo1, podInfo2 *framework.PodInfo) bool {
prio1 := podutil.GetPodPriority(podInfo1.Pod)
prio2 := podutil.GetPodPriority(podInfo2.Pod)
return prio1 < prio2
func newDefaultFramework() framework.Framework {
defaultCfg := algorithmprovider.NewRegistry(1)[schedulerapi.SchedulerDefaultProviderName]
pl, pls := defaultCfg.FrameworkPlugins, defaultCfg.FrameworkPluginConfig
fakeClient := fake.NewSimpleClientset()
fwk, err := framework.NewFramework(
frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}),
pl,
pls,
framework.WithClientSet(fakeClient),
framework.WithInformerFactory(informers.NewSharedInformerFactory(fakeClient, 0)),
framework.WithSnapshotSharedLister(nodeinfosnapshot.NewEmptySnapshot()),
)
if err != nil {
panic(err)
}
}
func (f *fakeFramework) HasFilterPlugins() bool {
return true
}
func (f *fakeFramework) HasScorePlugins() bool {
return true
}
func (f *fakeFramework) ListPlugins() map[string][]config.Plugin {
return nil
}
func (*fakeFramework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot {
return nil
}
func (*fakeFramework) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
return nil
}
func (*fakeFramework) RunFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) framework.PluginToStatus {
return nil
}
func (*fakeFramework) RunPreFilterExtensionAddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunPreFilterExtensionRemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) {
return nil, nil
}
func (*fakeFramework) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
}
func (*fakeFramework) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
return nil
}
func (*fakeFramework) RunReservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunUnreservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
}
func (*fakeFramework) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) IterateOverWaitingPods(callback func(framework.WaitingPod)) {}
func (*fakeFramework) GetWaitingPod(uid types.UID) framework.WaitingPod {
return nil
}
func (*fakeFramework) RejectWaitingPod(uid types.UID) {
}
func (*fakeFramework) ClientSet() clientset.Interface {
return nil
}
func (*fakeFramework) SharedInformerFactory() informers.SharedInformerFactory {
return nil
}
func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister {
return nil
}
func (*fakeFramework) VolumeBinder() *volumebinder.VolumeBinder {
return nil
return fwk
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := createAndRunPriorityQueue(&fakeFramework{})
q := createAndRunPriorityQueue(newDefaultFramework())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
if err := q.Add(&highPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
}
if p, err := q.Pop(); err != nil || p.Pod != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Pod.Name)
}
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
@ -312,7 +231,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// Pods in and before the current scheduling cycle will be put back to the activeQ
// if we were trying to schedule them when we received a move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(time.Now())))
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(time.Now())))
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
@ -379,7 +298,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
func TestPriorityQueue_Pop(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@ -396,7 +315,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
}
func TestPriorityQueue_Update(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
@ -432,7 +351,7 @@ func TestPriorityQueue_Update(t *testing.T) {
}
func TestPriorityQueue_Delete(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod)
if err := q.Delete(&highPriNominatedPod); err != nil {
@ -456,7 +375,7 @@ func TestPriorityQueue_Delete(t *testing.T) {
}
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
@ -502,7 +421,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
Spec: v1.PodSpec{NodeName: "machine1"},
}
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ.
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
@ -523,7 +442,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
@ -548,7 +467,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
return pendingSet
}
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod))
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod))
@ -564,7 +483,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
}
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -634,7 +553,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
func TestPriorityQueue_NewWithOptions(t *testing.T) {
q := createAndRunPriorityQueue(
nil,
newDefaultFramework(),
WithPodInitialBackoffDuration(2*time.Second),
WithPodMaxBackoffDuration(20*time.Second),
)
@ -806,7 +725,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
}{
{
name: "PriorityQueue close",
q: createAndRunPriorityQueue(nil),
q: createAndRunPriorityQueue(newDefaultFramework()),
expectedErr: fmt.Errorf(queueClosed),
},
}
@ -835,7 +754,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
// ensures that an unschedulable pod does not block the head of the queue when there
// are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
// Add a few pods to priority queue.
for i := 0; i < 5; i++ {
p := v1.Pod{
@ -889,7 +808,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// This behavior ensures that an unschedulable pod does not block the head of the queue when there
// are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
// Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule
@ -980,7 +899,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable
func TestHighPriorityBackoff(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1043,7 +962,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to
// activeQ after one minute if they are in unschedulableQ
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
q := createAndRunPriorityQueue(nil)
q := createAndRunPriorityQueue(newDefaultFramework())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod",
@ -1240,7 +1159,7 @@ func TestPodTimestamp(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
var podInfoList []*framework.PodInfo
for i, op := range test.operations {
@ -1407,7 +1326,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
queue := createAndRunPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
@ -1436,7 +1355,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
// Add -> Pop.
c := clock.NewFakeClock(timestamp)
queue := createAndRunPriorityQueue(nil, WithClock(c))
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue.Add(pod)
pInfo, err := queue.Pop()
if err != nil {
@ -1447,7 +1366,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
c = clock.NewFakeClock(timestamp)
queue = createAndRunPriorityQueue(nil, WithClock(c))
queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1467,7 +1386,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 3: Similar to case 2, but Update is called before the second Pop. The queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
c = clock.NewFakeClock(timestamp)
queue = createAndRunPriorityQueue(nil, WithClock(c))
queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1565,7 +1484,7 @@ func TestIncomingPodsMetrics(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingPods.Reset()
queue := NewPriorityQueue(nil, WithClock(clock.NewFakeClock(timestamp)))
queue := NewPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
queue.Close()
queue.Run()
for _, op := range test.operations {

View File

@ -49,6 +49,7 @@ import (
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@ -355,8 +356,11 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
f := st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter")
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, pod, &node)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"),
}
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, pod, &node, fns...)
waitPodExpireChan := make(chan struct{})
timeout := make(chan struct{})
@ -418,8 +422,11 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
scache.AddNode(&node)
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
f := st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter")
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, f, firstPod, &node)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"),
}
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...)
// We use conflicted pod ports to incur fit predicate failure.
secondPod := podWithPort("bar", "", 8080)
@ -512,10 +519,13 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
client := clientsetfake.NewSimpleClientset(&node)
informerFactory := informers.NewSharedInformerFactory(client, 0)
f := st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter")
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"),
}
scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
queuedPodStore, scache, informerFactory, f, stop, test.BindingDuration)
queuedPodStore, scache, informerFactory, stop, test.BindingDuration, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -547,9 +557,9 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) {
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
informerFactory informers.SharedInformerFactory, stop chan struct{}, f st.RegisterPluginFunc, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) {
informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil)
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -629,8 +639,11 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
)
}
f := st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit)
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, nil)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("PodFitsResources", noderesources.NewFit),
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -657,14 +670,18 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterPluginFunc, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, recorder events.EventRecorder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
PreFilter: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
f(&registry, plugins, pluginConfigs)
for _, f := range fns {
f(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
algo := core.NewGenericScheduler(
scache,
@ -711,14 +728,18 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
return sched, bindingChan, errChan
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, f st.RegisterPluginFunc, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, stop chan struct{}, bindingTime time.Duration, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding) {
registry := framework.Registry{}
// TODO: instantiate the plugins dynamically.
plugins := &schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{},
PreFilter: &schedulerapi.PluginSet{},
Filter: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
f(&registry, plugins, pluginConfigs)
for _, f := range fns {
f(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
queue := internalqueue.NewSchedulingQueue(nil)
algo := core.NewGenericScheduler(
@ -785,8 +806,11 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
volumeBindingNewFunc := func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return volumebinding.NewFromVolumeBinder(fakeVolumeBinder), nil
}
f := st.RegisterFilterPlugin(volumebinding.Name, volumeBindingNewFunc)
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, f, recorder)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin(volumebinding.Name, volumeBindingNewFunc),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, recorder, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
s.VolumeBinder = fakeVolumeBinder

View File

@ -25,6 +25,11 @@ import (
// to register a Filter Plugin to a given registry.
type RegisterPluginFunc func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig)
// RegisterQueueSortPlugin returns a function to register a QueueSort Plugin to a given registry.
func RegisterQueueSortPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterPluginFunc {
return RegisterPluginAsExtensions(pluginName, 1, pluginNewFunc, "QueueSort")
}
// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry.
func RegisterFilterPlugin(pluginName string, pluginNewFunc framework.PluginFactory) RegisterPluginFunc {
return RegisterPluginAsExtensions(pluginName, 1, pluginNewFunc, "Filter")
@ -59,6 +64,8 @@ func RegisterPluginAsExtensions(pluginName string, weight int32, pluginNewFunc f
func getPluginSetByExtension(plugins *schedulerapi.Plugins, extension string) *schedulerapi.PluginSet {
switch extension {
case "QueueSort":
return plugins.QueueSort
case "Filter":
return plugins.Filter
case "PreFilter":

View File

@ -79,6 +79,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
]
}`,
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
},
@ -98,6 +99,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"apiVersion" : "v1"
}`,
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
@ -144,6 +146,7 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"priorities" : []
}`,
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "TaintToleration"},
@ -160,6 +163,7 @@ priorities:
weight: 1
`,
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
},
@ -178,6 +182,7 @@ priorities:
kind: Policy
`,
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
@ -223,6 +228,7 @@ predicates: []
priorities: []
`,
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {{Name: "PrioritySort"}},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "TaintToleration"},