Make MetricCollector configurable for scheduler benchmark tests

Cong Liu 2020-02-15 18:41:17 -08:00
parent 3d09d25292
commit 7f56c753b3
2 changed files with 126 additions and 111 deletions


@@ -19,13 +19,13 @@ package benchmark
 import (
 	"fmt"
 	"io/ioutil"
-	"sync/atomic"
 	"testing"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	"k8s.io/client-go/tools/cache"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/component-base/featuregate"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/klog"
@@ -39,11 +39,13 @@ const (
 )
 
 var (
-	defaultMetrics = []string{
-		"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
-		"scheduler_scheduling_algorithm_priority_evaluation_seconds",
-		"scheduler_binding_duration_seconds",
-		"scheduler_e2e_scheduling_duration_seconds",
+	defaultMetricsCollectorConfig = metricsCollectorConfig{
+		Metrics: []string{
+			"scheduler_scheduling_algorithm_predicate_evaluation_seconds",
+			"scheduler_scheduling_algorithm_priority_evaluation_seconds",
+			"scheduler_binding_duration_seconds",
+			"scheduler_e2e_scheduling_duration_seconds",
+		},
 	}
 )
@@ -52,8 +54,8 @@ var (
 //
 // It specifies nodes and pods in the cluster before running the test. It also specifies the pods to
 // schedule during the test. The config can be as simple as just specify number of nodes/pods, where
-// default spec will be applied. It also allows the user to specify a pod spec template for more compicated
-// test cases.
+// default spec will be applied. It also allows the user to specify a pod spec template for more
+// complicated test cases.
 //
 // It also specifies the metrics to be collected after the test. If nothing is specified, default metrics
 // such as scheduling throughput and latencies will be collected.
@@ -68,6 +70,8 @@ type testCase struct {
 	PodsToSchedule podCase
 	// optional, feature gates to set before running the test
 	FeatureGates map[featuregate.Feature]bool
+	// optional, replaces default defaultMetricsCollectorConfig if supplied.
+	MetricsCollectorConfig *metricsCollectorConfig
 }
 
 type nodeCase struct {
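
In practice test cases are loaded from the YAML config file via getSimpleTestCases, but the override amounts to the Go literal below. This is a hypothetical illustration: the counts are invented, and only the field names come from the structs in this diff.

	// Hypothetical test case that collects only binding latency instead of
	// the four default histograms; all numbers are made up.
	tc := testCase{
		Nodes:          nodeCase{Num: 100},
		InitPods:       podCase{Num: 100},
		PodsToSchedule: podCase{Num: 300},
		MetricsCollectorConfig: &metricsCollectorConfig{
			Metrics: []string{"scheduler_binding_duration_seconds"},
		},
	}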
@@ -100,6 +104,11 @@ type testParams struct {
 	NumPodsToSchedule int
 }
 
+type testDataCollector interface {
+	run(stopCh chan struct{})
+	collect() []DataItem
+}
+
 func BenchmarkPerfScheduling(b *testing.B) {
 	dataItems := DataItems{Version: "v1"}
 	tests := getSimpleTestCases(configFile)
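
Both the throughput and metrics collectors now share this two-method contract, so further collectors can plug in without touching perfScheduling. As a sketch only (nothing like this exists in the commit), a minimal extra collector could look like:

	// eventCountCollector is a hypothetical example of a third collector
	// satisfying testDataCollector; it is not part of this change.
	type eventCountCollector struct {
		count int
	}

	func (c *eventCountCollector) run(stopCh chan struct{}) {
		<-stopCh // a real collector would sample until stopCh is closed
	}

	func (c *eventCountCollector) collect() []DataItem {
		return []DataItem{{
			Labels: map[string]string{"Metric": "event_count"},
			Data:   map[string]float64{"Total": float64(c.count)},
		}}
	}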
@@ -119,119 +128,97 @@ func BenchmarkPerfScheduling(b *testing.B) {
 }
 
 func perfScheduling(test testCase, b *testing.B) []DataItem {
-	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
-	if test.Nodes.NodeAllocatableStrategy != nil {
-		nodeStrategy = test.Nodes.NodeAllocatableStrategy
-	} else if test.Nodes.LabelNodePrepareStrategy != nil {
-		nodeStrategy = test.Nodes.LabelNodePrepareStrategy
-	} else if test.Nodes.UniqueNodeLabelStrategy != nil {
-		nodeStrategy = test.Nodes.UniqueNodeLabelStrategy
-	}
-
-	setupPodStrategy := getPodStrategy(test.InitPods)
-	testPodStrategy := getPodStrategy(test.PodsToSchedule)
-
-	var nodeSpec *v1.Node
-	if test.Nodes.NodeTemplatePath != nil {
-		nodeSpec = getNodeSpecFromFile(test.Nodes.NodeTemplatePath)
-	}
-
 	finalFunc, podInformer, clientset := mustSetupScheduler()
 	defer finalFunc()
 
-	var nodePreparer testutils.TestNodePreparer
-	if nodeSpec != nil {
-		nodePreparer = framework.NewIntegrationTestNodePreparerWithNodeSpec(
-			clientset,
-			[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
-			nodeSpec,
-		)
-	} else {
-		nodePreparer = framework.NewIntegrationTestNodePreparer(
-			clientset,
-			[]testutils.CountToStrategy{{Count: test.Nodes.Num, Strategy: nodeStrategy}},
-			"scheduler-perf-",
-		)
-	}
+	nodePreparer := getNodePreparer(test.Nodes, clientset)
 	if err := nodePreparer.PrepareNodes(); err != nil {
 		klog.Fatalf("%v", err)
 	}
 	defer nodePreparer.CleanupNodes()
 
-	config := testutils.NewTestPodCreatorConfig()
-	config.AddStrategy(setupNamespace, test.InitPods.Num, setupPodStrategy)
-	podCreator := testutils.NewTestPodCreator(clientset, config)
-	podCreator.CreatePods()
+	createPods(setupNamespace, test.InitPods, clientset)
+	waitNumPodsScheduled(test.InitPods.Num, podInformer)
+
+	// start benchmark
+	b.ResetTimer()
+
+	// Start test data collectors.
+	stopCh := make(chan struct{})
+	collectors := getTestDataCollectors(test, podInformer, b)
+	for _, collector := range collectors {
+		go collector.run(stopCh)
+	}
+
+	// Schedule the main workload
+	createPods(testNamespace, test.PodsToSchedule, clientset)
+	waitNumPodsScheduled(test.InitPods.Num+test.PodsToSchedule.Num, podInformer)
+
+	close(stopCh)
 
+	// Note: without this line we're taking the overhead of defer() into account.
+	b.StopTimer()
+
+	var dataItems []DataItem
+	for _, collector := range collectors {
+		dataItems = append(dataItems, collector.collect()...)
+	}
+	return dataItems
+}
+
+func waitNumPodsScheduled(num int, podInformer coreinformers.PodInformer) {
 	for {
 		scheduled, err := getScheduledPods(podInformer)
 		if err != nil {
 			klog.Fatalf("%v", err)
 		}
-		if len(scheduled) >= test.InitPods.Num {
+		if len(scheduled) >= num {
 			break
 		}
-		klog.Infof("got %d existing pods, required: %d", len(scheduled), test.InitPods.Num)
+		klog.Infof("got %d existing pods, required: %d", len(scheduled), num)
 		time.Sleep(1 * time.Second)
 	}
+}
 
-	scheduled := int32(0)
-	completedCh := make(chan struct{})
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		UpdateFunc: func(old, cur interface{}) {
-			curPod := cur.(*v1.Pod)
-			oldPod := old.(*v1.Pod)
-
-			if len(oldPod.Spec.NodeName) == 0 && len(curPod.Spec.NodeName) > 0 {
-				if atomic.AddInt32(&scheduled, 1) >= int32(test.PodsToSchedule.Num) {
-					completedCh <- struct{}{}
-				}
-			}
-		},
-	})
-
-	// start benchmark
-	b.ResetTimer()
-
-	// Start measuring throughput
-	stopCh := make(chan struct{})
-	throughputCollector := newThroughputCollector(podInformer)
-	go throughputCollector.run(stopCh)
-
-	// Scheduling the main workload
-	config = testutils.NewTestPodCreatorConfig()
-	config.AddStrategy(testNamespace, test.PodsToSchedule.Num, testPodStrategy)
-	podCreator = testutils.NewTestPodCreator(clientset, config)
-	podCreator.CreatePods()
-
-	<-completedCh
-	close(stopCh)
-
-	// Note: without this line we're taking the overhead of defer() into account.
-	b.StopTimer()
-
-	setNameLabel := func(dataItem *DataItem) DataItem {
-		if dataItem.Labels == nil {
-			dataItem.Labels = map[string]string{}
-		}
-		dataItem.Labels["Name"] = b.Name()
-		return *dataItem
-	}
-
-	dataItems := []DataItem{
-		setNameLabel(throughputCollector.collect()),
-	}
-
-	for _, metric := range defaultMetrics {
-		dataItem := newMetricsCollector(metric).collect()
-		if dataItem == nil {
-			continue
-		}
-		dataItems = append(dataItems, setNameLabel(dataItem))
-	}
-
-	return dataItems
+func getTestDataCollectors(tc testCase, podInformer coreinformers.PodInformer, b *testing.B) []testDataCollector {
+	collectors := []testDataCollector{newThroughputCollector(podInformer, map[string]string{"Name": b.Name()})}
+	metricsCollectorConfig := defaultMetricsCollectorConfig
+	if tc.MetricsCollectorConfig != nil {
+		metricsCollectorConfig = *tc.MetricsCollectorConfig
+	}
+	collectors = append(collectors, newMetricsCollector(metricsCollectorConfig, map[string]string{"Name": b.Name()}))
+	return collectors
+}
+
+func getNodePreparer(nc nodeCase, clientset clientset.Interface) testutils.TestNodePreparer {
+	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
+	if nc.NodeAllocatableStrategy != nil {
+		nodeStrategy = nc.NodeAllocatableStrategy
+	} else if nc.LabelNodePrepareStrategy != nil {
+		nodeStrategy = nc.LabelNodePrepareStrategy
+	} else if nc.UniqueNodeLabelStrategy != nil {
+		nodeStrategy = nc.UniqueNodeLabelStrategy
+	}
+
+	if nc.NodeTemplatePath != nil {
+		return framework.NewIntegrationTestNodePreparerWithNodeSpec(
+			clientset,
+			[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
+			getNodeSpecFromFile(nc.NodeTemplatePath),
+		)
+	}
+	return framework.NewIntegrationTestNodePreparer(
+		clientset,
+		[]testutils.CountToStrategy{{Count: nc.Num, Strategy: nodeStrategy}},
+		"scheduler-perf-",
+	)
+}
+
+func createPods(ns string, pc podCase, clientset clientset.Interface) {
+	strategy := getPodStrategy(pc)
+	config := testutils.NewTestPodCreatorConfig()
+	config.AddStrategy(ns, pc.Num, strategy)
+	podCreator := testutils.NewTestPodCreator(clientset, config)
+	podCreator.CreatePods()
 }
 
 func getPodStrategy(pc podCase) testutils.TestPodCreateStrategy {
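
With this refactor every collector follows one lifecycle: perfScheduling starts it in a goroutine before the main workload, closes stopCh once the last pod is scheduled, and then drains collect(). A periodic collector such as throughputCollector typically builds its run loop around that stop channel; the sketch below shows the pattern, where the one-second interval and the recordSample callback are assumptions for illustration, not code from this commit.

	// runSampler is a hypothetical helper showing the sampling-loop shape a
	// testDataCollector's run method can use.
	func runSampler(stopCh chan struct{}, recordSample func()) {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				recordSample() // take one measurement per tick
			case <-stopCh:
				return // perfScheduling closed stopCh; stop sampling
			}
		}
	}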


@@ -118,20 +118,42 @@ func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
 	return ioutil.WriteFile(destFile, b, 0644)
 }
 
+// metricsCollectorConfig is the config to be marshalled to YAML config file.
+type metricsCollectorConfig struct {
+	Metrics []string
+}
+
 // metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
 // Currently only Histrogram metrics are supported.
 type metricsCollector struct {
-	metric string
+	metricsCollectorConfig
+	labels map[string]string
 }
 
-func newMetricsCollector(metric string) *metricsCollector {
+func newMetricsCollector(config metricsCollectorConfig, labels map[string]string) *metricsCollector {
 	return &metricsCollector{
-		metric: metric,
+		metricsCollectorConfig: config,
+		labels:                 labels,
 	}
 }
 
-func (pc *metricsCollector) collect() *DataItem {
-	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, pc.metric)
+func (*metricsCollector) run(stopCh chan struct{}) {
+	// metricCollector doesn't need to start before the tests, so nothing to do here.
+}
+
+func (pc *metricsCollector) collect() []DataItem {
+	var dataItems []DataItem
+	for _, metric := range pc.Metrics {
+		dataItem := collectHistogram(metric, pc.labels)
+		if dataItem != nil {
+			dataItems = append(dataItems, *dataItem)
+		}
+	}
+	return dataItems
+}
+
+func collectHistogram(metric string, labels map[string]string) *DataItem {
+	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, metric)
 	if err != nil {
 		klog.Error(err)
 		return nil
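
The collector is now usable on its own, independent of which metrics it gathers. A minimal usage sketch, with an arbitrary label value:

	// Gather the default histograms once; the labels are attached to every
	// resulting DataItem. The "example" name is arbitrary.
	collector := newMetricsCollector(defaultMetricsCollectorConfig,
		map[string]string{"Name": "example"})
	dataItems := collector.collect() // one DataItem per histogram found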
@@ -153,10 +175,13 @@ func (pc *metricsCollector) collect() *DataItem {
 	msFactor := float64(time.Second) / float64(time.Millisecond)
 
+	// Copy labels and add "Metric" label for this metric.
+	labelMap := map[string]string{"Metric": metric}
+	for k, v := range labels {
+		labelMap[k] = v
+	}
 	return &DataItem{
-		Labels: map[string]string{
-			"Metric": pc.metric,
-		},
+		Labels: labelMap,
 		Data: map[string]float64{
 			"Perc50": q50 * msFactor,
 			"Perc90": q90 * msFactor,
@@ -170,11 +195,13 @@ func (pc *metricsCollector) collect() *DataItem {
 type throughputCollector struct {
 	podInformer           coreinformers.PodInformer
 	schedulingThroughputs []float64
+	labels                map[string]string
 }
 
-func newThroughputCollector(podInformer coreinformers.PodInformer) *throughputCollector {
+func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string) *throughputCollector {
 	return &throughputCollector{
 		podInformer: podInformer,
+		labels:      labels,
 	}
 }
@@ -205,8 +232,8 @@ func (tc *throughputCollector) run(stopCh chan struct{}) {
 	}
 }
 
-func (tc *throughputCollector) collect() *DataItem {
-	throughputSummary := &DataItem{}
+func (tc *throughputCollector) collect() []DataItem {
+	throughputSummary := DataItem{Labels: tc.labels}
 	if length := len(tc.schedulingThroughputs); length > 0 {
 		sort.Float64s(tc.schedulingThroughputs)
 		sum := 0.0
@@ -225,5 +252,6 @@ func (tc *throughputCollector) collect() *DataItem {
 		}
 		throughputSummary.Unit = "pods/s"
 	}
-	return throughputSummary
+
+	return []DataItem{throughputSummary}
 }
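
For reference, the throughput summary reports percentiles (Perc50, Perc90, and so on) of the sorted per-interval throughputs. Conceptually that is a sorted-slice percentile; the sketch below shows one common formulation, though the exact rounding used in util.go may differ.

	// percentile is a hypothetical helper: value at quantile p (0 < p <= 1)
	// of an ascending-sorted slice, using ceiling-based rank selection.
	// Requires "math" in the import list.
	func percentile(sorted []float64, p float64) float64 {
		if len(sorted) == 0 {
			return 0
		}
		idx := int(math.Ceil(p*float64(len(sorted)))) - 1
		if idx < 0 {
			idx = 0
		}
		return sorted[idx]
	}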