Add separate ops for collecting metrics from multiple namespaces in scheduler_perf

Maciej Skoczeń 2024-09-19 13:31:16 +00:00
parent e456fbfaa6
commit a273e5381a


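The two new opcodes are meant to be paired in a workload template: startCollectingMetrics opens a collection window over an explicit list of namespaces, and stopCollectingMetrics closes it and records the results. Below is a minimal sketch of decoding such a template fragment; simpleOp and the JSON fragment are hypothetical stand-ins for illustration, not the real scheduler_perf op types.

package main

import (
    "encoding/json"
    "fmt"
)

// simpleOp is a hypothetical, trimmed-down stand-in for the real op types;
// it only carries the fields relevant to the two new opcodes.
type simpleOp struct {
    Opcode     string   `json:"opcode"`
    Name       string   `json:"name,omitempty"`
    Namespaces []string `json:"namespaces,omitempty"`
}

func main() {
    // An illustrative workload fragment: start collecting for two namespaces,
    // run the measured ops (elided), then stop and record the results.
    fragment := `[
        {"opcode": "startCollectingMetrics", "name": "both-namespaces", "namespaces": ["init-ns", "test-ns"]},
        {"opcode": "stopCollectingMetrics"}
    ]`

    var ops []simpleOp
    if err := json.Unmarshal([]byte(fragment), &ops); err != nil {
        panic(err)
    }
    for i, op := range ops {
        fmt.Printf("op %d: %s %v\n", i, op.Opcode, op.Namespaces)
    }
}
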
@@ -83,6 +83,8 @@ const (
churnOpcode operationCode = "churn"
barrierOpcode operationCode = "barrier"
sleepOpcode operationCode = "sleep"
startCollectingMetricsOpcode operationCode = "startCollectingMetrics"
stopCollectingMetricsOpcode operationCode = "stopCollectingMetrics"
)
const (
@@ -414,6 +416,8 @@ func (op *op) UnmarshalJSON(b []byte) error {
&churnOp{},
&barrierOp{},
&sleepOp{},
&startCollectingMetricsOp{},
&stopCollectingMetricsOp{},
// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
}
var firstError error
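The slice above lists the candidate op types that op.UnmarshalJSON tries in turn; appending the two new ops is what lets workload templates reference the new opcodes. A rough, self-contained sketch of that decode-and-validate idea follows, assuming simplified stand-in types (candidate, startOp, stopOp are illustrative, not the real realOp interface):

package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// candidate stands in for the real op interface: each op type rejects
// payloads whose opcode does not match.
type candidate interface {
    isValid() error
}

type startOp struct {
    Opcode     string   `json:"opcode"`
    Namespaces []string `json:"namespaces"`
}

func (o *startOp) isValid() error {
    if o.Opcode != "startCollectingMetrics" {
        return fmt.Errorf("invalid opcode %q", o.Opcode)
    }
    return nil
}

type stopOp struct {
    Opcode string `json:"opcode"`
}

func (o *stopOp) isValid() error {
    if o.Opcode != "stopCollectingMetrics" {
        return fmt.Errorf("invalid opcode %q", o.Opcode)
    }
    return nil
}

// decodeOp mirrors the general pattern: unmarshal into each candidate type
// and keep the first one whose validation succeeds.
func decodeOp(b []byte) (candidate, error) {
    candidates := []candidate{&startOp{}, &stopOp{}}
    for _, c := range candidates {
        if err := json.Unmarshal(b, c); err != nil {
            continue
        }
        if err := c.isValid(); err == nil {
            return c, nil
        }
    }
    return nil, errors.New("no op type accepted the payload")
}

func main() {
    op, err := decodeOp([]byte(`{"opcode": "stopCollectingMetrics"}`))
    if err != nil {
        panic(err)
    }
    fmt.Printf("%T\n", op)
}
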
@@ -815,6 +819,58 @@ func (so sleepOp) patchParams(_ *workload) (realOp, error) {
return &so, nil
}
// startCollectingMetricsOp defines an op that starts metrics collectors.
// stopCollectingMetricsOp has to be used after this op to finish collecting.
type startCollectingMetricsOp struct {
// Must be "startCollectingMetrics".
Opcode operationCode
// Name appended to workload's name in results.
Name string
// Namespaces for which the scheduling throughput metric is calculated.
Namespaces []string
}
func (scm *startCollectingMetricsOp) isValid(_ bool) error {
if scm.Opcode != startCollectingMetricsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, startCollectingMetricsOpcode)
}
if len(scm.Namespaces) == 0 {
return fmt.Errorf("namespaces cannot be empty")
}
return nil
}
func (*startCollectingMetricsOp) collectsMetrics() bool {
return false
}
func (scm startCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
return &scm, nil
}
// stopCollectingMetricsOp defines an op that stops collecting the metrics
// and writes them into the result slice.
// startCollectingMetricsOp has to be used before this op to begin collecting.
type stopCollectingMetricsOp struct {
// Must be "stopCollectingMetrics".
Opcode operationCode
}
func (scm *stopCollectingMetricsOp) isValid(_ bool) error {
if scm.Opcode != stopCollectingMetricsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", scm.Opcode, stopCollectingMetricsOpcode)
}
return nil
}
func (*stopCollectingMetricsOp) collectsMetrics() bool {
return true
}
func (scm stopCollectingMetricsOp) patchParams(_ *workload) (realOp, error) {
return &scm, nil
}
var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
func initTestOutput(tb testing.TB) io.Writer {
@@ -1110,6 +1166,46 @@ func checkEmptyInFlightEvents() error {
return nil
}
func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string) (ktesting.TContext, []testDataCollector) {
collectorCtx := ktesting.WithCancel(tCtx)
workloadName := tCtx.Name()
// The first part is the same for each workload, therefore we can strip it.
workloadName = workloadName[strings.Index(workloadName, "/")+1:]
collectors := getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", workloadName, name), namespaces, mcc, throughputErrorMargin)
for _, collector := range collectors {
// Need loop-local variable for function below.
collector := collector
err := collector.init()
if err != nil {
tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
}
collectorWG.Add(1)
go func() {
defer collectorWG.Done()
collector.run(collectorCtx)
}()
}
return collectorCtx, collectors
}
func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContext, collectorWG *sync.WaitGroup, threshold float64, tms thresholdMetricSelector, opIndex int, collectors []testDataCollector) []DataItem {
if collectorCtx == nil {
tCtx.Fatalf("op %d: Missing startCollectingMetrics operation before stopping", opIndex)
}
collectorCtx.Cancel("collecting metrics, collector must stop first")
collectorWG.Wait()
var dataItems []DataItem
for _, collector := range collectors {
items := collector.collect()
dataItems = append(dataItems, items...)
err := compareMetricWithThreshold(items, threshold, tms)
if err != nil {
tCtx.Errorf("op %d: %s", opIndex, err)
}
}
return dataItems
}
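Both helpers rely on the same lifecycle: startCollectingMetrics runs each collector in a goroutine under a cancellable child context and tracks it in a WaitGroup, and stopCollectingMetrics cancels that context and waits for the goroutines to exit before reading their results. A minimal sketch of that pattern using only the standard library, with a made-up toyCollector in place of the real testDataCollector:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// toyCollector is illustrative: it samples until its context is cancelled,
// and its results must only be read after run has returned.
type toyCollector struct {
    samples int
}

func (c *toyCollector) run(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            c.samples++
        }
    }
}

func main() {
    // Start: derive a cancellable context and track the goroutine in a WaitGroup.
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    c := &toyCollector{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        c.run(ctx)
    }()

    time.Sleep(100 * time.Millisecond) // the measured workload would run here

    // Stop: cancel first, then wait, and only then read the collected data.
    cancel()
    wg.Wait()
    fmt.Println("samples:", c.samples)
}
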
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
b, benchmarking := tCtx.TB().(*testing.B)
if benchmarking {
@@ -1145,13 +1241,20 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
defer wg.Wait()
defer tCtx.Cancel("workload is done")
var mu sync.Mutex
var dataItems []DataItem
nextNodeIndex := 0
// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
numPodsScheduledPerNamespace := make(map[string]int)
var collectors []testDataCollector
// This needs a separate context and wait group because
// the metrics collecting needs to be sure that the goroutines
// are stopped.
var collectorCtx ktesting.TContext
var collectorWG sync.WaitGroup
defer collectorWG.Wait()
for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
realOp, err := op.realOp.patchParams(w)
if err != nil {
@@ -1204,34 +1307,13 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
if concreteOp.PodTemplatePath == nil {
concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
}
var collectors []testDataCollector
// This needs a separate context and wait group because
// the code below needs to be sure that the goroutines
// are stopped.
var collectorCtx ktesting.TContext
var collectorWG sync.WaitGroup
defer collectorWG.Wait()
if concreteOp.CollectMetrics {
collectorCtx = ktesting.WithCancel(tCtx)
if collectorCtx != nil {
tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
defer collectorCtx.Cancel("cleaning up")
name := tCtx.Name()
// The first part is the same for each workload, therefore we can strip it.
name = name[strings.Index(name, "/")+1:]
collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
for _, collector := range collectors {
// Need loop-local variable for function below.
collector := collector
err = collector.init()
if err != nil {
tCtx.Fatalf("op %d: Failed to initialize data collector: %v", opIndex, err)
}
collectorWG.Add(1)
go func() {
defer collectorWG.Done()
collector.run(collectorCtx)
}()
}
}
if err := createPods(tCtx, namespace, concreteOp); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
@@ -1249,18 +1331,9 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
// CollectMetrics and SkipWaitToCompletion can never be true at the
// same time, so if we're here, it means that all pods have been
// scheduled.
collectorCtx.Cancel("collecting metrix, collector must stop first")
collectorWG.Wait()
mu.Lock()
for _, collector := range collectors {
items := collector.collect()
items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
dataItems = append(dataItems, items...)
err := compareMetricWithThreshold(items, w.Threshold, *w.ThresholdMetricSelector)
if err != nil {
tCtx.Errorf("op %d: %s", opIndex, err)
}
}
mu.Unlock()
collectorCtx = nil
}
case *deletePodsOp:
@@ -1440,6 +1513,19 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
case <-tCtx.Done():
case <-time.After(concreteOp.Duration):
}
case *startCollectingMetricsOp:
if collectorCtx != nil {
tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
}
collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces)
defer collectorCtx.Cancel("cleaning up")
case *stopCollectingMetricsOp:
items := stopCollectingMetrics(tCtx, collectorCtx, &collectorWG, w.Threshold, *w.ThresholdMetricSelector, opIndex, collectors)
dataItems = append(dataItems, items...)
collectorCtx = nil
default:
runable, ok := concreteOp.(runnableOp)
if !ok {
@@ -1481,12 +1567,12 @@ type testDataCollector interface {
collect() []DataItem
}
func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
if mcc == nil {
mcc = &defaultMetricsCollectorConfig
}
return []testDataCollector{
newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
newThroughputCollector(podInformer, map[string]string{"Name": name}, namespaces, throughputErrorMargin),
newMetricsCollector(mcc, map[string]string{"Name": name}),
}
}
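With getTestDataCollectors now accepting a slice of namespaces, the throughput collector can aggregate scheduling throughput across several namespaces at once instead of a single one. A rough sketch of the underlying idea, counting scheduled pods restricted to a namespace set; the pod struct and countScheduled helper are made up for illustration and are not the collector's real API:

package main

import "fmt"

// pod is a minimal illustrative stand-in for a scheduled pod record.
type pod struct {
    Namespace string
    NodeName  string // non-empty once the pod has been scheduled
}

// countScheduled counts scheduled pods, restricted to a set of namespaces;
// an empty namespace list means "count everything", mirroring an optional
// namespace filter.
func countScheduled(pods []pod, namespaces []string) int {
    allowed := make(map[string]bool, len(namespaces))
    for _, ns := range namespaces {
        allowed[ns] = true
    }
    count := 0
    for _, p := range pods {
        if p.NodeName == "" {
            continue
        }
        if len(allowed) > 0 && !allowed[p.Namespace] {
            continue
        }
        count++
    }
    return count
}

func main() {
    pods := []pod{
        {Namespace: "init-ns", NodeName: "node-1"},
        {Namespace: "test-ns", NodeName: "node-2"},
        {Namespace: "other-ns", NodeName: ""},
    }
    fmt.Println(countScheduled(pods, []string{"init-ns", "test-ns"})) // 2
}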