Merge pull request #116201 from sanposhiho/metric-scheduling-gate

add(scheduler): implement "plugin_execution_duration_seconds" metric in PreEnqueue
This commit is contained in:
Kubernetes Prow Robot 2023-03-12 13:52:40 -07:00 committed by GitHub
commit a32050e6cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 119 additions and 14 deletions

View File

@ -215,6 +215,21 @@ func WithCaptureProfile(c CaptureProfile) Option {
}
}
// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
return func(o *frameworkOptions) {
o.clusterEventMap = m
}
}
// WithMetricsRecorder sets metrics recorder for the scheduling frameworkImpl.
func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
return func(o *frameworkOptions) {
o.metricsRecorder = r
}
}
// defaultFrameworkOptions are applied when no option corresponding to those fields exist.
func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
return frameworkOptions{
metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
@ -223,13 +238,6 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
}
}
// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
return func(o *frameworkOptions) {
o.clusterEventMap = m
}
}
var _ framework.Framework = &frameworkImpl{}
// NewFramework initializes plugins given the configuration and the registry.

View File

@ -29,6 +29,7 @@ package queue
import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
"time"
@ -182,6 +183,10 @@ type PriorityQueue struct {
closed bool
nsLister listersv1.NamespaceLister
metricsRecorder metrics.MetricAsyncRecorder
// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
pluginMetricsSamplePercent int
}
type priorityQueueOptions struct {
@ -190,6 +195,8 @@ type priorityQueueOptions struct {
podMaxBackoffDuration time.Duration
podMaxInUnschedulablePodsDuration time.Duration
podLister listersv1.PodLister
metricsRecorder metrics.MetricAsyncRecorder
pluginMetricsSamplePercent int
clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
}
@ -246,6 +253,20 @@ func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
}
}
// WithMetricsRecorder sets metrics recorder.
func WithMetricsRecorder(recorder metrics.MetricAsyncRecorder) Option {
return func(o *priorityQueueOptions) {
o.metricsRecorder = recorder
}
}
// WithPluginMetricsSamplePercent sets the percentage of plugin metrics to be sampled.
func WithPluginMetricsSamplePercent(percent int) Option {
return func(o *priorityQueueOptions) {
o.pluginMetricsSamplePercent = percent
}
}
var defaultPriorityQueueOptions = priorityQueueOptions{
clock: clock.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
@ -298,6 +319,8 @@ func NewPriorityQueue(
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
preEnqueuePluginMap: options.preEnqueuePluginMap,
metricsRecorder: options.metricsRecorder,
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@ -325,8 +348,9 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo
metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime))
}()
shouldRecordMetric := rand.Intn(100) < p.pluginMetricsSamplePercent
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
s = pl.PreEnqueue(ctx, pod)
s = p.runPreEnqueuePlugin(ctx, pl, pod, shouldRecordMetric)
if s.IsSuccess() {
continue
}
@ -342,6 +366,16 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo
return true
}
func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.PreEnqueuePlugin, pod *v1.Pod, shouldRecordMetric bool) *framework.Status {
if !shouldRecordMetric {
return pl.PreEnqueue(ctx, pod)
}
startTime := p.clock.Now()
s := pl.PreEnqueue(ctx, pod)
p.metricsRecorder.ObservePluginDurationAsync(preEnqueue, pl.Name(), s.Code().String(), p.clock.Since(startTime).Seconds())
return s
}
// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.

View File

@ -1599,11 +1599,12 @@ func TestPendingPodsMetric(t *testing.T) {
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second))
tests := []struct {
name string
operations []operation
operands [][]*framework.QueuedPodInfo
metricsName string
wants string
name string
operations []operation
operands [][]*framework.QueuedPodInfo
metricsName string
pluginMetricsSamplePercent int
wants string
}{
{
name: "add pods to activeQ and unschedulablePods",
@ -1765,6 +1766,59 @@ scheduler_pending_pods{queue="gated"} 5
scheduler_pending_pods{queue="unschedulable"} 20
`,
},
{
name: "the metrics should not be recorded (pluginMetricsSamplePercent=0)",
operations: []operation{
add,
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:1],
},
metricsName: "scheduler_plugin_execution_duration_seconds",
pluginMetricsSamplePercent: 0,
wants: `
# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point.
# TYPE scheduler_plugin_execution_duration_seconds histogram
`, // the observed value will always be 0, because we don't proceed the fake clock.
},
{
name: "the metrics should be recorded (pluginMetricsSamplePercent=100)",
operations: []operation{
add,
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:1],
},
metricsName: "scheduler_plugin_execution_duration_seconds",
pluginMetricsSamplePercent: 100,
wants: `
# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point.
# TYPE scheduler_plugin_execution_duration_seconds histogram
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1.5000000000000002e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="2.2500000000000005e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="3.375000000000001e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="5.062500000000001e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="7.593750000000002e-05"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00011390625000000003"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00017085937500000006"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0002562890625000001"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00038443359375000017"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0005766503906250003"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0008649755859375004"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0012974633789062506"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0019461950683593758"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0029192926025390638"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.004378938903808595"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.006568408355712893"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.009852612533569338"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.014778918800354007"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.02216837820053101"} 1
scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="+Inf"} 1
scheduler_plugin_execution_duration_seconds_sum{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 0
scheduler_plugin_execution_duration_seconds_count{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 1
`, // the observed value will always be 0, because we don't proceed the fake clock.
},
}
resetMetrics := func() {
@ -1772,6 +1826,7 @@ scheduler_pending_pods{queue="unschedulable"} 20
metrics.BackoffPods().Set(0)
metrics.UnschedulablePods().Set(0)
metrics.GatedPods().Set(0)
metrics.PluginExecutionDuration.Reset()
}
for _, test := range tests {
@ -1781,13 +1836,16 @@ scheduler_pending_pods{queue="unschedulable"} 20
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}}
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m))
recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done())
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m), WithPluginMetricsSamplePercent(test.pluginMetricsSamplePercent), WithMetricsRecorder(*recorder))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
}
}
recorder.FlushMetrics()
if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(test.wants), test.metricsName); err != nil {
t.Fatal(err)
}

View File

@ -283,6 +283,7 @@ func New(client clientset.Interface,
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)
profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
@ -292,8 +293,10 @@ func New(client clientset.Interface,
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
@ -316,6 +319,8 @@ func New(client clientset.Interface,
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)
for _, fwk := range profiles {