From b5ed15b94ab11acf1f89822df6a04013c3335008 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Wed, 4 Sep 2024 21:59:27 +0900 Subject: [PATCH] feat: implement a force flush --- pkg/scheduler/backend/queue/active_queue.go | 15 ++++++++++----- pkg/scheduler/metrics/metric_recorder.go | 5 +++-- pkg/scheduler/metrics/metric_recorder_test.go | 14 ++++++++++---- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index 6b3183dffbf..456df160d3c 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -204,7 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) aq.schedCycle++ // In flight, no concurrent events yet. if aq.isSchedulingQueueHintEnabled { - aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1) + aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false) aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod) } @@ -297,7 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame _, ok := aq.inFlightPods[newPod.UID] if ok { - aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1) + aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false) aq.inFlightEvents.PushBack(&clusterEvent{ event: event, oldObj: oldPod, @@ -314,7 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f defer aq.lock.Unlock() if len(aq.inFlightPods) != 0 { - aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1) + aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false) aq.inFlightEvents.PushBack(&clusterEvent{ event: event, oldObj: oldObj, @@ -346,7 +346,6 @@ func (aq *activeQueue) done(pod types.UID) { // Remove the pod from the list. aq.inFlightEvents.Remove(inFlightPod) - aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1) aggrMetricsCounter := map[string]int{} // Remove events which are only referred to by this Pod @@ -370,8 +369,14 @@ func (aq *activeQueue) done(pod types.UID) { } for evLabel, count := range aggrMetricsCounter { - aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count)) + aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count), false) } + + aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1, + // If it's the last Pod in inFlightPods, we should force-flush the metrics. + // Otherwise, especially in small clusters, which don't get a new Pod frequently, + // the metrics might not be flushed for a long time. + len(aq.inFlightPods) == 0) } // close closes the activeQueue. diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index 9146a7c8296..f59ea1e96f7 100644 --- a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -151,15 +151,16 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event } // ObserveInFlightEventsAsync observes the in_flight_events metric. +// // Note that this function is not goroutine-safe; // we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events. -func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) { +func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64, forceFlush bool) { r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd) // Only flush the metric to the channel if the interval is reached. // The values are flushed to Prometheus in the run() function, which runs once the interval time. // Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid. - if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval { + if forceFlush || time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval { for key, value := range r.aggregatedInflightEventMetric { newMetric := &gaugeVecMetric{ metric: InFlightEvents, diff --git a/pkg/scheduler/metrics/metric_recorder_test.go b/pkg/scheduler/metrics/metric_recorder_test.go index 3abf6a4dacd..ad12c032ed1 100644 --- a/pkg/scheduler/metrics/metric_recorder_test.go +++ b/pkg/scheduler/metrics/metric_recorder_test.go @@ -116,9 +116,9 @@ func TestInFlightEventAsync(t *testing.T) { } podAddLabel := "Pod/Add" - r.ObserveInFlightEventsAsync(podAddLabel, 10) - r.ObserveInFlightEventsAsync(podAddLabel, -1) - r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1) + r.ObserveInFlightEventsAsync(podAddLabel, 10, false) + r.ObserveInFlightEventsAsync(podAddLabel, -1, false) + r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1, false) if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{ {metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9, @@ -130,7 +130,7 @@ func TestInFlightEventAsync(t *testing.T) { r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush // It adds -4 and flushes the metric to the channel. - r.ObserveInFlightEventsAsync(podAddLabel, -4) + r.ObserveInFlightEventsAsync(podAddLabel, -4, false) if len(r.aggregatedInflightEventMetric) != 0 { t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric) } @@ -164,4 +164,10 @@ func TestInFlightEventAsync(t *testing.T) { }, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" { t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d) } + + // Test force flush + r.ObserveInFlightEventsAsync(podAddLabel, 1, true) + if len(r.aggregatedInflightEventMetric) != 0 { + t.Errorf("aggregatedInflightEventMetric should be force-flushed, but got: %v", r.aggregatedInflightEventMetric) + } }