diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index d2776cbcc01..e14de634892 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -497,6 +497,7 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework continue } + start := time.Now() hint, err := hintfn.QueueingHintFn(logger, pod, oldObj, newObj) if err != nil { // If the QueueingHintFn returned an error, we should treat the event as Queue so that we can prevent @@ -509,6 +510,8 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework } hint = framework.Queue } + p.metricsRecorder.ObserveQueueingHintDurationAsync(hintfn.PluginName, event.Label, queueingHintToLabel(hint, err), metrics.SinceInSeconds(start)) + if hint == framework.QueueSkip { continue } @@ -536,6 +539,23 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework return queueStrategy } +// queueingHintToLabel converts a hint and an error from QHint to a label string. +func queueingHintToLabel(hint framework.QueueingHint, err error) string { + if err != nil { + return metrics.QueueingHintResultError + } + + switch hint { + case framework.Queue: + return metrics.QueueingHintResultQueue + case framework.QueueSkip: + return metrics.QueueingHintResultQueueSkip + } + + // Shouldn't reach here. + return "" +} + // runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin. // It returns true if all PreEnqueue function run successfully; otherwise returns false // upon the first failure. diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index 1146f81cd9e..b7454ba094b 100644 --- a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -119,9 +119,19 @@ func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-ch // ObservePluginDurationAsync observes the plugin_execution_duration_seconds metric. // The metric will be flushed to Prometheus asynchronously. func (r *MetricAsyncRecorder) ObservePluginDurationAsync(extensionPoint, pluginName, status string, value float64) { + r.observeMetricAsync(PluginExecutionDuration, value, pluginName, extensionPoint, status) +} + +// ObserveQueueingHintDurationAsync observes the queueing_hint_execution_duration_seconds metric. +// The metric will be flushed to Prometheus asynchronously. +func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event, hint string, value float64) { + r.observeMetricAsync(queueingHintExecutionDuration, value, pluginName, event, hint) +} + +func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) { newMetric := &metric{ - metric: PluginExecutionDuration, - labelValues: []string{pluginName, extensionPoint, status}, + metric: m, + labelValues: labelsValues, value: value, } select { diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 080aaca6060..fe3fd8f459e 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -20,8 +20,10 @@ import ( "sync" "time" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/kubernetes/pkg/features" volumebindingmetrics "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics" ) @@ -73,6 +75,12 @@ const ( Permit = "Permit" ) +const ( + QueueingHintResultQueue = "Queue" + QueueingHintResultQueueSkip = "QueueSkip" + QueueingHintResultError = "Error" +) + // All the histogram based metrics have 1ms as size for the smallest bucket. var ( scheduleAttempts = metrics.NewCounterVec( @@ -198,6 +206,19 @@ var ( }, []string{"plugin", "extension_point", "status"}) + // This is only available when the QHint feature gate is enabled. + queueingHintExecutionDuration = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: SchedulerSubsystem, + Name: "queueing_hint_execution_duration_seconds", + Help: "Duration for running a queueing hint function of a plugin.", + // Start with 0.01ms with the last bucket being [~22ms, Inf). We use a small factor (1.5) + // so that we have better granularity since plugin latency is very sensitive. + Buckets: metrics.ExponentialBuckets(0.00001, 1.5, 20), + StabilityLevel: metrics.ALPHA, + }, + []string{"plugin", "event", "hint"}) + SchedulerQueueIncomingPods = metrics.NewCounterVec( &metrics.CounterOpts{ Subsystem: SchedulerSubsystem, @@ -269,6 +290,9 @@ func Register() { // Register the metrics. registerMetrics.Do(func() { RegisterMetrics(metricsList...) + if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { + RegisterMetrics(queueingHintExecutionDuration) + } volumebindingmetrics.RegisterVolumeSchedulingMetrics() }) }