feat: implement a force flush

This commit is contained in:
Kensei Nakada 2024-09-04 21:59:27 +09:00
parent 0ac5d745fe
commit b5ed15b94a
3 changed files with 23 additions and 11 deletions

View File

@@ -204,7 +204,7 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
aq.schedCycle++ aq.schedCycle++
// In flight, no concurrent events yet. // In flight, no concurrent events yet.
if aq.isSchedulingQueueHintEnabled { if aq.isSchedulingQueueHintEnabled {
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1) aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false)
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod) aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
} }
@@ -297,7 +297,7 @@ func (aq *activeQueue) addEventIfPodInFlight(oldPod, newPod *v1.Pod, event frame
_, ok := aq.inFlightPods[newPod.UID] _, ok := aq.inFlightPods[newPod.UID]
if ok { if ok {
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1) aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
aq.inFlightEvents.PushBack(&clusterEvent{ aq.inFlightEvents.PushBack(&clusterEvent{
event: event, event: event,
oldObj: oldPod, oldObj: oldPod,
@@ -314,7 +314,7 @@ func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event f
defer aq.lock.Unlock() defer aq.lock.Unlock()
if len(aq.inFlightPods) != 0 { if len(aq.inFlightPods) != 0 {
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1) aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label, 1, false)
aq.inFlightEvents.PushBack(&clusterEvent{ aq.inFlightEvents.PushBack(&clusterEvent{
event: event, event: event,
oldObj: oldObj, oldObj: oldObj,
@@ -346,7 +346,6 @@ func (aq *activeQueue) done(pod types.UID) {
// Remove the pod from the list. // Remove the pod from the list.
aq.inFlightEvents.Remove(inFlightPod) aq.inFlightEvents.Remove(inFlightPod)
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1)
aggrMetricsCounter := map[string]int{} aggrMetricsCounter := map[string]int{}
// Remove events which are only referred to by this Pod // Remove events which are only referred to by this Pod
@@ -370,8 +369,14 @@ func (aq *activeQueue) done(pod types.UID) {
} }
for evLabel, count := range aggrMetricsCounter { for evLabel, count := range aggrMetricsCounter {
aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count)) aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count), false)
} }
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1,
// If it's the last Pod in inFlightPods, we should force-flush the metrics.
// Otherwise, especially in small clusters, which don't get a new Pod frequently,
// the metrics might not be flushed for a long time.
len(aq.inFlightPods) == 0)
} }
// close closes the activeQueue. // close closes the activeQueue.

View File

@@ -151,15 +151,16 @@ func (r *MetricAsyncRecorder) ObserveQueueingHintDurationAsync(pluginName, event
} }
// ObserveInFlightEventsAsync observes the in_flight_events metric. // ObserveInFlightEventsAsync observes the in_flight_events metric.
//
// Note that this function is not goroutine-safe; // Note that this function is not goroutine-safe;
// we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events. // we don't lock the map deliberately for the performance reason and we assume the queue (i.e., the caller) takes lock before updating the in-flight events.
func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64) { func (r *MetricAsyncRecorder) ObserveInFlightEventsAsync(eventLabel string, valueToAdd float64, forceFlush bool) {
r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd) r.aggregatedInflightEventMetric[gaugeVecMetricKey{metricName: InFlightEvents.Name, labelValue: eventLabel}] += int(valueToAdd)
// Only flush the metric to the channel if the interval is reached. // Only flush the metric to the channel if the interval is reached.
// The values are flushed to Prometheus in the run() function, which runs once the interval time. // The values are flushed to Prometheus in the run() function, which runs once the interval time.
// Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid. // Note: we implement this flushing here, not in FlushMetrics, because, if we did so, we would need to implement a lock for the map, which we want to avoid.
if time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval { if forceFlush || time.Since(r.aggregatedInflightEventMetricLastFlushTime) > r.interval {
for key, value := range r.aggregatedInflightEventMetric { for key, value := range r.aggregatedInflightEventMetric {
newMetric := &gaugeVecMetric{ newMetric := &gaugeVecMetric{
metric: InFlightEvents, metric: InFlightEvents,

View File

@@ -116,9 +116,9 @@ func TestInFlightEventAsync(t *testing.T) {
} }
podAddLabel := "Pod/Add" podAddLabel := "Pod/Add"
r.ObserveInFlightEventsAsync(podAddLabel, 10) r.ObserveInFlightEventsAsync(podAddLabel, 10, false)
r.ObserveInFlightEventsAsync(podAddLabel, -1) r.ObserveInFlightEventsAsync(podAddLabel, -1, false)
r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1) r.ObserveInFlightEventsAsync(PodPoppedInFlightEvent, 1, false)
if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{ if d := cmp.Diff(r.aggregatedInflightEventMetric, map[gaugeVecMetricKey]int{
{metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9, {metricName: InFlightEvents.Name, labelValue: podAddLabel}: 9,
@@ -130,7 +130,7 @@ func TestInFlightEventAsync(t *testing.T) {
r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush r.aggregatedInflightEventMetricLastFlushTime = time.Now().Add(-time.Hour) // to test flush
// It adds -4 and flushes the metric to the channel. // It adds -4 and flushes the metric to the channel.
r.ObserveInFlightEventsAsync(podAddLabel, -4) r.ObserveInFlightEventsAsync(podAddLabel, -4, false)
if len(r.aggregatedInflightEventMetric) != 0 { if len(r.aggregatedInflightEventMetric) != 0 {
t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric) t.Errorf("aggregatedInflightEventMetric should be cleared, but got: %v", r.aggregatedInflightEventMetric)
} }
@@ -164,4 +164,10 @@ func TestInFlightEventAsync(t *testing.T) {
}, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" { }, cmp.AllowUnexported(gaugeVecMetric{}), cmpopts.IgnoreFields(gaugeVecMetric{}, "metric")); d != "" {
t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d) t.Errorf("unexpected metrics are sent to aggregatedInflightEventMetricBufferCh: %s", d)
} }
// Test force flush
r.ObserveInFlightEventsAsync(podAddLabel, 1, true)
if len(r.aggregatedInflightEventMetric) != 0 {
t.Errorf("aggregatedInflightEventMetric should be force-flushed, but got: %v", r.aggregatedInflightEventMetric)
}
} }