Merge pull request #83569 from liu-cong/framework-metrics

Add metrics for scheduler framework.
This commit is contained in:
Kubernetes Prow Robot 2019-10-15 23:05:28 -07:00 committed by GitHub
commit 1552ba6b00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 361 additions and 47 deletions

View File

@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -54,9 +55,12 @@ go_test(
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
],
)

View File

@ -22,7 +22,7 @@ import (
"reflect"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@ -31,13 +31,26 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
const (
// Specifies the maximum timeout a permit plugin can return.
maxTimeout time.Duration = 15 * time.Minute
maxTimeout time.Duration = 15 * time.Minute
preFilter = "PreFilter"
preFilterExtensionAddPod = "PreFilterExtensionAddPod"
preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
filter = "Filter"
postFilter = "PostFilter"
score = "Score"
preBind = "PreBind"
bind = "Bind"
postBind = "PostBind"
reserve = "Reserve"
unreserve = "Unreserve"
permit = "Permit"
)
// framework is the component responsible for initializing and running scheduler
@ -233,7 +246,9 @@ func (f *framework) QueueSortFunc() LessFunc {
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *framework) RunPreFilterPlugins(
state *CycleState, pod *v1.Pod) *Status {
state *CycleState, pod *v1.Pod) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preFilter, status) }()
for _, pl := range f.preFilterPlugins {
status := pl.PreFilter(state, pod)
if !status.IsSuccess() {
@ -255,7 +270,9 @@ func (f *framework) RunPreFilterPlugins(
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preFilterExtensionAddPod, status) }()
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
@ -275,7 +292,9 @@ func (f *framework) RunPreFilterExtensionAddPod(state *CycleState, podToSchedule
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preFilterExtensionRemovePod, status) }()
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
@ -296,7 +315,9 @@ func (f *framework) RunPreFilterExtensionRemovePod(state *CycleState, podToSched
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
func (f *framework) RunFilterPlugins(state *CycleState,
pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, filter, status) }()
for _, pl := range f.filterPlugins {
status := pl.Filter(state, pod, nodeInfo)
if !status.IsSuccess() {
@ -321,7 +342,9 @@ func (f *framework) RunPostFilterPlugins(
pod *v1.Pod,
nodes []*v1.Node,
filteredNodesStatuses NodeToStatusMap,
) *Status {
) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, postFilter, status) }()
for _, pl := range f.postFilterPlugins {
status := pl.PostFilter(state, pod, nodes, filteredNodesStatuses)
if !status.IsSuccess() {
@ -338,7 +361,9 @@ func (f *framework) RunPostFilterPlugins(
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) {
func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, score, status) }()
pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
@ -374,7 +399,8 @@ func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.
if pl.ScoreExtensions() == nil {
return
}
if status := pl.ScoreExtensions().NormalizeScore(state, pod, nodeScoreList); !status.IsSuccess() {
status := pl.ScoreExtensions().NormalizeScore(state, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
return
@ -416,7 +442,9 @@ func (f *framework) RunScorePlugins(state *CycleState, pod *v1.Pod, nodes []*v1.
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
func (f *framework) RunPreBindPlugins(
state *CycleState, pod *v1.Pod, nodeName string) *Status {
state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, preBind, status) }()
for _, pl := range f.preBindPlugins {
status := pl.PreBind(state, pod, nodeName)
if !status.IsSuccess() {
@ -429,11 +457,12 @@ func (f *framework) RunPreBindPlugins(
}
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) *Status {
func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, bind, status) }()
if len(f.bindPlugins) == 0 {
return NewStatus(Skip, "")
}
var status *Status
for _, bp := range f.bindPlugins {
status = bp.Bind(state, pod, nodeName)
if status != nil && status.Code() == Skip {
@ -452,6 +481,8 @@ func (f *framework) RunBindPlugins(state *CycleState, pod *v1.Pod, nodeName stri
// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *framework) RunPostBindPlugins(
state *CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer recordExtensionPointDuration(startTime, postBind, nil)
for _, pl := range f.postBindPlugins {
pl.PostBind(state, pod, nodeName)
}
@ -461,7 +492,9 @@ func (f *framework) RunPostBindPlugins(
// plugins returns an error, it does not continue running the remaining ones and
// returns the error. In such case, pod will not be scheduled.
func (f *framework) RunReservePlugins(
state *CycleState, pod *v1.Pod, nodeName string) *Status {
state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, reserve, status) }()
for _, pl := range f.reservePlugins {
status := pl.Reserve(state, pod, nodeName)
if !status.IsSuccess() {
@ -476,6 +509,8 @@ func (f *framework) RunReservePlugins(
// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(
state *CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer recordExtensionPointDuration(startTime, unreserve, nil)
for _, pl := range f.unreservePlugins {
pl.Unreserve(state, pod, nodeName)
}
@ -489,7 +524,9 @@ func (f *framework) RunUnreservePlugins(
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
func (f *framework) RunPermitPlugins(
state *CycleState, pod *v1.Pod, nodeName string) *Status {
state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
timeout := maxTimeout
statusCode := Success
for _, pl := range f.permitPlugins {
@ -620,3 +657,11 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu
}
return pgMap
}
func recordExtensionPointDuration(start time.Time, extensionPoint string, status *Status) {
statusCode := Success.String()
if status != nil {
statusCode = status.Code().String()
}
metrics.FrameworkExtensionPointDuration.WithLabelValues(extensionPoint, statusCode).Observe(metrics.SinceInSeconds(start))
}

View File

@ -18,11 +18,15 @@ package v1alpha1
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"reflect"
"strings"
"testing"
"time"
v1 "k8s.io/api/core/v1"
dto "github.com/prometheus/client_model/go"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -37,6 +41,7 @@ const (
preFilterPluginName = "prefilter-plugin"
preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin"
duplicatePluginName = "duplicate-plugin"
testPlugin = "test-plugin"
)
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@ -118,6 +123,51 @@ func (pl *PluginNotImplementingScore) Name() string {
return pluginNotImplementingScore
}
// TestPlugin implements all Plugin interfaces.
type TestPlugin struct {
name string
inj injectedResult
}
func (pl *TestPlugin) Name() string {
return pl.name
}
func (pl *TestPlugin) Score(state *CycleState, p *v1.Pod, nodeName string) (int64, *Status) {
return 0, NewStatus(Code(pl.inj.ScoreStatus), "injected status")
}
func (pl *TestPlugin) ScoreExtensions() ScoreExtensions {
return nil
}
func (pl *TestPlugin) PreFilter(state *CycleState, p *v1.Pod) *Status {
return NewStatus(Code(pl.inj.PreFilterStatus), "injected status")
}
func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions {
return nil
}
func (pl *TestPlugin) Filter(state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
return NewStatus(Code(pl.inj.FilterStatus), "injected status")
}
func (pl *TestPlugin) PostFilter(state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
return NewStatus(Code(pl.inj.PostFilterStatus), "injected status")
}
func (pl *TestPlugin) Reserve(state *CycleState, p *v1.Pod, nodeName string) *Status {
return NewStatus(Code(pl.inj.ReserveStatus), "injected status")
}
func (pl *TestPlugin) PreBind(state *CycleState, p *v1.Pod, nodeName string) *Status {
return NewStatus(Code(pl.inj.PreBindStatus), "injected status")
}
func (pl *TestPlugin) PostBind(state *CycleState, p *v1.Pod, nodeName string) {}
func (pl *TestPlugin) Unreserve(state *CycleState, p *v1.Pod, nodeName string) {}
func (pl *TestPlugin) Permit(state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration) {
return NewStatus(Code(pl.inj.PermitStatus), "injected status"), time.Duration(0)
}
func (pl *TestPlugin) Bind(state *CycleState, p *v1.Pod, nodeName string) *Status {
return NewStatus(Code(pl.inj.BindStatus), "injected status")
}
// TestPreFilterPlugin only implements PreFilterPlugin interface.
type TestPreFilterPlugin struct {
PreFilterCalled int
@ -224,12 +274,12 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
}{
{
name: "enabled Score plugin doesn't exist in registry",
plugins: buildConfigDefaultWeights("notExist"),
plugins: buildScoreConfigDefaultWeights("notExist"),
initErr: true,
},
{
name: "enabled Score plugin doesn't extend the ScorePlugin interface",
plugins: buildConfigDefaultWeights(pluginNotImplementingScore),
plugins: buildScoreConfigDefaultWeights(pluginNotImplementingScore),
initErr: true,
},
{
@ -238,15 +288,15 @@ func TestInitFrameworkWithScorePlugins(t *testing.T) {
},
{
name: "enabled Score plugin list is empty",
plugins: buildConfigDefaultWeights(),
plugins: buildScoreConfigDefaultWeights(),
},
{
name: "enabled plugin only implements ScorePlugin interface",
plugins: buildConfigDefaultWeights(scorePlugin1),
plugins: buildScoreConfigDefaultWeights(scorePlugin1),
},
{
name: "enabled plugin implements ScoreWithNormalizePlugin interface",
plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1),
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
},
}
@ -297,12 +347,12 @@ func TestRunScorePlugins(t *testing.T) {
}{
{
name: "no Score plugins",
plugins: buildConfigDefaultWeights(),
plugins: buildScoreConfigDefaultWeights(),
want: PluginToNodeScores{},
},
{
name: "single Score plugin",
plugins: buildConfigDefaultWeights(scorePlugin1),
plugins: buildScoreConfigDefaultWeights(scorePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scorePlugin1,
@ -319,7 +369,7 @@ func TestRunScorePlugins(t *testing.T) {
{
name: "single ScoreWithNormalize plugin",
//registry: registry,
plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1),
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scoreWithNormalizePlugin1,
@ -335,7 +385,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "2 Score plugins, 2 NormalizeScore plugins",
plugins: buildConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1, scoreWithNormalizePlugin2),
plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1, scoreWithNormalizePlugin2),
pluginConfigs: []config.PluginConfig{
{
Name: scorePlugin1,
@ -371,11 +421,11 @@ func TestRunScorePlugins(t *testing.T) {
{
Name: scoreWithNormalizePlugin1,
Args: runtime.Unknown{
Raw: []byte(`{ "scoreErr": true }`),
Raw: []byte(`{ "scoreStatus": 1 }`),
},
},
},
plugins: buildConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1),
plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1),
err: true,
},
{
@ -384,16 +434,16 @@ func TestRunScorePlugins(t *testing.T) {
{
Name: scoreWithNormalizePlugin1,
Args: runtime.Unknown{
Raw: []byte(`{ "normalizeErr": true }`),
Raw: []byte(`{ "normalizeStatus": 1 }`),
},
},
},
plugins: buildConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1),
plugins: buildScoreConfigDefaultWeights(scorePlugin1, scoreWithNormalizePlugin1),
err: true,
},
{
name: "Score plugin return score greater than MaxNodeScore",
plugins: buildConfigDefaultWeights(scorePlugin1),
plugins: buildScoreConfigDefaultWeights(scorePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scorePlugin1,
@ -406,7 +456,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "Score plugin return score less than MinNodeScore",
plugins: buildConfigDefaultWeights(scorePlugin1),
plugins: buildScoreConfigDefaultWeights(scorePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scorePlugin1,
@ -419,7 +469,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "ScoreWithNormalize plugin return score greater than MaxNodeScore",
plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1),
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scoreWithNormalizePlugin1,
@ -432,7 +482,7 @@ func TestRunScorePlugins(t *testing.T) {
},
{
name: "ScoreWithNormalize plugin return score less than MinNodeScore",
plugins: buildConfigDefaultWeights(scoreWithNormalizePlugin1),
plugins: buildScoreConfigDefaultWeights(scoreWithNormalizePlugin1),
pluginConfigs: []config.PluginConfig{
{
Name: scoreWithNormalizePlugin1,
@ -457,7 +507,7 @@ func TestRunScorePlugins(t *testing.T) {
if tt.err {
if status.IsSuccess() {
t.Error("Expected status to be non-success.")
t.Errorf("Expected status to be non-success. got: %v", status.Code().String())
}
return
}
@ -510,11 +560,173 @@ func TestPreFilterPlugins(t *testing.T) {
}
func buildConfigDefaultWeights(ps ...string) *config.Plugins {
return buildConfigWithWeights(defaultWeights, ps...)
func TestRecordingMetrics(t *testing.T) {
tests := []struct {
name string
action func(f Framework)
inject injectedResult
wantExtensionPoint string
wantStatus Code
}{
{
name: "PreFilter - Success",
action: func(f Framework) { f.RunPreFilterPlugins(nil, pod) },
wantExtensionPoint: "PreFilter",
wantStatus: Success,
},
{
name: "Filter - Success",
action: func(f Framework) { f.RunFilterPlugins(nil, pod, nil) },
wantExtensionPoint: "Filter",
wantStatus: Success,
},
{
name: "PostFilter - Success",
action: func(f Framework) { f.RunPostFilterPlugins(nil, pod, nil, nil) },
wantExtensionPoint: "PostFilter",
wantStatus: Success,
},
{
name: "Score - Success",
action: func(f Framework) { f.RunScorePlugins(nil, pod, nodes) },
wantExtensionPoint: "Score",
wantStatus: Success,
},
{
name: "Reserve - Success",
action: func(f Framework) { f.RunReservePlugins(nil, pod, "") },
wantExtensionPoint: "Reserve",
wantStatus: Success,
},
{
name: "Unreserve - Success",
action: func(f Framework) { f.RunUnreservePlugins(nil, pod, "") },
wantExtensionPoint: "Unreserve",
wantStatus: Success,
},
{
name: "PreBind - Success",
action: func(f Framework) { f.RunPreBindPlugins(nil, pod, "") },
wantExtensionPoint: "PreBind",
wantStatus: Success,
},
{
name: "Bind - Success",
action: func(f Framework) { f.RunBindPlugins(nil, pod, "") },
wantExtensionPoint: "Bind",
wantStatus: Success,
},
{
name: "PostBind - Success",
action: func(f Framework) { f.RunPostBindPlugins(nil, pod, "") },
wantExtensionPoint: "PostBind",
wantStatus: Success,
},
{
name: "Permit - Success",
action: func(f Framework) { f.RunPermitPlugins(nil, pod, "") },
wantExtensionPoint: "Permit",
wantStatus: Success,
},
{
name: "PreFilter - Error",
action: func(f Framework) { f.RunPreFilterPlugins(nil, pod) },
inject: injectedResult{PreFilterStatus: int(Error)},
wantExtensionPoint: "PreFilter",
wantStatus: Error,
},
{
name: "Filter - Error",
action: func(f Framework) { f.RunFilterPlugins(nil, pod, nil) },
inject: injectedResult{FilterStatus: int(Error)},
wantExtensionPoint: "Filter",
wantStatus: Error,
},
{
name: "PostFilter - Error",
action: func(f Framework) { f.RunPostFilterPlugins(nil, pod, nil, nil) },
inject: injectedResult{PostFilterStatus: int(Error)},
wantExtensionPoint: "PostFilter",
wantStatus: Error,
},
{
name: "Score - Error",
action: func(f Framework) { f.RunScorePlugins(nil, pod, nodes) },
inject: injectedResult{ScoreStatus: int(Error)},
wantExtensionPoint: "Score",
wantStatus: Error,
},
{
name: "Reserve - Error",
action: func(f Framework) { f.RunReservePlugins(nil, pod, "") },
inject: injectedResult{ReserveStatus: int(Error)},
wantExtensionPoint: "Reserve",
wantStatus: Error,
},
{
name: "PreBind - Error",
action: func(f Framework) { f.RunPreBindPlugins(nil, pod, "") },
inject: injectedResult{PreBindStatus: int(Error)},
wantExtensionPoint: "PreBind",
wantStatus: Error,
},
{
name: "Bind - Error",
action: func(f Framework) { f.RunBindPlugins(nil, pod, "") },
inject: injectedResult{BindStatus: int(Error)},
wantExtensionPoint: "Bind",
wantStatus: Error,
},
{
name: "Permit - Error",
action: func(f Framework) { f.RunPermitPlugins(nil, pod, "") },
inject: injectedResult{PermitStatus: int(Error)},
wantExtensionPoint: "Permit",
wantStatus: Error,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := &TestPlugin{name: testPlugin, inj: tt.inject}
r := make(Registry)
r.Register(testPlugin,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return plugin, nil
})
pluginSet := &config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}}
plugins := &config.Plugins{
Score: pluginSet,
PreFilter: pluginSet,
Filter: pluginSet,
PostFilter: pluginSet,
Reserve: pluginSet,
Permit: pluginSet,
PreBind: pluginSet,
Bind: pluginSet,
PostBind: pluginSet,
Unreserve: pluginSet,
}
f, err := NewFramework(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
metrics.Register()
metrics.FrameworkExtensionPointDuration.Reset()
tt.action(f)
collectAndCompare(t, tt.wantExtensionPoint, tt.wantStatus)
})
}
}
func buildConfigWithWeights(weights map[string]int32, ps ...string) *config.Plugins {
func buildScoreConfigDefaultWeights(ps ...string) *config.Plugins {
return buildScoreConfigWithWeights(defaultWeights, ps...)
}
func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config.Plugins {
var plugins []config.Plugin
for _, p := range ps {
plugins = append(plugins, config.Plugin{Name: p, Weight: weights[p]})
@ -523,25 +735,60 @@ func buildConfigWithWeights(weights map[string]int32, ps ...string) *config.Plug
}
type injectedResult struct {
ScoreRes int64 `json:"scoreRes,omitempty"`
NormalizeRes int64 `json:"normalizeRes,omitempty"`
ScoreErr bool `json:"scoreErr,omitempty"`
NormalizeErr bool `json:"normalizeErr,omitempty"`
ScoreRes int64 `json:"scoreRes,omitempty"`
NormalizeRes int64 `json:"normalizeRes,omitempty"`
ScoreStatus int `json:"scoreStatus,omitempty"`
NormalizeStatus int `json:"normalizeStatus,omitempty"`
PreFilterStatus int `json:"preFilterStatus,omitempty"`
FilterStatus int `json:"filterStatus,omitempty"`
PostFilterStatus int `json:"postFilterStatus,omitempty"`
ReserveStatus int `json:"reserveStatus,omitempty"`
PreBindStatus int `json:"preBindStatus,omitempty"`
BindStatus int `json:"bindStatus,omitempty"`
PermitStatus int `json:"permitStatus,omitempty"`
}
func setScoreRes(inj injectedResult) (int64, *Status) {
if inj.ScoreErr {
return 0, NewStatus(Error, "injecting failure.")
if Code(inj.ScoreStatus) != Success {
return 0, NewStatus(Code(inj.ScoreStatus), "injecting failure.")
}
return inj.ScoreRes, nil
}
func injectNormalizeRes(inj injectedResult, scores NodeScoreList) *Status {
if inj.NormalizeErr {
return NewStatus(Error, "injecting failure.")
if Code(inj.NormalizeStatus) != Success {
return NewStatus(Code(inj.NormalizeStatus), "injecting failure.")
}
for i := range scores {
scores[i].Score = inj.NormalizeRes
}
return nil
}
func collectAndCompare(t *testing.T, wantExtensionPoint string, wantStatus Code) {
ch := make(chan prometheus.Metric, 1)
m := &dto.Metric{}
metrics.FrameworkExtensionPointDuration.Collect(ch)
got := <-ch
got.Write(m)
if len(m.Label) != 2 {
t.Fatalf("Unexpected number of label pairs, got: %v, want: 2", len(m.Label))
}
if *m.Label[0].Value != wantExtensionPoint {
t.Errorf("Unexpected extension point label, got: %q, want %q", *m.Label[0].Value, wantExtensionPoint)
}
if *m.Label[1].Value != wantStatus.String() {
t.Errorf("Unexpected status code label, got: %q, want %q", *m.Label[1].Value, wantStatus)
}
if *m.Histogram.SampleCount != 1 {
t.Errorf("Expect 1 sample, got: %v", m.Histogram.SampleCount)
}
if *m.Histogram.SampleSum <= 0 {
t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum)
}
}

View File

@ -31,9 +31,6 @@ import (
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// Code is the Status code/type which is returned from plugins.
type Code int
// NodeScoreList declares a list of nodes and their scores.
type NodeScoreList []NodeScore
@ -49,6 +46,9 @@ type PluginToNodeScores map[string]NodeScoreList
// NodeToStatusMap declares map from node name to its status.
type NodeToStatusMap map[string]*Status
// Code is the Status code/type which is returned from plugins.
type Code int
// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found pod schedulable.
@ -72,6 +72,13 @@ const (
Skip
)
// This list should be exactly the same as the codes iota defined above in the same order.
var codes = []string{"Success", "Error", "Unschedulable", "UnschedulableAndUnresolvable", "Wait", "Skip"}
func (c Code) String() string {
return codes[c]
}
const (
// MaxNodeScore is the maximum score a Score plugin is expected to return.
MaxNodeScore int64 = 100

View File

@ -238,6 +238,16 @@ var (
StabilityLevel: metrics.ALPHA,
})
FrameworkExtensionPointDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "framework_extension_point_duration_seconds",
Help: "Latency for running all plugins of a specific extension point.",
Buckets: nil,
StabilityLevel: metrics.ALPHA,
},
[]string{"extension_point", "status"})
metricsList = []metrics.Registerable{
scheduleAttempts,
SchedulingLatency,
@ -259,6 +269,7 @@ var (
pendingPods,
PodSchedulingDuration,
PodSchedulingAttempts,
FrameworkExtensionPointDuration,
}
)