diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 9894e66df48..d8684f5ae01 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -215,6 +215,21 @@ func WithCaptureProfile(c CaptureProfile) Option { } } +// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl. +func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option { + return func(o *frameworkOptions) { + o.clusterEventMap = m + } +} + +// WithMetricsRecorder sets metrics recorder for the scheduling frameworkImpl. +func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option { + return func(o *frameworkOptions) { + o.metricsRecorder = r + } +} + +// defaultFrameworkOptions are applied when no option corresponding to those fields exist. func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { return frameworkOptions{ metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh), @@ -223,13 +238,6 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions { } } -// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl. -func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option { - return func(o *frameworkOptions) { - o.clusterEventMap = m - } -} - var _ framework.Framework = &frameworkImpl{} // NewFramework initializes plugins given the configuration and the registry. diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index bff7584c4db..50fc17d63ce 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -29,6 +29,7 @@ package queue import ( "context" "fmt" + "math/rand" "reflect" "sync" "time" @@ -182,6 +183,10 @@ type PriorityQueue struct { closed bool nsLister listersv1.NamespaceLister + + metricsRecorder metrics.MetricAsyncRecorder + // pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled. + pluginMetricsSamplePercent int } type priorityQueueOptions struct { @@ -190,6 +195,8 @@ type priorityQueueOptions struct { podMaxBackoffDuration time.Duration podMaxInUnschedulablePodsDuration time.Duration podLister listersv1.PodLister + metricsRecorder metrics.MetricAsyncRecorder + pluginMetricsSamplePercent int clusterEventMap map[framework.ClusterEvent]sets.String preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin } @@ -246,6 +253,20 @@ func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option { } } +// WithMetricsRecorder sets metrics recorder. +func WithMetricsRecorder(recorder metrics.MetricAsyncRecorder) Option { + return func(o *priorityQueueOptions) { + o.metricsRecorder = recorder + } +} + +// WithPluginMetricsSamplePercent sets the percentage of plugin metrics to be sampled. +func WithPluginMetricsSamplePercent(percent int) Option { + return func(o *priorityQueueOptions) { + o.pluginMetricsSamplePercent = percent + } +} + var defaultPriorityQueueOptions = priorityQueueOptions{ clock: clock.RealClock{}, podInitialBackoffDuration: DefaultPodInitialBackoffDuration, @@ -298,6 +319,8 @@ func NewPriorityQueue( moveRequestCycle: -1, clusterEventMap: options.clusterEventMap, preEnqueuePluginMap: options.preEnqueuePluginMap, + metricsRecorder: options.metricsRecorder, + pluginMetricsSamplePercent: options.pluginMetricsSamplePercent, } pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) @@ -325,8 +348,9 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime)) }() + shouldRecordMetric := rand.Intn(100) < p.pluginMetricsSamplePercent for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] { - s = pl.PreEnqueue(ctx, pod) + s = p.runPreEnqueuePlugin(ctx, pl, pod, shouldRecordMetric) if s.IsSuccess() { continue } @@ -342,6 +366,16 @@ func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framewo return true } +func (p *PriorityQueue) runPreEnqueuePlugin(ctx context.Context, pl framework.PreEnqueuePlugin, pod *v1.Pod, shouldRecordMetric bool) *framework.Status { + if !shouldRecordMetric { + return pl.PreEnqueue(ctx, pod) + } + startTime := p.clock.Now() + s := pl.PreEnqueue(ctx, pod) + p.metricsRecorder.ObservePluginDurationAsync(preEnqueue, pl.Name(), s.Code().String(), p.clock.Since(startTime).Seconds()) + return s +} + // addToActiveQ tries to add pod to active queue. It returns 2 parameters: // 1. a boolean flag to indicate whether the pod is added successfully. // 2. an error for the caller to act on. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index aa02acf7301..0a74e45f960 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -1599,11 +1599,12 @@ func TestPendingPodsMetric(t *testing.T) { pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, "z", queueable, timestamp.Add(2*time.Second)) tests := []struct { - name string - operations []operation - operands [][]*framework.QueuedPodInfo - metricsName string - wants string + name string + operations []operation + operands [][]*framework.QueuedPodInfo + metricsName string + pluginMetricsSamplePercent int + wants string }{ { name: "add pods to activeQ and unschedulablePods", @@ -1765,6 +1766,59 @@ scheduler_pending_pods{queue="gated"} 5 scheduler_pending_pods{queue="unschedulable"} 20 `, }, + { + name: "the metrics should not be recorded (pluginMetricsSamplePercent=0)", + operations: []operation{ + add, + }, + operands: [][]*framework.QueuedPodInfo{ + pInfos[:1], + }, + metricsName: "scheduler_plugin_execution_duration_seconds", + pluginMetricsSamplePercent: 0, + wants: ` +# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point. +# TYPE scheduler_plugin_execution_duration_seconds histogram +`, // the observed value will always be 0, because we don't proceed the fake clock. + }, + { + name: "the metrics should be recorded (pluginMetricsSamplePercent=100)", + operations: []operation{ + add, + }, + operands: [][]*framework.QueuedPodInfo{ + pInfos[:1], + }, + metricsName: "scheduler_plugin_execution_duration_seconds", + pluginMetricsSamplePercent: 100, + wants: ` +# HELP scheduler_plugin_execution_duration_seconds [ALPHA] Duration for running a plugin at a specific extension point. +# TYPE scheduler_plugin_execution_duration_seconds histogram +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1e-05"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="1.5000000000000002e-05"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="2.2500000000000005e-05"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="3.375000000000001e-05"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="5.062500000000001e-05"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="7.593750000000002e-05"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00011390625000000003"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00017085937500000006"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0002562890625000001"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.00038443359375000017"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0005766503906250003"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0008649755859375004"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0012974633789062506"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0019461950683593758"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.0029192926025390638"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.004378938903808595"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.006568408355712893"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.009852612533569338"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.014778918800354007"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="0.02216837820053101"} 1 +scheduler_plugin_execution_duration_seconds_bucket{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success",le="+Inf"} 1 +scheduler_plugin_execution_duration_seconds_sum{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 0 +scheduler_plugin_execution_duration_seconds_count{extension_point="PreEnqueue",plugin="preEnqueuePlugin",status="Success"} 1 +`, // the observed value will always be 0, because we don't proceed the fake clock. + }, } resetMetrics := func() { @@ -1772,6 +1826,7 @@ scheduler_pending_pods{queue="unschedulable"} 20 metrics.BackoffPods().Set(0) metrics.UnschedulablePods().Set(0) metrics.GatedPods().Set(0) + metrics.PluginExecutionDuration.Reset() } for _, test := range tests { @@ -1781,13 +1836,16 @@ scheduler_pending_pods{queue="unschedulable"} 20 defer cancel() m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}} - queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m)) + recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done()) + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m), WithPluginMetricsSamplePercent(test.pluginMetricsSamplePercent), WithMetricsRecorder(*recorder)) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) } } + recorder.FlushMetrics() + if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(test.wants), test.metricsName); err != nil { t.Fatal(err) } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ad88ad982ad..540d46f038b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -283,6 +283,7 @@ func New(client clientset.Interface, snapshot := internalcache.NewEmptySnapshot() clusterEventMap := make(map[framework.ClusterEvent]sets.String) + metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh) profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, stopCh, frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), @@ -292,8 +293,10 @@ func New(client clientset.Interface, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), frameworkruntime.WithClusterEventMap(clusterEventMap), + frameworkruntime.WithClusterEventMap(clusterEventMap), frameworkruntime.WithParallelism(int(options.parallelism)), frameworkruntime.WithExtenders(extenders), + frameworkruntime.WithMetricsRecorder(metricsRecorder), ) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) @@ -316,6 +319,8 @@ func New(client clientset.Interface, internalqueue.WithClusterEventMap(clusterEventMap), internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), + internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent), + internalqueue.WithMetricsRecorder(*metricsRecorder), ) for _, fwk := range profiles {