Merge pull request #86545 from xiaoanyunfei/cleanup/enhance_scheduler_metrics

Cleanup/Enhance scheduler metrics
This commit is contained in:
Kubernetes Prow Robot 2020-01-09 13:29:22 -08:00 committed by GitHub
commit f634ee6fe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 113 deletions

View File

@ -44,8 +44,8 @@ type StateKey string
type CycleState struct {
mx sync.RWMutex
storage map[StateKey]StateData
// if recordFrameworkMetrics is true, framework metrics will be recorded for this cycle.
recordFrameworkMetrics bool
// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
recordPluginMetrics bool
}
// NewCycleState initializes a new CycleState and returns its pointer.
@ -55,20 +55,20 @@ func NewCycleState() *CycleState {
}
}
// ShouldRecordFrameworkMetrics returns whether framework metrics should be recorded.
func (c *CycleState) ShouldRecordFrameworkMetrics() bool {
// ShouldRecordPluginMetrics returns whether PluginExecutionDuration metrics should be recorded.
func (c *CycleState) ShouldRecordPluginMetrics() bool {
if c == nil {
return false
}
return c.recordFrameworkMetrics
return c.recordPluginMetrics
}
// SetRecordFrameworkMetrics sets recordFrameworkMetrics to the given value.
func (c *CycleState) SetRecordFrameworkMetrics(flag bool) {
// SetRecordPluginMetrics sets recordPluginMetrics to the given value.
func (c *CycleState) SetRecordPluginMetrics(flag bool) {
if c == nil {
return
}
c.recordFrameworkMetrics = flag
c.recordPluginMetrics = flag
}
// Clone creates a copy of CycleState and returns its pointer. Clone returns

View File

@ -300,12 +300,10 @@ func (f *framework) QueueSortFunc() LessFunc {
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preFilter, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preFilterPlugins {
status = f.runPreFilterPlugin(ctx, pl, state, pod)
if !status.IsSuccess() {
@ -324,7 +322,7 @@ func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState,
}
func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.PreFilter(ctx, state, pod)
}
startTime := time.Now()
@ -343,12 +341,10 @@ func (f *framework) RunPreFilterExtensionAddPod(
podToAdd *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionAddPod, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionAddPod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
@ -366,7 +362,7 @@ func (f *framework) RunPreFilterExtensionAddPod(
}
func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
}
startTime := time.Now()
@ -385,12 +381,10 @@ func (f *framework) RunPreFilterExtensionRemovePod(
podToRemove *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preFilterExtensionRemovePod, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilterExtensionRemovePod, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preFilterPlugins {
if pl.PreFilterExtensions() == nil {
continue
@ -408,7 +402,7 @@ func (f *framework) RunPreFilterExtensionRemovePod(
}
func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
}
startTime := time.Now()
@ -428,12 +422,10 @@ func (f *framework) RunFilterPlugins(
nodeInfo *schedulernodeinfo.NodeInfo,
) PluginToStatus {
var firstFailedStatus *Status
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(filter, firstFailedStatus, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
statuses := make(PluginToStatus)
for _, pl := range f.filterPlugins {
pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
@ -459,7 +451,7 @@ func (f *framework) RunFilterPlugins(
}
func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.Filter(ctx, state, pod, nodeInfo)
}
startTime := time.Now()
@ -478,12 +470,10 @@ func (f *framework) RunPostFilterPlugins(
nodes []*v1.Node,
filteredNodesStatuses NodeToStatusMap,
) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(postFilter, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(postFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.postFilterPlugins {
status = f.runPostFilterPlugin(ctx, pl, state, pod, nodes, filteredNodesStatuses)
if !status.IsSuccess() {
@ -497,7 +487,7 @@ func (f *framework) RunPostFilterPlugins(
}
func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.PostFilter(ctx, state, pod, nodes, filteredNodesStatuses)
}
startTime := time.Now()
@ -511,12 +501,10 @@ func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(score, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
@ -592,7 +580,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
}
func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.Score(ctx, state, pod, nodeName)
}
startTime := time.Now()
@ -602,7 +590,7 @@ func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *C
}
func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
}
startTime := time.Now()
@ -615,12 +603,10 @@ func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(preBind, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preBindPlugins {
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
@ -633,7 +619,7 @@ func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, po
}
func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.PreBind(ctx, state, pod, nodeName)
}
startTime := time.Now()
@ -644,12 +630,10 @@ func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, stat
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(bind, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
if len(f.bindPlugins) == 0 {
return NewStatus(Skip, "")
}
@ -669,7 +653,7 @@ func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *
}
func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return bp.Bind(ctx, state, pod, nodeName)
}
startTime := time.Now()
@ -680,19 +664,17 @@ func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *Cyc
// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(postBind, nil, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, Success.String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.postBindPlugins {
f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
}
}
func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
pl.PostBind(ctx, state, pod, nodeName)
return
}
@ -705,12 +687,10 @@ func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, st
// plugins returns an error, it does not continue running the remaining ones and
// returns the error. In such case, pod will not be scheduled.
func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(reserve, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.reservePlugins {
status = f.runReservePlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
@ -723,7 +703,7 @@ func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, po
}
func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.Reserve(ctx, state, pod, nodeName)
}
startTime := time.Now()
@ -734,19 +714,17 @@ func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, stat
// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(unreserve, nil, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, Success.String()).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.unreservePlugins {
f.runUnreservePlugin(ctx, pl, state, pod, nodeName)
}
}
func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
pl.Unreserve(ctx, state, pod, nodeName)
return
}
@ -763,12 +741,10 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin,
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
if state.ShouldRecordFrameworkMetrics() {
startTime := time.Now()
defer func() {
f.metricsRecorder.observeExtensionPointDurationAsync(permit, status, metrics.SinceInSeconds(startTime))
}()
}
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
}()
pluginsWaitTime := make(map[string]time.Duration)
statusCode := Success
for _, pl := range f.permitPlugins {
@ -820,7 +796,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
}
func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) {
if !state.ShouldRecordFrameworkMetrics() {
if !state.ShouldRecordPluginMetrics() {
return pl.Permit(ctx, state, pod, nodeName)
}
startTime := time.Now()

View File

@ -593,7 +593,6 @@ func TestPreFilterPlugins(t *testing.T) {
t.Errorf("AddPod called %v, expected: 1", preFilter2.RemoveCalled)
}
})
}
func TestFilterPlugins(t *testing.T) {
@ -817,7 +816,9 @@ func TestFilterPlugins(t *testing.T) {
}
func TestRecordingMetrics(t *testing.T) {
state := &CycleState{recordFrameworkMetrics: true}
state := &CycleState{
recordPluginMetrics: true,
}
tests := []struct {
name string
action func(f Framework)

View File

@ -60,21 +60,7 @@ func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder
return recorder
}
// observeExtensionPointDurationAsync observes the framework_extension_point_duration_seconds metric.
// The metric will be flushed to Prometheus asynchronously.
func (r *metricsRecorder) observeExtensionPointDurationAsync(extensionPoint string, status *Status, value float64) {
newMetric := &frameworkMetric{
metric: metrics.FrameworkExtensionPointDuration,
labelValues: []string{extensionPoint, status.Code().String()},
value: value,
}
select {
case r.bufferCh <- newMetric:
default:
}
}
// observeExtensionPointDurationAsync observes the plugin_execution_duration_seconds metric.
// observePluginDurationAsync observes the plugin_execution_duration_seconds metric.
// The metric will be flushed to Prometheus asynchronously.
func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) {
newMetric := &frameworkMetric{

View File

@ -54,8 +54,8 @@ const (
BindTimeoutSeconds = 100
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
SchedulerError = "SchedulerError"
// Percentage of framework metrics to be sampled.
frameworkMetricsSamplePercent = 10
// Percentage of plugin metrics to be sampled.
pluginMetricsSamplePercent = 10
)
// podConditionUpdater updates the condition of a pod based on the passed
@ -566,7 +566,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
state.SetRecordFrameworkMetrics(rand.Intn(100) < frameworkMetricsSamplePercent)
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)