diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index 68be798ab40..6b3183dffbf 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -123,14 +123,17 @@ type activeQueue struct { // isSchedulingQueueHintEnabled indicates whether the feature gate for the scheduling queue is enabled. isSchedulingQueueHintEnabled bool + + metricsRecorder metrics.MetricAsyncRecorder } -func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool) *activeQueue { +func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue { aq := &activeQueue{ queue: queue, inFlightPods: make(map[types.UID]*list.Element), inFlightEvents: list.New(), isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled, + metricsRecorder: metricRecorder, } aq.cond.L = &aq.lock @@ -201,6 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) aq.schedCycle++ // In flight, no concurrent events yet. 
if aq.isSchedulingQueueHintEnabled { + aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1) aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod) } @@ -293,6 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame _, ok := aq.inFlightPods[newPod.UID] if ok { + aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1) aq.inFlightEvents.PushBack(&clusterEvent{ event: event, oldObj: oldPod, @@ -309,6 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f defer aq.lock.Unlock() if len(aq.inFlightPods) != 0 { + aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1) aq.inFlightEvents.PushBack(&clusterEvent{ event: event, oldObj: oldObj, @@ -340,7 +346,9 @@ func (aq *activeQueue) done(pod types.UID) { // Remove the pod from the list. aq.inFlightEvents.Remove(inFlightPod) + aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1) + aggrMetricsCounter := map[string]int{} // Remove events which are only referred to by this Pod // so that the inFlightEvents list doesn't grow infinitely. // If the pod was at the head of the list, then all @@ -352,11 +360,17 @@ func (aq *activeQueue) done(pod types.UID) { // Empty list. break } - if _, ok := e.Value.(*clusterEvent); !ok { + ev, ok := e.Value.(*clusterEvent) + if !ok { // A pod, must stop pruning. 
break } aq.inFlightEvents.Remove(e) + aggrMetricsCounter[ev.event.Label]-- + } + + for evLabel, count := range aggrMetricsCounter { + aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count)) } } diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 9e787d1ed53..6186036920b 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -331,7 +331,7 @@ func NewPriorityQueue( podInitialBackoffDuration: options.podInitialBackoffDuration, podMaxBackoffDuration: options.podMaxBackoffDuration, podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, - activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled), + activeQ: newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessFn), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder), unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), preEnqueuePluginMap: options.preEnqueuePluginMap, queueingHintMap: options.queueingHintMap, diff --git a/pkg/scheduler/backend/queue/testing.go b/pkg/scheduler/backend/queue/testing.go index e620fc1cab9..f619bac8495 100644 --- a/pkg/scheduler/backend/queue/testing.go +++ b/pkg/scheduler/backend/queue/testing.go @@ -18,11 +18,13 @@ package queue import ( "context" + "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/metrics" ) // NewTestQueue creates a priority queue with an empty informer factory. 
@@ -39,6 +41,12 @@ func NewTestQueueWithObjects( opts ...Option, ) *PriorityQueue { informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(objs...), 0) + + // Because some major functions (e.g., Pop) require the metric recorder to be set, + // we always set a metric recorder here. + recorder := metrics.NewMetricsAsyncRecorder(10, 20*time.Microsecond, ctx.Done()) + // We set it before the options that users provide, so that users can override it. + opts = append([]Option{WithMetricsRecorder(*recorder)}, opts...) return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...) } diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index b7454ba094b..3400e5aad23 100644 --- a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -80,23 +80,42 @@ func (r *PendingPodsRecorder) Clear() { r.recorder.Set(float64(0)) } -// metric is the data structure passed in the buffer channel between the main framework thread +// histgramVecMetric is the data structure passed in the buffer channel between the main framework thread // and the metricsRecorder goroutine. -type metric struct { +type histgramVecMetric struct { metric *metrics.HistogramVec labelValues []string value float64 } +type gaugeVecMetric struct { + metric *metrics.GaugeVec + labelValues []string + valueToAdd float64 +} + +type gaugeVecMetricKey struct { + metricName string + labelValue string +} + // MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path. type MetricAsyncRecorder struct { // bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it. - bufferCh chan *metric + bufferCh chan *histgramVecMetric // if bufferSize is reached, incoming metrics will be discarded. bufferSize int // how often the recorder runs to flush the metrics. 
interval time.Duration + // aggregatedInflightEventMetric is only to record InFlightEvents metric asynchronously. + // It's a map from gaugeVecMetricKey to the aggregated value + // and the aggregated value is flushed to Prometheus every time the interval is reached. + // Note that we don't lock the map deliberately because we assume the queue takes lock before updating the in-flight events. + aggregatedInflightEventMetric map[gaugeVecMetricKey]int + aggregatedInflightEventMetricLastFlushTime time.Time + aggregatedInflightEventMetricBufferCh chan *gaugeVecMetric + // stopCh is used to stop the goroutine which periodically flushes metrics. stopCh <-chan struct{} // IsStoppedCh indicates whether the goroutine is stopped. It's used in tests only to make sure @@ -106,11 +125,14 @@ type MetricAsyncRecorder struct { func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder { recorder := &MetricAsyncRecorder{ - bufferCh: make(chan *metric, bufferSize), - bufferSize: bufferSize, - interval: interval, - stopCh: stopCh, - IsStoppedCh: make(chan struct{}), + bufferCh: make(chan *histgramVecMetric, bufferSize), + bufferSize: bufferSize, + interval: interval, + stopCh: stopCh, + aggregatedInflightEventMetric: make(map[gaugeVecMetricKey]int), + aggregatedInflightEventMetricLastFlushTime: time.Now(), + aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, bufferSize), + IsStoppedCh: make(chan struct{}), } go recorder.run() return recorder @@ -128,8 +150,32 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event r.observeMetricAsync(queueingHintExecutionDuration, value, pluginName, event, hint) } +// ObserveInFlightEventsAsync observes the in_flight_events metric. +// Note that this function is not goroutine-safe; +// we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events. 
+func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) { + r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd) + + // Only flush the metric to the channel if the interval is reached. + // The values are flushed to Prometheus in the run() function, which runs once per interval. + if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval { + for key, value := range r.aggregatedInflightEventMetric { + newMetric := &gaugeVecMetric{ + metric: InFlightEvents, + labelValues: []string{key.labelValue}, + valueToAdd: float64(value), + } + select { + case r.aggregatedInflightEventMetricBufferCh <- newMetric: + default: + } + } + r.aggregatedInflightEventMetricLastFlushTime = time.Now() + } +} + func (r *MetricAsyncRecorder) observeMetricAsync(m *metrics.HistogramVec, value float64, labelsValues ...string) { - newMetric := &metric{ + newMetric := &histgramVecMetric{ metric: m, labelValues: labelsValues, value: value, @@ -161,7 +207,14 @@ func (r *MetricAsyncRecorder) FlushMetrics() { case m := <-r.bufferCh: m.metric.WithLabelValues(m.labelValues...).Observe(m.value) default: - return + // no more value + } + + select { + case m := <-r.aggregatedInflightEventMetricBufferCh: + m.metric.WithLabelValues(m.labelValues...).Add(m.valueToAdd) + default: + // no more value } } } diff --git a/pkg/scheduler/metrics/metric_recorder_test.go b/pkg/scheduler/metrics/metric_recorder_test.go index 833a891f291..dabf2b153e8 100644 --- a/pkg/scheduler/metrics/metric_recorder_test.go +++ b/pkg/scheduler/metrics/metric_recorder_test.go @@ -17,9 +17,14 @@ limitations under the License. 
package metrics import ( + "sort" "sync" "sync/atomic" "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" ) var _ MetricRecorder = &fakePodsRecorder{} @@ -101,3 +106,59 @@ func TestClear(t *testing.T) { t.Errorf("Expected %v, got %v", 0, fakeRecorder.counter) } } + +func TestInFlightEventAsync(t *testing.T) { + r := &MetricAsyncRecorder{ + aggregatedInflightEventMetric: map[gaugeVecMetricKey]int{}, + aggregatedInflightEventMetricLastFlushTime: time.Now(), + aggregatedInflightEventMetricBufferCh: make(chan *gaugeVecMetric, 100), + interval: time.Hour, + } + + podAddLabel := "Pod/Add" + r.ObserveInFlightEventsAsync(podAddLabel, 10) + r.ObserveInFlightEventsAsync(podAddLabel, -1) + r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1) + + if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{ + {metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9, + {metricName: InFlightEvents.Name, labelValue: PodPoppedInFlightEvent}: 1, + }, cmp.AllowUnexported(gaugeVecMetric{})); d != "" { + t.Errorf("unexpected aggregatedInflightEventMetric: %s", d) + } + + r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush + + // It adds -4 and flushes the metric to the channel. 
+ r.ObserveInFlightEventsAsync(podAddLabel, -4) + + got := []gaugeVecMetric{} + for { + select { + case m := <-r.aggregatedInflightEventMetricBufferCh: + got = append(got, *m) + continue + default: + } + // got all + break + } + + // sort got to avoid the flaky test + sort.Slice(got, func(i, j int) bool { + return got[i].labelValues[0] < got[j].labelValues[0] + }) + + if d := cmp.Diff(got, []gaugeVecMetric{ + { + labelValues: []string{podAddLabel}, + valueToAdd: 5, + }, + { + labelValues: []string{PodPoppedInFlightEvent}, + valueToAdd: 1, + }, + }, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" { + t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d) + } +} diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index fe3fd8f459e..992c01e0456 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -81,6 +81,10 @@ const ( QueueingHintResultError = "Error" ) +const ( + PodPoppedInFlightEvent = "PodPopped" +) + // All the histogram based metrics have 1ms as size for the smallest bucket. var ( scheduleAttempts = metrics.NewCounterVec( @@ -141,6 +145,13 @@ var ( Help: "Number of pending pods, by the queue type. 
'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.", StabilityLevel: metrics.STABLE, }, []string{"queue"}) + InFlightEvents = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: SchedulerSubsystem, + Name: "inflight_events", + Help: "Number of events recorded in the scheduling queue.", + StabilityLevel: metrics.ALPHA, + }, []string{"event"}) Goroutines = metrics.NewGaugeVec( &metrics.GaugeOpts{ Subsystem: SchedulerSubsystem, @@ -292,6 +303,7 @@ func Register() { RegisterMetrics(metricsList...) if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { RegisterMetrics(queueingHintExecutionDuration) + RegisterMetrics(InFlightEvents) } volumebindingmetrics.RegisterVolumeSchedulingMetrics() })