From f32d735b50f3c3141690e5fa177b2b85a53bb4bc Mon Sep 17 00:00:00 2001
From: Cong Liu
Date: Wed, 16 Oct 2019 10:15:54 -0400
Subject: [PATCH] Add permit_wait_duration_seconds metric for scheduler.

---
 pkg/scheduler/framework/v1alpha1/framework.go |   2 +
 .../framework/v1alpha1/framework_test.go      | 101 ++++++++++++++++--
 .../framework/v1alpha1/waiting_pods_map.go    |   4 +
 pkg/scheduler/metrics/metrics.go              |  13 +++
 4 files changed, 113 insertions(+), 7 deletions(-)

diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go
index aa22f0f0b6f..1d236f1ba8c 100644
--- a/pkg/scheduler/framework/v1alpha1/framework.go
+++ b/pkg/scheduler/framework/v1alpha1/framework.go
@@ -570,11 +570,13 @@ func (f *framework) RunPermitPlugins(
 	// We now wait for the minimum duration if at least one plugin asked to
 	// wait (and no plugin rejected the pod)
 	if statusCode == Wait {
+		startTime := time.Now()
 		w := newWaitingPod(pod, pluginsWaitTime)
 		f.waitingPods.add(w)
 		defer f.waitingPods.remove(pod.UID)
 		klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
 		s := <-w.s
+		metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
 		if !s.IsSuccess() {
 			if s.IsUnschedulable() {
 				msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go
index 089fe328b07..7ff56a931f7 100644
--- a/pkg/scheduler/framework/v1alpha1/framework_test.go
+++ b/pkg/scheduler/framework/v1alpha1/framework_test.go
@@ -727,7 +727,50 @@ func TestRecordingMetrics(t *testing.T) {
 
 			tt.action(f)
 
-			collectAndCompare(t, tt.wantExtensionPoint, tt.wantStatus)
+			collectAndCompareFrameworkMetrics(t, tt.wantExtensionPoint, tt.wantStatus)
+		})
+	}
+}
+
+// TestPermitWaitingMetric checks that PermitWaitDuration is observed, with the
+// expected result label, only when a pod has to wait at the permit stage.
+func TestPermitWaitingMetric(t *testing.T) {
+	tests := []struct {
+		name    string
+		inject  injectedResult
+		wantRes string
+	}{
+		{
+			name: "Permit - Success",
+		},
+		{
+			name:    "Permit - Wait Timeout",
+			inject:  injectedResult{PermitStatus: int(Wait)},
+			wantRes: "Unschedulable",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			plugin := &TestPlugin{name: testPlugin, inj: tt.inject}
+			r := make(Registry)
+			r.Register(testPlugin,
+				func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
+					return plugin, nil
+				})
+			plugins := &config.Plugins{
+				Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: testPlugin, Weight: 1}}},
+			}
+			f, err := NewFramework(r, plugins, emptyArgs)
+			if err != nil {
+				t.Fatalf("Failed to create framework for testing: %v", err)
+			}
+			metrics.Register()
+			metrics.PermitWaitDuration.Reset()
+
+			f.RunPermitPlugins(context.TODO(), nil, pod, "")
+
+			collectAndComparePermitWaitDuration(t, tt.wantRes)
 		})
 	}
 }
@@ -775,12 +818,8 @@ func injectNormalizeRes(inj injectedResult, scores NodeScoreList) *Status {
 	return nil
 }
 
-func collectAndCompare(t *testing.T, wantExtensionPoint string, wantStatus Code) {
-	ch := make(chan prometheus.Metric, 1)
-	m := &dto.Metric{}
-	metrics.FrameworkExtensionPointDuration.Collect(ch)
-	got := <-ch
-	got.Write(m)
+func collectAndCompareFrameworkMetrics(t *testing.T, wantExtensionPoint string, wantStatus Code) {
+	m := collectHistogramMetric(metrics.FrameworkExtensionPointDuration)
 
 	if len(m.Label) != 2 {
 		t.Fatalf("Unexpected number of label pairs, got: %v, want: 2", len(m.Label))
@@ -802,3 +841,51 @@ func collectAndCompare(t *testing.T, wantExtensionPoint string, wantStatus Code)
 		t.Errorf("Expect latency to be greater than 0, got: %v", m.Histogram.SampleSum)
 	}
 }
+
+// collectAndComparePermitWaitDuration checks the PermitWaitDuration histogram.
+// An empty wantRes means no sample may have been recorded; otherwise exactly one
+// sample with a positive duration and the result label wantRes is expected.
+func collectAndComparePermitWaitDuration(t *testing.T, wantRes string) {
+	m := collectHistogramMetric(metrics.PermitWaitDuration)
+	if wantRes == "" {
+		if m != nil {
+			t.Errorf("PermitWaitDuration shouldn't be recorded but got %+v", m)
+		}
+		return
+	}
+	if m == nil {
+		t.Fatalf("PermitWaitDuration should be recorded with result %q but no sample was collected", wantRes)
+	}
+
+	if len(m.Label) != 1 {
+		t.Fatalf("Unexpected number of label pairs, got: %v, want: 1", len(m.Label))
+	}
+
+	if *m.Label[0].Value != wantRes {
+		t.Errorf("Unexpected result label, got: %q, want %q", *m.Label[0].Value, wantRes)
+	}
+
+	if *m.Histogram.SampleCount != 1 {
+		t.Errorf("Expect 1 sample, got: %v", *m.Histogram.SampleCount)
+	}
+
+	if *m.Histogram.SampleSum <= 0 {
+		t.Errorf("Expect latency to be greater than 0, got: %v", *m.Histogram.SampleSum)
+	}
+}
+
+// collectHistogramMetric returns the first metric emitted by the given
+// collector, or nil if it emits none. The channel buffers a single metric, so
+// the collector is expected to hold at most one series when this is called.
+func collectHistogramMetric(metric prometheus.Collector) *dto.Metric {
+	ch := make(chan prometheus.Metric, 1)
+	metric.Collect(ch)
+	select {
+	case got := <-ch:
+		m := &dto.Metric{}
+		got.Write(m)
+		return m
+	default:
+		return nil
+	}
+}
diff --git a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
index ff727f92482..df7dbe6eeea 100644
--- a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
+++ b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
@@ -85,6 +85,10 @@ func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *wa
 	}
 
 	wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
+	// The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the
+	// lock here so that time.AfterFunc can only execute after newWaitingPod finishes.
+	wp.mu.Lock()
+	defer wp.mu.Unlock()
 	for k, v := range pluginsMaxWaitTime {
 		plugin, waitTime := k, v
 		wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go
index 71512e0c446..aa4df6877db 100644
--- a/pkg/scheduler/metrics/metrics.go
+++ b/pkg/scheduler/metrics/metrics.go
@@ -262,6 +262,18 @@ var (
 			StabilityLevel: metrics.ALPHA,
 		}, []string{"queue", "event"})
 
+	// PermitWaitDuration observes how long a pod waited in RunPermitPlugins,
+	// labeled with the status code that ended the wait.
+	PermitWaitDuration = metrics.NewHistogramVec(
+		&metrics.HistogramOpts{
+			Subsystem:      SchedulerSubsystem,
+			Name:           "permit_wait_duration_seconds",
+			Help:           "Duration of waiting in RunPermitPlugins.",
+			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15),
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"result"})
+
 	metricsList = []metrics.Registerable{
 		scheduleAttempts,
 		SchedulingLatency,
@@ -286,6 +298,7 @@ var (
 		FrameworkExtensionPointDuration,
 		SchedulerQueueIncomingPods,
 		SchedulerGoroutines,
+		PermitWaitDuration,
 	}
 )
 