mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Collect some of scheduling metrics and scheduling throughput
In addition to getting overall performance measurements from golang benchmark, collect metrics that provides information about insides of the scheduler itself. This is a first step towards improving what we collect about the scheduler. Metrics in question: - scheduler_scheduling_algorithm_predicate_evaluation_seconds - scheduler_scheduling_algorithm_priority_evaluation_seconds - scheduler_binding_duration_seconds - scheduler_e2e_scheduling_duration_seconds Scheduling throughput is computed on the fly inside perfScheduling.
This commit is contained in:
parent
db1990f48b
commit
8a1c4a5a88
@ -20,7 +20,10 @@ go_library(
|
||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
|
||||
"//test/integration/util:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_model/go:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -38,6 +38,15 @@ const (
|
||||
configFile = "config/performance-config.yaml"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultMetrics = []string{
|
||||
"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
|
||||
"scheduler_scheduling_algorithm_priority_evaluation_seconds",
|
||||
"scheduler_binding_duration_seconds",
|
||||
"scheduler_e2e_scheduling_duration_seconds",
|
||||
}
|
||||
)
|
||||
|
||||
// testCase configures a test case to run the scheduler performance test. Users should be able to
|
||||
// provide this via a YAML file.
|
||||
//
|
||||
@ -92,6 +101,7 @@ type testParams struct {
|
||||
}
|
||||
|
||||
func BenchmarkPerfScheduling(b *testing.B) {
|
||||
dataItems := DataItems{Version: "v1"}
|
||||
tests := getSimpleTestCases(configFile)
|
||||
|
||||
for _, test := range tests {
|
||||
@ -100,12 +110,15 @@ func BenchmarkPerfScheduling(b *testing.B) {
|
||||
for feature, flag := range test.FeatureGates {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
|
||||
}
|
||||
perfScheduling(test, b)
|
||||
dataItems.DataItems = append(dataItems.DataItems, perfScheduling(test, b)...)
|
||||
})
|
||||
}
|
||||
if err := dataItems2JSONFile(dataItems, b.Name()); err != nil {
|
||||
klog.Fatalf("%v: unable to write measured data: %v", b.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
func perfScheduling(test testCase, b *testing.B) {
|
||||
func perfScheduling(test testCase, b *testing.B) []DataItem {
|
||||
var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
|
||||
if test.Nodes.NodeAllocatableStrategy != nil {
|
||||
nodeStrategy = test.Nodes.NodeAllocatableStrategy
|
||||
@ -180,15 +193,45 @@ func perfScheduling(test testCase, b *testing.B) {
|
||||
|
||||
// start benchmark
|
||||
b.ResetTimer()
|
||||
|
||||
// Start measuring throughput
|
||||
stopCh := make(chan struct{})
|
||||
throughputCollector := newThroughputCollector(podInformer)
|
||||
go throughputCollector.run(stopCh)
|
||||
|
||||
// Scheduling the main workload
|
||||
config = testutils.NewTestPodCreatorConfig()
|
||||
config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
|
||||
podCreator = testutils.NewTestPodCreator(clientset, config)
|
||||
podCreator.CreatePods()
|
||||
|
||||
<-completedCh
|
||||
close(stopCh)
|
||||
|
||||
// Note: without this line we're taking the overhead of defer() into account.
|
||||
b.StopTimer()
|
||||
|
||||
setNameLabel := func(dataItem *DataItem) DataItem {
|
||||
if dataItem.Labels == nil {
|
||||
dataItem.Labels = map[string]string{}
|
||||
}
|
||||
dataItem.Labels["Name"] = b.Name()
|
||||
return *dataItem
|
||||
}
|
||||
|
||||
dataItems := []DataItem{
|
||||
setNameLabel(throughputCollector.collect()),
|
||||
}
|
||||
|
||||
for _, metric := range defaultMetrics {
|
||||
dataItem := newPrometheusCollector(metric).collect()
|
||||
if dataItem == nil {
|
||||
continue
|
||||
}
|
||||
dataItems = append(dataItems, setNameLabel(dataItem))
|
||||
}
|
||||
|
||||
return dataItems
|
||||
}
|
||||
|
||||
func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
|
||||
|
@ -17,15 +17,34 @@ limitations under the License.
|
||||
package benchmark
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"path"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/test/integration/util"
|
||||
)
|
||||
|
||||
const (
|
||||
dateFormat = "2006-01-02T15:04:05Z"
|
||||
throughputSampleFrequency = time.Second
|
||||
)
|
||||
|
||||
var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
|
||||
|
||||
// mustSetupScheduler starts the following components:
|
||||
// - k8s api server (a.k.a. master)
|
||||
// - scheduler
|
||||
@ -66,3 +85,250 @@ func getScheduledPods(podInformer coreinformers.PodInformer) ([]*v1.Pod, error)
|
||||
}
|
||||
return scheduled, nil
|
||||
}
|
||||
|
||||
// DataItem is the data point.
|
||||
type DataItem struct {
|
||||
// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
|
||||
// that all data items with the same label combination should have the same buckets.
|
||||
Data map[string]float64 `json:"data"`
|
||||
// Unit is the data unit. Notice that all data items with the same label combination
|
||||
// should have the same unit.
|
||||
Unit string `json:"unit"`
|
||||
// Labels is the labels of the data item.
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
}
|
||||
|
||||
// DataItems is the data point set. It is the struct that perf dashboard expects.
|
||||
type DataItems struct {
|
||||
Version string `json:"version"`
|
||||
DataItems []DataItem `json:"dataItems"`
|
||||
}
|
||||
|
||||
func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
|
||||
b, err := json.Marshal(dataItems)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
|
||||
if *dataItemsDir != "" {
|
||||
destFile = path.Join(*dataItemsDir, destFile)
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(destFile, b, 0644)
|
||||
}
|
||||
|
||||
// prometheusCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
|
||||
// Currently only Histrogram metrics are supported.
|
||||
type prometheusCollector struct {
|
||||
metric string
|
||||
cache *dto.MetricFamily
|
||||
}
|
||||
|
||||
func newPrometheusCollector(metric string) *prometheusCollector {
|
||||
return &prometheusCollector{
|
||||
metric: metric,
|
||||
}
|
||||
}
|
||||
|
||||
func (pc *prometheusCollector) collect() *DataItem {
|
||||
var metricFamily *dto.MetricFamily
|
||||
m, err := legacyregistry.DefaultGatherer.Gather()
|
||||
if err != nil {
|
||||
klog.Error(err)
|
||||
return nil
|
||||
}
|
||||
for _, mFamily := range m {
|
||||
if mFamily.Name != nil && *mFamily.Name == pc.metric {
|
||||
metricFamily = mFamily
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if metricFamily == nil {
|
||||
klog.Infof("Metric %q not found", pc.metric)
|
||||
return nil
|
||||
}
|
||||
|
||||
if metricFamily.GetMetric() == nil {
|
||||
klog.Infof("Metric %q is empty", pc.metric)
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(metricFamily.GetMetric()) == 0 {
|
||||
klog.Infof("Metric %q is empty", pc.metric)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Histograms are stored under the first index (based on observation).
|
||||
// Given there's only one histogram registered per each metric name, accessaing
|
||||
// the first index is sufficient.
|
||||
dataItem := pc.promHist2Summary(metricFamily.GetMetric()[0].GetHistogram())
|
||||
if dataItem.Data == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// clear the metrics so that next test always starts with empty prometheus
|
||||
// metrics (since the metrics are shared among all tests run inside the same binary)
|
||||
clearPromHistogram(metricFamily.GetMetric()[0].GetHistogram())
|
||||
|
||||
return dataItem
|
||||
}
|
||||
|
||||
// Bucket of a histogram
|
||||
type bucket struct {
|
||||
upperBound float64
|
||||
count float64
|
||||
}
|
||||
|
||||
func bucketQuantile(q float64, buckets []bucket) float64 {
|
||||
if q < 0 {
|
||||
return math.Inf(-1)
|
||||
}
|
||||
if q > 1 {
|
||||
return math.Inf(+1)
|
||||
}
|
||||
|
||||
if len(buckets) < 2 {
|
||||
return math.NaN()
|
||||
}
|
||||
|
||||
rank := q * buckets[len(buckets)-1].count
|
||||
b := sort.Search(len(buckets)-1, func(i int) bool { return buckets[i].count >= rank })
|
||||
|
||||
if b == 0 {
|
||||
return buckets[0].upperBound * (rank / buckets[0].count)
|
||||
}
|
||||
|
||||
// linear approximation of b-th bucket
|
||||
brank := rank - buckets[b-1].count
|
||||
bSize := buckets[b].upperBound - buckets[b-1].upperBound
|
||||
bCount := buckets[b].count - buckets[b-1].count
|
||||
|
||||
return buckets[b-1].upperBound + bSize*(brank/bCount)
|
||||
}
|
||||
|
||||
func (pc *prometheusCollector) promHist2Summary(hist *dto.Histogram) *DataItem {
|
||||
buckets := []bucket{}
|
||||
|
||||
if hist.SampleCount == nil || *hist.SampleCount == 0 {
|
||||
return &DataItem{}
|
||||
}
|
||||
|
||||
if hist.SampleSum == nil || *hist.SampleSum == 0 {
|
||||
return &DataItem{}
|
||||
}
|
||||
|
||||
for _, bckt := range hist.Bucket {
|
||||
if bckt == nil {
|
||||
return &DataItem{}
|
||||
}
|
||||
if bckt.UpperBound == nil || *bckt.UpperBound < 0 {
|
||||
return &DataItem{}
|
||||
}
|
||||
buckets = append(buckets, bucket{
|
||||
count: float64(*bckt.CumulativeCount),
|
||||
upperBound: *bckt.UpperBound,
|
||||
})
|
||||
}
|
||||
|
||||
// bucketQuantile expects the upper bound of the last bucket to be +inf
|
||||
buckets[len(buckets)-1].upperBound = math.Inf(+1)
|
||||
|
||||
q50 := bucketQuantile(0.50, buckets)
|
||||
q90 := bucketQuantile(0.90, buckets)
|
||||
q99 := bucketQuantile(0.95, buckets)
|
||||
|
||||
msFactor := float64(time.Second) / float64(time.Millisecond)
|
||||
|
||||
return &DataItem{
|
||||
Labels: map[string]string{
|
||||
"Metric": pc.metric,
|
||||
},
|
||||
Data: map[string]float64{
|
||||
"Perc50": q50 * msFactor,
|
||||
"Perc90": q90 * msFactor,
|
||||
"Perc99": q99 * msFactor,
|
||||
"Average": (*hist.SampleSum / float64(*hist.SampleCount)) * msFactor,
|
||||
},
|
||||
Unit: "ms",
|
||||
}
|
||||
}
|
||||
|
||||
func clearPromHistogram(hist *dto.Histogram) {
|
||||
if hist.SampleCount != nil {
|
||||
*hist.SampleCount = 0
|
||||
}
|
||||
if hist.SampleSum != nil {
|
||||
*hist.SampleSum = 0
|
||||
}
|
||||
for _, b := range hist.Bucket {
|
||||
if b.CumulativeCount != nil {
|
||||
*b.CumulativeCount = 0
|
||||
}
|
||||
if b.UpperBound != nil {
|
||||
*b.UpperBound = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type throughputCollector struct {
|
||||
podInformer coreinformers.PodInformer
|
||||
schedulingThroughputs []float64
|
||||
}
|
||||
|
||||
func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
|
||||
return &throughputCollector{
|
||||
podInformer: podInformer,
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *throughputCollector) run(stopCh chan struct{}) {
|
||||
podsScheduled, err := getScheduledPods(tc.podInformer)
|
||||
if err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
lastScheduledCount := len(podsScheduled)
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-time.After(throughputSampleFrequency):
|
||||
podsScheduled, err := getScheduledPods(tc.podInformer)
|
||||
if err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
scheduled := len(podsScheduled)
|
||||
samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
|
||||
throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
|
||||
tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
|
||||
lastScheduledCount = scheduled
|
||||
|
||||
klog.Infof("%d pods scheduled", lastScheduledCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *throughputCollector) collect() *DataItem {
|
||||
throughputSummary := &DataItem{}
|
||||
if length := len(tc.schedulingThroughputs); length > 0 {
|
||||
sort.Float64s(tc.schedulingThroughputs)
|
||||
sum := 0.0
|
||||
for i := range tc.schedulingThroughputs {
|
||||
sum += tc.schedulingThroughputs[i]
|
||||
}
|
||||
|
||||
throughputSummary.Labels = map[string]string{
|
||||
"Metric": "SchedulingThroughput",
|
||||
}
|
||||
throughputSummary.Data = map[string]float64{
|
||||
"Average": sum / float64(length),
|
||||
"Perc50": tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
|
||||
"Perc90": tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
|
||||
"Perc99": tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
|
||||
}
|
||||
throughputSummary.Unit = "pods/s"
|
||||
}
|
||||
return throughputSummary
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user