scheduler_perf: update throughputCollector

The previous solution had some shortcomings:

- It was based on the assumption that the goroutine gets woken up at regular
  intervals. This is not actually guaranteed. Now the code keeps track of the
  actual start and end of each interval and verifies that assumption.

- If no pod was scheduled during an interval (unlikely, but possible), then
  "0 pods/s" got recorded. Because the sample interval was one second and pod
  counts are integers, the metric was always either zero or >= 1. A better
  solution is to extend the interval until some pod gets scheduled. With the
  larger time interval it becomes possible to also track, for example,
  0.5 pods/s (see the sketch after the commit metadata).
Patrick Ohly 2023-01-27 19:22:16 +01:00
parent 1d27cbc784
commit 78b8af9fed
2 changed files with 60 additions and 15 deletions
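
Distilled from the diff below, the new sampling logic follows two rules: compute
throughput from the actually elapsed wall-clock time between samples rather than
assuming the ticker fired exactly on schedule, and stretch the interval across
ticks in which nothing was scheduled, recording one sample per covered interval
afterwards so percentiles stay accurate. The following minimal, runnable sketch
isolates that bookkeeping; the sampler type and its tick method are illustrative
names, not part of the commit, and the initial-sample skipping and error-margin
check are omitted for brevity.

package main

import (
	"fmt"
	"time"
)

// sampler is an illustrative stand-in for the real throughputCollector.
// It tracks throughput across ticks and extends the interval when no
// new pod was scheduled.
type sampler struct {
	lastCount      int
	lastSampleTime time.Time
	skipped        int
	throughputs    []float64
}

// tick is called once per ticker firing with the current number of
// scheduled pods and the current time.
func (s *sampler) tick(scheduled int, now time.Time) {
	newScheduled := scheduled - s.lastCount
	if newScheduled == 0 {
		// Extend the interval instead of recording 0 pods/s.
		s.skipped++
		return
	}
	// Use the actual elapsed time, not skipped+1 nominal intervals.
	throughput := float64(newScheduled) / now.Sub(s.lastSampleTime).Seconds()
	// Record one sample per covered interval so percentiles stay accurate.
	for i := 0; i <= s.skipped; i++ {
		s.throughputs = append(s.throughputs, throughput)
	}
	s.lastCount = scheduled
	s.skipped = 0
	s.lastSampleTime = now
}

func main() {
	start := time.Now()
	s := &sampler{lastSampleTime: start}
	s.tick(0, start.Add(1*time.Second)) // nothing scheduled: interval extended
	s.tick(1, start.Add(2*time.Second)) // 1 pod over 2s -> 0.5 pods/s, recorded twice
	fmt.Println(s.throughputs)          // [0.5 0.5]
}

With one tick where nothing happened and one pod scheduled over the resulting
two-second interval, the sketch records 0.5 pods/s twice, matching the example
from the commit message.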


@@ -835,7 +835,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 			if concreteOp.CollectMetrics {
 				collectorCtx, collectorCancel = context.WithCancel(ctx)
 				defer collectorCancel()
-				collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
+				collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
 				for _, collector := range collectors {
 					// Need loop-local variable for function below.
 					collector := collector
@@ -1025,12 +1025,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
+func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}),
+		newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }


@@ -50,10 +50,10 @@ import (
 )
 
 const (
-	dateFormat                = "2006-01-02T15:04:05Z"
-	testNamespace             = "sched-test"
-	setupNamespace            = "sched-setup"
-	throughputSampleFrequency = time.Second
+	dateFormat               = "2006-01-02T15:04:05Z"
+	testNamespace            = "sched-test"
+	setupNamespace           = "sched-setup"
+	throughputSampleInterval = time.Second
 )
 
 var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
@@ -286,14 +286,16 @@ func collectHistogramVec(metric string, labels map[string]string, lvMap map[stri
 }
 
 type throughputCollector struct {
+	tb                    testing.TB
 	podInformer           coreinformers.PodInformer
 	schedulingThroughputs []float64
 	labels                map[string]string
 	namespaces            []string
 }
 
-func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
+func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
 	return &throughputCollector{
+		tb:          tb,
 		podInformer: podInformer,
 		labels:      labels,
 		namespaces:  namespaces,
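
The constructor change above threads the benchmark's testing.TB into the
collector so the run loop (next hunk) can flag irregular sampling via
tc.tb.Errorf. A hedged wiring sketch, assuming code in the same package as the
diff; startThroughputCollector is a hypothetical helper, and podInformer and
the namespace come from scheduler_perf's existing setup:

// Hypothetical helper in the same package, showing the new wiring;
// podInformer and namespace come from the benchmark's existing setup.
func startThroughputCollector(ctx context.Context, b *testing.B, podInformer coreinformers.PodInformer, namespace string) *throughputCollector {
	collector := newThroughputCollector(b, podInformer, map[string]string{"Name": b.Name()}, []string{namespace})
	go collector.run(ctx) // samples once per throughputSampleInterval
	return collector      // call collect() once the workload is done
}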
@@ -306,28 +308,71 @@ func (tc *throughputCollector) run(ctx context.Context) {
 		klog.Fatalf("%v", err)
 	}
 	lastScheduledCount := len(podsScheduled)
-	ticker := time.NewTicker(throughputSampleFrequency)
+	ticker := time.NewTicker(throughputSampleInterval)
 	defer ticker.Stop()
+	lastSampleTime := time.Now()
+	started := false
+	skipped := 0
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			now := time.Now()
 			podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...)
 			if err != nil {
 				klog.Fatalf("%v", err)
 			}
 			scheduled := len(podsScheduled)
-			// Only do sampling if number of scheduled pods is greater than zero
-			if scheduled > 0 {
-				samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
-				throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
-				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			// Only do sampling if number of scheduled pods is greater than zero.
+			if scheduled == 0 {
+				continue
+			}
+			if !started {
+				started = true
+				// Skip the initial sample. It's likely to be an outlier because
+				// sampling and creating pods get started independently.
 				lastScheduledCount = scheduled
-				klog.Infof("%d pods scheduled", lastScheduledCount)
+				lastSampleTime = now
+				continue
 			}
+
+			newScheduled := scheduled - lastScheduledCount
+			if newScheduled == 0 {
+				// Throughput would be zero for the interval.
+				// Instead of recording 0 pods/s, keep waiting
+				// until we see at least one additional pod
+				// being scheduled.
+				skipped++
+				continue
+			}
+
+			// This should be roughly equal to
+			// throughputSampleInterval * (skipped + 1), but we
+			// don't count on that because the goroutine might not
+			// be scheduled immediately when the timer
+			// triggers. Instead we track the actual time stamps.
+			duration := now.Sub(lastSampleTime)
+			durationInSeconds := duration.Seconds()
+			throughput := float64(newScheduled) / durationInSeconds
+			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
+			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
+			if math.Abs(errorMargin) > 5 {
+				// This might affect the result, report it.
+				tc.tb.Errorf("ERROR: Expected throughput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
+			}
+
+			// To keep percentiles accurate, we have to record multiple samples with the same
+			// throughput value if we skipped some intervals.
+			for i := 0; i <= skipped; i++ {
+				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			}
+			lastScheduledCount = scheduled
+			klog.Infof("%d pods scheduled", lastScheduledCount)
+			skipped = 0
+			lastSampleTime = now
 		}
 	}
 }
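
To make the 5% tolerance in the new run loop concrete: with two skipped ticks,
expectedDuration is 3 * throughputSampleInterval = 3s; if the goroutine actually
woke up after 3.2s, the error margin is +6.7% and tc.tb.Errorf fires. A
standalone check of that arithmetic, with illustrative values:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const sampleInterval = time.Second
	skipped := 2
	duration := 3200 * time.Millisecond // actual elapsed time between samples
	expected := sampleInterval * time.Duration(skipped+1)

	// Same formula as in the collector: relative deviation in percent.
	errorMargin := (duration - expected).Seconds() / expected.Seconds() * 100
	fmt.Printf("error margin: %0.1f%%\n", errorMargin) // error margin: 6.7%
	fmt.Println("outside tolerance:", math.Abs(errorMargin) > 5) // true
}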