diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 59217f356f2..f509cf4e098 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -13,6 +13,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/scheduler/apis/config:go_default_library", + "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -54,9 +55,12 @@ go_test( deps = [ "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library", + "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/github.com/prometheus/client_model/go:go_default_library", ], ) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 7bab9b4b30f..90fc2648f7f 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -22,7 +22,7 @@ import ( "reflect" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -31,13 +31,26 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) const ( // Specifies the maximum timeout a permit plugin can return. - maxTimeout time.Duration = 15 * time.Minute + maxTimeout time.Duration = 15 * time.Minute + preFilter = "PreFilter" + preFilterExtensionAddPod = "PreFilterExtensionAddPod" + preFilterExtensionRemovePod = "PreFilterExtensionRemovePod" + filter = "Filter" + postFilter = "PostFilter" + score = "Score" + preBind = "PreBind" + bind = "Bind" + postBind = "PostBind" + reserve = "Reserve" + unreserve = "Unreserve" + permit = "Permit" ) // framework is the component responsible for initializing and running scheduler @@ -233,7 +246,9 @@ func (f *framework) QueueSortFunc() LessFunc { // anything but Success. If a non-success status is returned, then the scheduling // cycle is aborted. func (f *framework) RunPreFilterPlugins( - state *CycleState, pod *v1.Pod) *Status { + state *CycleState, pod *v1.Pod) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, preFilter, status) }() for _, pl := range f.preFilterPlugins { status := pl.PreFilter(state, pod) if !status.IsSuccess() { @@ -255,7 +270,9 @@ func (f *framework) RunPreFilterPlugins( // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod, - podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, preFilterExtensionAddPod, status) }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue @@ -275,7 +292,9 @@ func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule // PreFilter plugins. It returns directly if any of the plugins return any // status other than Success. func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod, - podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, preFilterExtensionRemovePod, status) }() for _, pl := range f.preFilterPlugins { if pl.PreFilterExtensions() == nil { continue @@ -296,7 +315,9 @@ func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSched // given node is not suitable for running pod. // Meanwhile, the failure message and status are set for the given node. func (f *framework) RunFilterPlugins(state *CycleState, - pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, filter, status) }() for _, pl := range f.filterPlugins { status := pl.Filter(state, pod, nodeInfo) if !status.IsSuccess() { @@ -321,7 +342,9 @@ func (f *framework) RunPostFilterPlugins( pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap, -) *Status { +) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, postFilter, status) }() for _, pl := range f.postFilterPlugins { status := pl.PostFilter(state, pod, nodes, filteredNodesStatuses) if !status.IsSuccess() { @@ -338,7 +361,9 @@ func (f *framework) RunPostFilterPlugins( // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. -func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) { +func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, score, status) }() pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins)) for _, pl := range f.scorePlugins { pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) @@ -374,7 +399,8 @@ func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1. if pl.ScoreExtensions() == nil { return } - if status := pl.ScoreExtensions().NormalizeScore(state, pod, nodeScoreList); !status.IsSuccess() { + status := pl.ScoreExtensions().NormalizeScore(state, pod, nodeScoreList) + if !status.IsSuccess() { err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message()) errCh.SendErrorWithCancel(err, cancel) return @@ -416,7 +442,9 @@ func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1. // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. func (f *framework) RunPreBindPlugins( - state *CycleState, pod *v1.Pod, nodeName string) *Status { + state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, preBind, status) }() for _, pl := range f.preBindPlugins { status := pl.PreBind(state, pod, nodeName) if !status.IsSuccess() { @@ -429,11 +457,12 @@ func (f *framework) RunPreBindPlugins( } // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status. -func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status { +func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, bind, status) }() if len(f.bindPlugins) == 0 { return NewStatus(Skip, "") } - var status *Status for _, bp := range f.bindPlugins { status = bp.Bind(state, pod, nodeName) if status != nil && status.Code() == Skip { @@ -452,6 +481,8 @@ func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName stri // RunPostBindPlugins runs the set of configured postbind plugins. func (f *framework) RunPostBindPlugins( state *CycleState, pod *v1.Pod, nodeName string) { + startTime := time.Now() + defer recordExtensionPointDuration(startTime, postBind, nil) for _, pl := range f.postBindPlugins { pl.PostBind(state, pod, nodeName) } @@ -461,7 +492,9 @@ func (f *framework) RunPostBindPlugins( // plugins returns an error, it does not continue running the remaining ones and // returns the error. In such case, pod will not be scheduled. func (f *framework) RunReservePlugins( - state *CycleState, pod *v1.Pod, nodeName string) *Status { + state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, reserve, status) }() for _, pl := range f.reservePlugins { status := pl.Reserve(state, pod, nodeName) if !status.IsSuccess() { @@ -476,6 +509,8 @@ func (f *framework) RunReservePlugins( // RunUnreservePlugins runs the set of configured unreserve plugins. func (f *framework) RunUnreservePlugins( state *CycleState, pod *v1.Pod, nodeName string) { + startTime := time.Now() + defer recordExtensionPointDuration(startTime, unreserve, nil) for _, pl := range f.unreservePlugins { pl.Unreserve(state, pod, nodeName) } @@ -489,7 +524,9 @@ func (f *framework) RunUnreservePlugins( // Note that if multiple plugins asked to wait, then we wait for the minimum // timeout duration. func (f *framework) RunPermitPlugins( - state *CycleState, pod *v1.Pod, nodeName string) *Status { + state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { + startTime := time.Now() + defer func() { recordExtensionPointDuration(startTime, permit, status) }() timeout := maxTimeout statusCode := Success for _, pl := range f.permitPlugins { @@ -620,3 +657,11 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu } return pgMap } + +func recordExtensionPointDuration(start time.Time, extensionPoint string, status *Status) { + statusCode := Success.String() + if status != nil { + statusCode = status.Code().String() + } + metrics.FrameworkExtensionPointDuration.WithLabelValues(extensionPoint, statusCode).Observe(metrics.SinceInSeconds(start)) +} diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index b4715504207..f9e9fc701c1 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -18,11 +18,15 @@ package v1alpha1 import ( "fmt" + "github.com/prometheus/client_golang/prometheus" + "k8s.io/kubernetes/pkg/scheduler/metrics" "reflect" "strings" "testing" + "time" - v1 "k8s.io/api/core/v1" + dto "github.com/prometheus/client_model/go" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -37,6 +41,7 @@ const ( preFilterPluginName = "prefilter-plugin" preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin" duplicatePluginName = "duplicate-plugin" + testPlugin = "test-plugin" ) // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. @@ -118,6 +123,51 @@ func (pl *PluginNotImplementingScore) Name() string { return pluginNotImplementingScore } +// TestPlugin implements all Plugin interfaces. +type TestPlugin struct { + name string + inj injectedResult +} + +func (pl *TestPlugin) Name() string { + return pl.name +} + +func (pl *TestPlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) { + return 0, NewStatus(Code(pl.inj.ScoreStatus), "injected status") +} + +func (pl *TestPlugin) ScoreExtensions() ScoreExtensions { + return nil +} + +func (pl *TestPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status { + return NewStatus(Code(pl.inj.PreFilterStatus), "injected status") +} +func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions { + return nil +} +func (pl *TestPlugin) Filter(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + return NewStatus(Code(pl.inj.FilterStatus), "injected status") +} +func (pl *TestPlugin) PostFilter(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status { + return NewStatus(Code(pl.inj.PostFilterStatus), "injected status") +} +func (pl *TestPlugin) Reserve(state *CycleState, p *v1.Pod, nodeName string) *Status { + return NewStatus(Code(pl.inj.ReserveStatus), "injected status") +} +func (pl *TestPlugin) PreBind(state *CycleState, p *v1.Pod, nodeName string) *Status { + return NewStatus(Code(pl.inj.PreBindStatus), "injected status") +} +func (pl *TestPlugin) PostBind(state *CycleState, p *v1.Pod, nodeName string) {} +func (pl *TestPlugin) Unreserve(state *CycleState, p *v1.Pod, nodeName string) {} +func (pl *TestPlugin) Permit(state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) { + return NewStatus(Code(pl.inj.PermitStatus), "injected status"), time.Duration(0) +} +func (pl *TestPlugin) Bind(state *CycleState, p *v1.Pod, nodeName string) *Status { + return NewStatus(Code(pl.inj.BindStatus), "injected status") +} + // TestPreFilterPlugin only implements PreFilterPlugin interface. type TestPreFilterPlugin struct { PreFilterCalled int @@ -224,12 +274,12 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) { }{ { name: "enabled Score plugin doesn't exist in registry", - plugins: buildConfigDefaultWeights("notExist"), + plugins: buildScoreConfigDefaultWeights("notExist"), initErr: true, }, { name: "enabled Score plugin doesn't extend the ScorePlugin interface", - plugins: buildConfigDefaultWeights(pluginNotImplementingScore), + plugins: buildScoreConfigDefaultWeights(pluginNotImplementingScore), initErr: true, }, { @@ -238,15 +288,15 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) { }, { name: "enabled Score plugin list is empty", - plugins: buildConfigDefaultWeights(), + plugins: buildScoreConfigDefaultWeights(), }, { name: "enabled plugin only implements ScorePlugin interface", - plugins: buildConfigDefaultWeights(scorePlugin1), + plugins: buildScoreConfigDefaultWeights(scorePlugin1), }, { name: "enabled plugin implements ScoreWithNormalizePlugin interface", - plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1), + plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1), }, } @@ -297,12 +347,12 @@ func TestRunScorePlugins(t *testing.T) { }{ { name: "no Score plugins", - plugins: buildConfigDefaultWeights(), + plugins: buildScoreConfigDefaultWeights(), want: PluginToNodeScores{}, }, { name: "single Score plugin", - plugins: buildConfigDefaultWeights(scorePlugin1), + plugins: buildScoreConfigDefaultWeights(scorePlugin1), pluginConfigs: []config.PluginConfig{ { Name: scorePlugin1, @@ -319,7 +369,7 @@ func TestRunScorePlugins(t *testing.T) { { name: "single ScoreWithNormalize plugin", //registry: registry, - plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1), + plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1), pluginConfigs: []config.PluginConfig{ { Name: scoreWithNormalizePlugin1, @@ -335,7 +385,7 @@ func TestRunScorePlugins(t *testing.T) { }, { name: "2 Score plugins, 2 NormalizeScore plugins", - plugins: buildConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1, scoreWithNormalizePlugin2), + plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1, scoreWithNormalizePlugin2), pluginConfigs: []config.PluginConfig{ { Name: scorePlugin1, @@ -371,11 +421,11 @@ func TestRunScorePlugins(t *testing.T) { { Name: scoreWithNormalizePlugin1, Args: runtime.Unknown{ - Raw: []byte(`{ "scoreErr": true }`), + Raw: []byte(`{ "scoreStatus": 1 }`), }, }, }, - plugins: buildConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1), + plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1), err: true, }, { @@ -384,16 +434,16 @@ func TestRunScorePlugins(t *testing.T) { { Name: scoreWithNormalizePlugin1, Args: runtime.Unknown{ - Raw: []byte(`{ "normalizeErr": true }`), + Raw: []byte(`{ "normalizeStatus": 1 }`), }, }, }, - plugins: buildConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1), + plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1), err: true, }, { name: "Score plugin return score greater than MaxNodeScore", - plugins: buildConfigDefaultWeights(scorePlugin1), + plugins: buildScoreConfigDefaultWeights(scorePlugin1), pluginConfigs: []config.PluginConfig{ { Name: scorePlugin1, @@ -406,7 +456,7 @@ func TestRunScorePlugins(t *testing.T) { }, { name: "Score plugin return score less than MinNodeScore", - plugins: buildConfigDefaultWeights(scorePlugin1), + plugins: buildScoreConfigDefaultWeights(scorePlugin1), pluginConfigs: []config.PluginConfig{ { Name: scorePlugin1, @@ -419,7 +469,7 @@ func TestRunScorePlugins(t *testing.T) { }, { name: "ScoreWithNormalize plugin return score greater than MaxNodeScore", - plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1), + plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1), pluginConfigs: []config.PluginConfig{ { Name: scoreWithNormalizePlugin1, @@ -432,7 +482,7 @@ func TestRunScorePlugins(t *testing.T) { }, { name: "ScoreWithNormalize plugin return score less than MinNodeScore", - plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1), + plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1), pluginConfigs: []config.PluginConfig{ { Name: scoreWithNormalizePlugin1, @@ -457,7 +507,7 @@ func TestRunScorePlugins(t *testing.T) { if tt.err { if status.IsSuccess() { - t.Error("Expected status to be non-success.") + t.Errorf("Expected status to be non-success. got: %v", status.Code().String()) } return } @@ -510,11 +560,173 @@ func TestPreFilterPlugins(t *testing.T) { } -func buildConfigDefaultWeights(ps ...string) *config.Plugins { - return buildConfigWithWeights(defaultWeights, ps...) +func TestRecordingMetrics(t *testing.T) { + tests := []struct { + name string + action func(f Framework) + inject injectedResult + wantExtensionPoint string + wantStatus Code + }{ + { + name: "PreFilter - Success", + action: func(f Framework) { f.RunPreFilterPlugins(nil, pod) }, + wantExtensionPoint: "PreFilter", + wantStatus: Success, + }, + { + name: "Filter - Success", + action: func(f Framework) { f.RunFilterPlugins(nil, pod, nil) }, + wantExtensionPoint: "Filter", + wantStatus: Success, + }, + { + name: "PostFilter - Success", + action: func(f Framework) { f.RunPostFilterPlugins(nil, pod, nil, nil) }, + wantExtensionPoint: "PostFilter", + wantStatus: Success, + }, + { + name: "Score - Success", + action: func(f Framework) { f.RunScorePlugins(nil, pod, nodes) }, + wantExtensionPoint: "Score", + wantStatus: Success, + }, + { + name: "Reserve - Success", + action: func(f Framework) { f.RunReservePlugins(nil, pod, "") }, + wantExtensionPoint: "Reserve", + wantStatus: Success, + }, + { + name: "Unreserve - Success", + action: func(f Framework) { f.RunUnreservePlugins(nil, pod, "") }, + wantExtensionPoint: "Unreserve", + wantStatus: Success, + }, + { + name: "PreBind - Success", + action: func(f Framework) { f.RunPreBindPlugins(nil, pod, "") }, + wantExtensionPoint: "PreBind", + wantStatus: Success, + }, + { + name: "Bind - Success", + action: func(f Framework) { f.RunBindPlugins(nil, pod, "") }, + wantExtensionPoint: "Bind", + wantStatus: Success, + }, + { + name: "PostBind - Success", + action: func(f Framework) { f.RunPostBindPlugins(nil, pod, "") }, + wantExtensionPoint: "PostBind", + wantStatus: Success, + }, + { + name: "Permit - Success", + action: func(f Framework) { f.RunPermitPlugins(nil, pod, "") }, + wantExtensionPoint: "Permit", + wantStatus: Success, + }, + + { + name: "PreFilter - Error", + action: func(f Framework) { f.RunPreFilterPlugins(nil, pod) }, + inject: injectedResult{PreFilterStatus: int(Error)}, + wantExtensionPoint: "PreFilter", + wantStatus: Error, + }, + { + name: "Filter - Error", + action: func(f Framework) { f.RunFilterPlugins(nil, pod, nil) }, + inject: injectedResult{FilterStatus: int(Error)}, + wantExtensionPoint: "Filter", + wantStatus: Error, + }, + { + name: "PostFilter - Error", + action: func(f Framework) { f.RunPostFilterPlugins(nil, pod, nil, nil) }, + inject: injectedResult{PostFilterStatus: int(Error)}, + wantExtensionPoint: "PostFilter", + wantStatus: Error, + }, + { + name: "Score - Error", + action: func(f Framework) { f.RunScorePlugins(nil, pod, nodes) }, + inject: injectedResult{ScoreStatus: int(Error)}, + wantExtensionPoint: "Score", + wantStatus: Error, + }, + { + name: "Reserve - Error", + action: func(f Framework) { f.RunReservePlugins(nil, pod, "") }, + inject: injectedResult{ReserveStatus: int(Error)}, + wantExtensionPoint: "Reserve", + wantStatus: Error, + }, + { + name: "PreBind - Error", + action: func(f Framework) { f.RunPreBindPlugins(nil, pod, "") }, + inject: injectedResult{PreBindStatus: int(Error)}, + wantExtensionPoint: "PreBind", + wantStatus: Error, + }, + { + name: "Bind - Error", + action: func(f Framework) { f.RunBindPlugins(nil, pod, "") }, + inject: injectedResult{BindStatus: int(Error)}, + wantExtensionPoint: "Bind", + wantStatus: Error, + }, + { + name: "Permit - Error", + action: func(f Framework) { f.RunPermitPlugins(nil, pod, "") }, + inject: injectedResult{PermitStatus: int(Error)}, + wantExtensionPoint: "Permit", + wantStatus: Error, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &TestPlugin{name: testPlugin, inj: tt.inject} + r := make(Registry) + r.Register(testPlugin, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return plugin, nil + }) + pluginSet := &config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}} + plugins := &config.Plugins{ + Score: pluginSet, + PreFilter: pluginSet, + Filter: pluginSet, + PostFilter: pluginSet, + Reserve: pluginSet, + Permit: pluginSet, + PreBind: pluginSet, + Bind: pluginSet, + PostBind: pluginSet, + Unreserve: pluginSet, + } + f, err := NewFramework(r, plugins, emptyArgs) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + metrics.Register() + metrics.FrameworkExtensionPointDuration.Reset() + + tt.action(f) + + collectAndCompare(t, tt.wantExtensionPoint, tt.wantStatus) + }) + } } -func buildConfigWithWeights(weights map[string]int32, ps ...string) *config.Plugins { +func buildScoreConfigDefaultWeights(ps ...string) *config.Plugins { + return buildScoreConfigWithWeights(defaultWeights, ps...) +} + +func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config.Plugins { var plugins []config.Plugin for _, p := range ps { plugins = append(plugins, config.Plugin{Name: p, Weight: weights[p]}) @@ -523,25 +735,60 @@ func buildConfigWithWeights(weights map[string]int32, ps ...string) *config.Plug } type injectedResult struct { - ScoreRes int64 `json:"scoreRes,omitempty"` - NormalizeRes int64 `json:"normalizeRes,omitempty"` - ScoreErr bool `json:"scoreErr,omitempty"` - NormalizeErr bool `json:"normalizeErr,omitempty"` + ScoreRes int64 `json:"scoreRes,omitempty"` + NormalizeRes int64 `json:"normalizeRes,omitempty"` + ScoreStatus int `json:"scoreStatus,omitempty"` + NormalizeStatus int `json:"normalizeStatus,omitempty"` + PreFilterStatus int `json:"preFilterStatus,omitempty"` + FilterStatus int `json:"filterStatus,omitempty"` + PostFilterStatus int `json:"postFilterStatus,omitempty"` + ReserveStatus int `json:"reserveStatus,omitempty"` + PreBindStatus int `json:"preBindStatus,omitempty"` + BindStatus int `json:"bindStatus,omitempty"` + PermitStatus int `json:"permitStatus,omitempty"` } func setScoreRes(inj injectedResult) (int64, *Status) { - if inj.ScoreErr { - return 0, NewStatus(Error, "injecting failure.") + if Code(inj.ScoreStatus) != Success { + return 0, NewStatus(Code(inj.ScoreStatus), "injecting failure.") } return inj.ScoreRes, nil } func injectNormalizeRes(inj injectedResult, scores NodeScoreList) *Status { - if inj.NormalizeErr { - return NewStatus(Error, "injecting failure.") + if Code(inj.NormalizeStatus) != Success { + return NewStatus(Code(inj.NormalizeStatus), "injecting failure.") } for i := range scores { scores[i].Score = inj.NormalizeRes } return nil } + +func collectAndCompare(t *testing.T, wantExtensionPoint string, wantStatus Code) { + ch := make(chan prometheus.Metric, 1) + m := &dto.Metric{} + metrics.FrameworkExtensionPointDuration.Collect(ch) + got := <-ch + got.Write(m) + + if len(m.Label) != 2 { + t.Fatalf("Unexpected number of label pairs, got: %v, want: 2", len(m.Label)) + } + + if *m.Label[0].Value != wantExtensionPoint { + t.Errorf("Unexpected extension point label, got: %q, want %q", *m.Label[0].Value, wantExtensionPoint) + } + + if *m.Label[1].Value != wantStatus.String() { + t.Errorf("Unexpected status code label, got: %q, want %q", *m.Label[1].Value, wantStatus) + } + + if *m.Histogram.SampleCount != 1 { + t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount) + } + + if *m.Histogram.SampleSum <= 0 { + t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum) + } +} diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 13e690769e1..a5396c8dc53 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -31,9 +31,6 @@ import ( schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) -// Code is the Status code/type which is returned from plugins. -type Code int - // NodeScoreList declares a list of nodes and their scores. type NodeScoreList []NodeScore @@ -49,6 +46,9 @@ type PluginToNodeScores map[string]NodeScoreList // NodeToStatusMap declares map from node name to its status. type NodeToStatusMap map[string]*Status +// Code is the Status code/type which is returned from plugins. +type Code int + // These are predefined codes used in a Status. const ( // Success means that plugin ran correctly and found pod schedulable. @@ -72,6 +72,13 @@ const ( Skip ) +// This list should be exactly the same as the codes iota defined above in the same order. +var codes = []string{"Success", "Error", "Unschedulable", "UnschedulableAndUnresolvable", "Wait", "Skip"} + +func (c Code) String() string { + return codes[c] +} + const ( // MaxNodeScore is the maximum score a Score plugin is expected to return. MaxNodeScore int64 = 100 diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index f8a0ed39989..f3f13c9f9bf 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -238,6 +238,16 @@ var ( StabilityLevel: metrics.ALPHA, }) + FrameworkExtensionPointDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "framework_extension_point_duration_seconds", + Help: "Latency for running all plugins of a specific extension point.", + Buckets: nil, + StabilityLevel: metrics.ALPHA, + }, + []string{"extension_point", "status"}) + metricsList = []metrics.Registerable{ scheduleAttempts, SchedulingLatency, @@ -259,6 +269,7 @@ var ( pendingPods, PodSchedulingDuration, PodSchedulingAttempts, + FrameworkExtensionPointDuration, } )