diff --git a/staging/src/k8s.io/component-base/metrics/testutil/BUILD b/staging/src/k8s.io/component-base/metrics/testutil/BUILD
index b757d1dad61..ba04be0ae8d 100644
--- a/staging/src/k8s.io/component-base/metrics/testutil/BUILD
+++ b/staging/src/k8s.io/component-base/metrics/testutil/BUILD
@@ -13,6 +13,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
         "//staging/src/k8s.io/component-base/metrics:go_default_library",
         "//vendor/github.com/prometheus/client_golang/prometheus/testutil:go_default_library",
+        "//vendor/github.com/prometheus/client_model/go:go_default_library",
         "//vendor/github.com/prometheus/common/expfmt:go_default_library",
         "//vendor/github.com/prometheus/common/model:go_default_library",
     ],
@@ -34,7 +35,14 @@ filegroup(
 
 go_test(
     name = "go_default_test",
-    srcs = ["testutil_test.go"],
+    srcs = [
+        "metrics_test.go",
+        "testutil_test.go",
+    ],
     embed = [":go_default_library"],
-    deps = ["//staging/src/k8s.io/component-base/metrics:go_default_library"],
+    deps = [
+        "//staging/src/k8s.io/component-base/metrics:go_default_library",
+        "//vendor/github.com/prometheus/client_model/go:go_default_library",
+        "//vendor/k8s.io/utils/pointer:go_default_library",
+    ],
 )
diff --git a/staging/src/k8s.io/component-base/metrics/testutil/metrics.go b/staging/src/k8s.io/component-base/metrics/testutil/metrics.go
index 9b154dfad32..4095d4233ca 100644
--- a/staging/src/k8s.io/component-base/metrics/testutil/metrics.go
+++ b/staging/src/k8s.io/component-base/metrics/testutil/metrics.go
@@ -19,11 +19,16 @@ package testutil
 import (
 	"fmt"
 	"io"
+	"math"
 	"reflect"
+	"sort"
 	"strings"
 
+	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
+
+	"k8s.io/component-base/metrics"
 )
 
 var (
@@ -178,3 +183,143 @@ func ValidateMetrics(metrics Metrics, metricName string, expectedLabels ...strin
 	}
 	return nil
 }
+
+// Histogram wraps the prometheus histogram DTO (data transfer object).
+type Histogram struct {
+	*dto.Histogram
+}
+
+// GetHistogramFromGatherer collects a metric from a gatherer implementing the k8s.io/component-base/metrics.Gatherer interface.
+// Used only for testing purposes where we need to gather metrics directly from a running binary (without a metrics endpoint).
+func GetHistogramFromGatherer(gatherer metrics.Gatherer, metricName string) (Histogram, error) {
+	var metricFamily *dto.MetricFamily
+	m, err := gatherer.Gather()
+	if err != nil {
+		return Histogram{}, err
+	}
+	for _, mFamily := range m {
+		if mFamily.Name != nil && *mFamily.Name == metricName {
+			metricFamily = mFamily
+			break
+		}
+	}
+
+	if metricFamily == nil {
+		return Histogram{}, fmt.Errorf("metric %q not found", metricName)
+	}
+
+	if metricFamily.GetMetric() == nil {
+		return Histogram{}, fmt.Errorf("metric %q is empty", metricName)
+	}
+
+	if len(metricFamily.GetMetric()) == 0 {
+		return Histogram{}, fmt.Errorf("metric %q is empty", metricName)
+	}
+
+	return Histogram{
+		// Histograms are stored under the first index (based on observation).
+		// Given there's only one histogram registered per metric name, accessing
+		// the first index is sufficient.
+		metricFamily.GetMetric()[0].GetHistogram(),
+	}, nil
+}
+
+func uint64Ptr(u uint64) *uint64 {
+	return &u
+}
+
+// bucket is a single histogram bucket.
+type bucket struct {
+	upperBound float64
+	count      float64
+}
+
+func bucketQuantile(q float64, buckets []bucket) float64 {
+	if q < 0 {
+		return math.Inf(-1)
+	}
+	if q > 1 {
+		return math.Inf(+1)
+	}
+
+	if len(buckets) < 2 {
+		return math.NaN()
+	}
+
+	rank := q * buckets[len(buckets)-1].count
+	b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
+
+	if b == 0 {
+		return buckets[0].upperBound * (rank / buckets[0].count)
+	}
+
+	// linear approximation within the b-th bucket
+	brank := rank - buckets[b-1].count
+	bSize := buckets[b].upperBound - buckets[b-1].upperBound
+	bCount := buckets[b].count - buckets[b-1].count
+
+	return buckets[b-1].upperBound + bSize*(brank/bCount)
+}
+
+// Quantile computes the q-th quantile of a cumulative histogram.
+// The histogram is expected to be valid (as checked by Validate).
+func (hist *Histogram) Quantile(q float64) float64 {
+	buckets := []bucket{}
+
+	for _, bckt := range hist.Bucket {
+		buckets = append(buckets, bucket{
+			count:      float64(*bckt.CumulativeCount),
+			upperBound: *bckt.UpperBound,
+		})
+	}
+
+	// Note: there is no explicit +Inf bucket here, so quantiles that fall beyond
+	// the second-to-last bucket boundary are interpolated within the last explicit bucket.
+
+	return bucketQuantile(q, buckets)
+}
+
+// Average computes the histogram's average value.
+func (hist *Histogram) Average() float64 {
+	return *hist.SampleSum / float64(*hist.SampleCount)
+}
+
+// Clear clears all fields of the wrapped histogram.
+func (hist *Histogram) Clear() {
+	if hist.SampleCount != nil {
+		*hist.SampleCount = 0
+	}
+	if hist.SampleSum != nil {
+		*hist.SampleSum = 0
+	}
+	for _, b := range hist.Bucket {
+		if b.CumulativeCount != nil {
+			*b.CumulativeCount = 0
+		}
+		if b.UpperBound != nil {
+			*b.UpperBound = 0
+		}
+	}
+}
+
+// Validate makes sure the wrapped histogram has all necessary fields set and with valid values.
+func (hist *Histogram) Validate() error {
+	if hist.SampleCount == nil || *hist.SampleCount == 0 {
+		return fmt.Errorf("nil or empty histogram SampleCount")
+	}
+
+	if hist.SampleSum == nil || *hist.SampleSum == 0 {
+		return fmt.Errorf("nil or empty histogram SampleSum")
+	}
+
+	for _, bckt := range hist.Bucket {
+		if bckt == nil {
+			return fmt.Errorf("empty histogram bucket")
+		}
+		if bckt.UpperBound == nil || *bckt.UpperBound < 0 {
+			return fmt.Errorf("nil or negative histogram bucket UpperBound")
+		}
+	}
+
+	return nil
+}
diff --git a/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go b/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go
new file mode 100644
index 00000000000..f44f559abbb
--- /dev/null
+++ b/staging/src/k8s.io/component-base/metrics/testutil/metrics_test.go
@@ -0,0 +1,257 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testutil
+
+import (
+	"fmt"
+	"testing"
+
+	"k8s.io/utils/pointer"
+
+	dto "github.com/prometheus/client_model/go"
+)
+
+func samples2Histogram(samples []float64, upperBounds []float64) Histogram {
+	histogram := dto.Histogram{
+		SampleCount: uint64Ptr(0),
+		SampleSum:   pointer.Float64Ptr(0.0),
+	}
+
+	for _, ub := range upperBounds {
+		histogram.Bucket = append(histogram.Bucket, &dto.Bucket{
+			CumulativeCount: uint64Ptr(0),
+			UpperBound:      pointer.Float64Ptr(ub),
+		})
+	}
+
+	for _, sample := range samples {
+		for i, bucket := range histogram.Bucket {
+			if sample < *bucket.UpperBound {
+				*histogram.Bucket[i].CumulativeCount++
+			}
+		}
+		*histogram.SampleCount++
+		*histogram.SampleSum += sample
+	}
+	return Histogram{
+		&histogram,
+	}
+}
+
+func TestHistogramQuantile(t *testing.T) {
+	tests := []struct {
+		samples []float64
+		bounds  []float64
+		q50     float64
+		q90     float64
+		q99     float64
+	}{
+		{
+			// repeating numbers
+			samples: []float64{0.5, 0.5, 0.5, 0.5, 1.5, 1.5, 1.5, 1.5, 3, 3, 3, 3, 6, 6, 6, 6},
+			bounds:  []float64{1, 2, 4, 8},
+			q50:     2,
+			q90:     6.4,
+			q99:     7.84,
+		},
+		{
+			// random numbers
+			samples: []float64{11, 67, 61, 21, 40, 36, 52, 63, 8, 3, 67, 35, 61, 1, 36, 58},
+			bounds:  []float64{10, 20, 40, 80},
+			q50:     40,
+			q90:     72,
+			q99:     79.2,
+		},
+		{
+			// the last bucket is empty
+			samples: []float64{6, 34, 30, 10, 20, 18, 26, 31, 4, 2, 33, 17, 30, 1, 18, 29},
+			bounds:  []float64{10, 20, 40, 80},
+			q50:     20,
+			q90:     36,
+			q99:     39.6,
+		},
+	}
+
+	for _, test := range tests {
+		h := samples2Histogram(test.samples, test.bounds)
+		q50 := h.Quantile(0.5)
+		q90 := h.Quantile(0.9)
+		q99 := h.Quantile(0.99)
+		q999999 := h.Quantile(0.999999)
+
+		if q50 != test.q50 {
+			t.Errorf("Expected q50 to be %v, got %v instead", test.q50, q50)
+		}
+		if q90 != test.q90 {
+			t.Errorf("Expected q90 to be %v, got %v instead", test.q90, q90)
+		}
+		if q99 != test.q99 {
+			t.Errorf("Expected q99 to be %v, got %v instead", test.q99, q99)
+		}
+		lastUpperBound := test.bounds[len(test.bounds)-1]
+		if !(q999999 < lastUpperBound) {
+			t.Errorf("Expected q999999 to be less than %v, got %v instead", lastUpperBound, q999999)
+		}
+	}
+}
+
+func TestHistogramClear(t *testing.T) {
+	samples := []float64{0.5, 0.5, 0.5, 0.5, 1.5, 1.5, 1.5, 1.5, 3, 3, 3, 3, 6, 6, 6, 6}
+	bounds := []float64{1, 2, 4, 8}
+	h := samples2Histogram(samples, bounds)
+
+	if *h.SampleCount == 0 {
+		t.Errorf("Expected histogram .SampleCount to be non-zero")
+	}
+	if *h.SampleSum == 0 {
+		t.Errorf("Expected histogram .SampleSum to be non-zero")
+	}
+
+	for _, b := range h.Bucket {
+		if b.CumulativeCount != nil {
+			if *b.CumulativeCount == 0 {
+				t.Errorf("Expected histogram bucket to have non-zero cumulative count")
+			}
+		}
+	}
+
+	h.Clear()
+
+	if *h.SampleCount != 0 {
+		t.Errorf("Expected histogram .SampleCount to be zero, have %v instead", *h.SampleCount)
+	}
+
+	if *h.SampleSum != 0 {
+		t.Errorf("Expected histogram .SampleSum to be zero, have %v instead", *h.SampleSum)
+	}
+
+	for _, b := range h.Bucket {
+		if b.CumulativeCount != nil {
+			if *b.CumulativeCount != 0 {
+				t.Errorf("Expected histogram bucket to have zero cumulative count, have %v instead", *b.CumulativeCount)
+			}
+		}
+		if b.UpperBound != nil {
+			if *b.UpperBound != 0 {
+				t.Errorf("Expected histogram bucket to have zero upper bound, have %v instead", *b.UpperBound)
+			}
+		}
+	}
+}
+
+func TestHistogramValidate(t *testing.T) {
+	tests := []struct {
+		name string
+		h    Histogram
+		err  error
+	}{
+		{
+			name: "nil SampleCount",
+			h: Histogram{
+				&dto.Histogram{},
+			},
+			err: fmt.Errorf("nil or empty histogram SampleCount"),
+		},
+		{
+			name: "empty SampleCount",
+			h: Histogram{
&dto.Histogram{ + SampleCount: uint64Ptr(0), + }, + }, + err: fmt.Errorf("nil or empty histogram SampleCount"), + }, + { + name: "nil SampleSum", + h: Histogram{ + &dto.Histogram{ + SampleCount: uint64Ptr(1), + }, + }, + err: fmt.Errorf("nil or empty histogram SampleSum"), + }, + { + name: "empty SampleSum", + h: Histogram{ + &dto.Histogram{ + SampleCount: uint64Ptr(1), + SampleSum: pointer.Float64Ptr(0.0), + }, + }, + err: fmt.Errorf("nil or empty histogram SampleSum"), + }, + { + name: "nil bucket", + h: Histogram{ + &dto.Histogram{ + SampleCount: uint64Ptr(1), + SampleSum: pointer.Float64Ptr(1.0), + Bucket: []*dto.Bucket{ + nil, + }, + }, + }, + err: fmt.Errorf("empty histogram bucket"), + }, + { + name: "nil bucket UpperBound", + h: Histogram{ + &dto.Histogram{ + SampleCount: uint64Ptr(1), + SampleSum: pointer.Float64Ptr(1.0), + Bucket: []*dto.Bucket{ + {}, + }, + }, + }, + err: fmt.Errorf("nil or negative histogram bucket UpperBound"), + }, + { + name: "negative bucket UpperBound", + h: Histogram{ + &dto.Histogram{ + SampleCount: uint64Ptr(1), + SampleSum: pointer.Float64Ptr(1.0), + Bucket: []*dto.Bucket{ + {UpperBound: pointer.Float64Ptr(-1.0)}, + }, + }, + }, + err: fmt.Errorf("nil or negative histogram bucket UpperBound"), + }, + { + name: "valid histogram", + h: samples2Histogram( + []float64{0.5, 0.5, 0.5, 0.5, 1.5, 1.5, 1.5, 1.5, 3, 3, 3, 3, 6, 6, 6, 6}, + []float64{1, 2, 4, 8}, + ), + }, + } + + for _, test := range tests { + err := test.h.Validate() + if test.err != nil { + if err == nil || err.Error() != test.err.Error() { + t.Errorf("Expected %q error, got %q instead", test.err, err) + } + } else { + if err != nil { + t.Errorf("Expected error to be nil, got %q instead", err) + } + } + } +} diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD index 2c266c0974c..12d846e2b8e 100644 --- a/test/integration/scheduler_perf/BUILD +++ b/test/integration/scheduler_perf/BUILD @@ -20,7 +20,10 @@ go_library( "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//test/integration/util:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index de47fcd50c7..898cdf0a914 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -38,6 +38,15 @@ const ( configFile = "config/performance-config.yaml" ) +var ( + defaultMetrics = []string{ + "scheduler_scheduling_algorithm_predicate_evaluation_seconds", + "scheduler_scheduling_algorithm_priority_evaluation_seconds", + "scheduler_binding_duration_seconds", + "scheduler_e2e_scheduling_duration_seconds", + } +) + // testCase configures a test case to run the scheduler performance test. Users should be able to // provide this via a YAML file. 
// @@ -92,6 +101,7 @@ type testParams struct { } func BenchmarkPerfScheduling(b *testing.B) { + dataItems := DataItems{Version: "v1"} tests := getSimpleTestCases(configFile) for _, test := range tests { @@ -100,12 +110,15 @@ func BenchmarkPerfScheduling(b *testing.B) { for feature, flag := range test.FeatureGates { defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)() } - perfScheduling(test, b) + dataItems.DataItems = append(dataItems.DataItems, perfScheduling(test, b)...) }) } + if err := dataItems2JSONFile(dataItems, b.Name()); err != nil { + klog.Fatalf("%v: unable to write measured data: %v", b.Name(), err) + } } -func perfScheduling(test testCase, b *testing.B) { +func perfScheduling(test testCase, b *testing.B) []DataItem { var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{} if test.Nodes.NodeAllocatableStrategy != nil { nodeStrategy = test.Nodes.NodeAllocatableStrategy @@ -180,15 +193,45 @@ func perfScheduling(test testCase, b *testing.B) { // start benchmark b.ResetTimer() + + // Start measuring throughput + stopCh := make(chan struct{}) + throughputCollector := newThroughputCollector(podInformer) + go throughputCollector.run(stopCh) + + // Scheduling the main workload config = testutils.NewTestPodCreatorConfig() config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy) podCreator = testutils.NewTestPodCreator(clientset, config) podCreator.CreatePods() <-completedCh + close(stopCh) // Note: without this line we're taking the overhead of defer() into account. b.StopTimer() + + setNameLabel := func(dataItem *DataItem) DataItem { + if dataItem.Labels == nil { + dataItem.Labels = map[string]string{} + } + dataItem.Labels["Name"] = b.Name() + return *dataItem + } + + dataItems := []DataItem{ + setNameLabel(throughputCollector.collect()), + } + + for _, metric := range defaultMetrics { + dataItem := newMetricsCollector(metric).collect() + if dataItem == nil { + continue + } + dataItems = append(dataItems, setNameLabel(dataItem)) + } + + return dataItems } func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy { diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index fce5470ea42..fb8ea837bab 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -17,15 +17,34 @@ limitations under the License. package benchmark import ( + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "math" + "path" + "sort" + "time" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" + "k8s.io/klog" "k8s.io/kubernetes/test/integration/util" ) +const ( + dateFormat = "2006-01-02T15:04:05Z" + throughputSampleFrequency = time.Second +) + +var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard") + // mustSetupScheduler starts the following components: // - k8s api server (a.k.a. master) // - scheduler @@ -66,3 +85,145 @@ func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error) } return scheduled, nil } + +// DataItem is the data point. +type DataItem struct { + // Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). 
Notice
+	// that all data items with the same label combination should have the same buckets.
+	Data map[string]float64 `json:"data"`
+	// Unit is the data unit. Notice that all data items with the same label combination
+	// should have the same unit.
+	Unit string `json:"unit"`
+	// Labels are the labels of the data item.
+	Labels map[string]string `json:"labels,omitempty"`
+}
+
+// DataItems is the data point set. It is the struct that perf dashboard expects.
+type DataItems struct {
+	Version   string     `json:"version"`
+	DataItems []DataItem `json:"dataItems"`
+}
+
+func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
+	b, err := json.Marshal(dataItems)
+	if err != nil {
+		return err
+	}
+
+	destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
+	if *dataItemsDir != "" {
+		destFile = path.Join(*dataItemsDir, destFile)
+	}
+
+	return ioutil.WriteFile(destFile, b, 0644)
+}
+
+// metricsCollector collects metrics via legacyregistry.DefaultGatherer.Gather().
+// Currently only Histogram metrics are supported.
+type metricsCollector struct {
+	metric string
+}
+
+func newMetricsCollector(metric string) *metricsCollector {
+	return &metricsCollector{
+		metric: metric,
+	}
+}
+
+func (pc *metricsCollector) collect() *DataItem {
+	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, pc.metric)
+	if err != nil {
+		klog.Error(err)
+		return nil
+	}
+
+	if err := hist.Validate(); err != nil {
+		klog.Error(err)
+		return nil
+	}
+
+	q50 := hist.Quantile(0.50)
+	q90 := hist.Quantile(0.90)
+	q99 := hist.Quantile(0.99)
+	avg := hist.Average()
+
+	// clear the metrics so that next test always starts with empty prometheus
+	// metrics (since the metrics are shared among all tests run inside the same binary)
+	hist.Clear()
+
+	msFactor := float64(time.Second) / float64(time.Millisecond)
+
+	return &DataItem{
+		Labels: map[string]string{
+			"Metric": pc.metric,
+		},
+		Data: map[string]float64{
+			"Perc50":  q50 * msFactor,
+			"Perc90":  q90 * msFactor,
+			"Perc99":  q99 * msFactor,
+			"Average": avg * msFactor,
+		},
+		Unit: "ms",
+	}
+}
+
+type throughputCollector struct {
+	podInformer           coreinformers.PodInformer
+	schedulingThroughputs []float64
+}
+
+func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
+	return &throughputCollector{
+		podInformer: podInformer,
+	}
+}
+
+func (tc *throughputCollector) run(stopCh chan struct{}) {
+	podsScheduled, err := getScheduledPods(tc.podInformer)
+	if err != nil {
+		klog.Fatalf("%v", err)
+	}
+	lastScheduledCount := len(podsScheduled)
+	for {
+		select {
+		case <-stopCh:
+			return
+		case <-time.After(throughputSampleFrequency):
+			podsScheduled, err := getScheduledPods(tc.podInformer)
+			if err != nil {
+				klog.Fatalf("%v", err)
+			}
+
+			scheduled := len(podsScheduled)
+			samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
+			throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
+			tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			lastScheduledCount = scheduled
+
+			klog.Infof("%d pods scheduled", lastScheduledCount)
+		}
+	}
+}
+
+func (tc *throughputCollector) collect() *DataItem {
+	throughputSummary := &DataItem{}
+	if length := len(tc.schedulingThroughputs); length > 0 {
+		sort.Float64s(tc.schedulingThroughputs)
+		sum := 0.0
+		for i := range tc.schedulingThroughputs {
+			sum += tc.schedulingThroughputs[i]
+		}
+
+		throughputSummary.Labels = map[string]string{
+			"Metric": "SchedulingThroughput",
} + throughputSummary.Data = map[string]float64{ + "Average": sum / float64(length), + "Perc50": tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1], + "Perc90": tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1], + "Perc99": tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1], + } + throughputSummary.Unit = "pods/s" + } + return throughputSummary +}
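
Reviewer note (not part of the patch): a minimal sketch of how the new testutil histogram helpers introduced above are meant to be used from a test. The metric name and the fooLatency histogram below are made up for illustration; only GetHistogramFromGatherer, Validate, Quantile, Average, and Clear come from this change, while the registration helpers (metrics.NewHistogram, legacyregistry.MustRegister) are assumed to be available as elsewhere in component-base.

package example

import (
	"testing"

	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/testutil"
)

// fooLatency is a hypothetical histogram; any histogram registered in the
// legacy registry can be inspected the same way.
var fooLatency = metrics.NewHistogram(&metrics.HistogramOpts{
	Name:    "foo_latency_seconds",
	Help:    "Hypothetical latency histogram used only for this sketch.",
	Buckets: []float64{0.01, 0.1, 1, 10},
})

func init() {
	legacyregistry.MustRegister(fooLatency)
}

func TestFooLatency(t *testing.T) {
	// Produce a few observations so the gathered histogram is non-empty.
	for _, v := range []float64{0.02, 0.05, 0.2, 2} {
		fooLatency.Observe(v)
	}

	// Gather the histogram straight from the registry, without scraping a metrics endpoint.
	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, "foo_latency_seconds")
	if err != nil {
		t.Fatal(err)
	}
	if err := hist.Validate(); err != nil {
		t.Fatal(err)
	}

	t.Logf("p90=%vs avg=%vs", hist.Quantile(0.90), hist.Average())

	// Reset the gathered snapshot so later checks in the same binary start from zeroed fields.
	hist.Clear()
}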