Record overall Filter latency for all nodes in a scheduling cycle.

This commit is contained in:
Cong Liu 2020-01-22 10:51:39 -05:00
parent c30f4489b4
commit e0aeb4d6a3
3 changed files with 14 additions and 60 deletions

View File

@ -490,6 +490,15 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *
}
}
beginCheckNode := time.Now()
statusCode := framework.Success
defer func() {
// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
metrics.FrameworkExtensionPointDuration.WithLabelValues(framework.Filter, statusCode.String()).Observe(metrics.SinceInSeconds(beginCheckNode))
}()
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
@ -498,6 +507,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *
filtered = filtered[:filteredLen]
if err := errCh.ReceiveError(); err != nil {
statusCode = framework.Error
return nil, err
}
return filtered, nil

View File

@ -22,7 +22,7 @@ import (
"reflect"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@ -39,12 +39,13 @@ import (
)
const (
// Filter is the name of the filter extension point.
Filter = "Filter"
// Specifies the maximum timeout a permit plugin can return.
maxTimeout time.Duration = 15 * time.Minute
preFilter = "PreFilter"
preFilterExtensionAddPod = "PreFilterExtensionAddPod"
preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
filter = "Filter"
postFilter = "PostFilter"
score = "Score"
scoreExtensionNormalize = "ScoreExtensionNormalize"
@ -351,10 +352,6 @@ func (f *framework) RunPreFilterExtensionAddPod(
podToAdd *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionAddPod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
@ -391,10 +388,6 @@ func (f *framework) RunPreFilterExtensionRemovePod(
podToRemove *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionRemovePod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
@ -432,10 +425,6 @@ func (f *framework) RunFilterPlugins(
nodeInfo *schedulernodeinfo.NodeInfo,
) PluginToStatus {
var firstFailedStatus *Status
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
statuses := make(PluginToStatus)
for _, pl := range f.filterPlugins {
pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
@ -466,7 +455,7 @@ func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state
}
startTime := time.Now()
status := pl.Filter(ctx, state, pod, nodeInfo)
f.metricsRecorder.observePluginDurationAsync(filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime))
return status
}

View File

@ -865,30 +865,6 @@ func TestRecordingMetrics(t *testing.T) {
wantExtensionPoint: "PreFilter",
wantStatus: Success,
},
{
name: "PreFilterAddPod - Success",
action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) },
wantExtensionPoint: "PreFilterExtensionAddPod",
wantStatus: Success,
},
{
name: "PreFilterRemovePod - Success",
action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) },
wantExtensionPoint: "PreFilterExtensionRemovePod",
wantStatus: Success,
},
{
name: "PreFilterRemovePod - Success",
action: func(f Framework) { f.RunPreFilterPlugins(context.Background(), state, pod) },
wantExtensionPoint: "PreFilter",
wantStatus: Success,
},
{
name: "Filter - Success",
action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) },
wantExtensionPoint: "Filter",
wantStatus: Success,
},
{
name: "PostFilter - Success",
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) },
@ -945,27 +921,6 @@ func TestRecordingMetrics(t *testing.T) {
wantExtensionPoint: "PreFilter",
wantStatus: Error,
},
{
name: "PreFilterAddPod - Error",
action: func(f Framework) { f.RunPreFilterExtensionAddPod(context.Background(), state, pod, nil, nil) },
inject: injectedResult{PreFilterAddPodStatus: int(Error)},
wantExtensionPoint: "PreFilterExtensionAddPod",
wantStatus: Error,
},
{
name: "PreFilterRemovePod - Error",
action: func(f Framework) { f.RunPreFilterExtensionRemovePod(context.Background(), state, pod, nil, nil) },
inject: injectedResult{PreFilterRemovePodStatus: int(Error)},
wantExtensionPoint: "PreFilterExtensionRemovePod",
wantStatus: Error,
},
{
name: "Filter - Error",
action: func(f Framework) { f.RunFilterPlugins(context.Background(), state, pod, nil) },
inject: injectedResult{FilterStatus: int(Error)},
wantExtensionPoint: "Filter",
wantStatus: Error,
},
{
name: "PostFilter - Error",
action: func(f Framework) { f.RunPostFilterPlugins(context.Background(), state, pod, nil, nil) },