Merge pull request #117230 from pohly/scheduler-perf-throughput

scheduler_perf: update throughputCollector

commit b4c6a70927
@@ -845,7 +845,7 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload) [
 			if concreteOp.CollectMetrics {
 				collectorCtx, collectorCancel = context.WithCancel(ctx)
 				defer collectorCancel()
-				collectors = getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
+				collectors = getTestDataCollectors(b, podInformer, fmt.Sprintf("%s/%s", b.Name(), namespace), namespace, tc.MetricsCollectorConfig)
 				for _, collector := range collectors {
 					// Need loop-local variable for function below.
 					collector := collector
@@ -1035,12 +1035,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }
 
-func getTestDataCollectors(podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
+func getTestDataCollectors(tb testing.TB, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, []string{namespace}),
+		newThroughputCollector(tb, podInformer, map[string]string{"Name": name}, []string{namespace}),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }
@@ -50,10 +50,10 @@ import (
 )
 
 const (
 	dateFormat = "2006-01-02T15:04:05Z"
 	testNamespace = "sched-test"
 	setupNamespace = "sched-setup"
-	throughputSampleFrequency = time.Second
+	throughputSampleInterval = time.Second
 )
 
 var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
@@ -286,14 +286,16 @@ func collectHistogramVec(metric string, labels map[string]string, lvMap map[stri
 }
 
 type throughputCollector struct {
+	tb testing.TB
 	podInformer coreinformers.PodInformer
 	schedulingThroughputs []float64
 	labels map[string]string
 	namespaces []string
 }
 
-func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
+func newThroughputCollector(tb testing.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
 	return &throughputCollector{
+		tb: tb,
 		podInformer: podInformer,
 		labels: labels,
 		namespaces: namespaces,
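Together with the first two hunks, the change above threads the benchmark's testing.TB handle from runWorkload through getTestDataCollectors into throughputCollector, so the collector can report sampling anomalies against the running benchmark via tc.tb.Errorf. A minimal, self-contained sketch of that wiring under assumed names (sampleCollector, newSampleCollector and TestCollectorWiring are illustrative stand-ins, not scheduler_perf types):

package example

import (
	"context"
	"sync"
	"testing"
	"time"
)

// sampleCollector is a stand-in for throughputCollector: it keeps the
// testing.TB it was constructed with and records one sample per tick.
type sampleCollector struct {
	tb      testing.TB
	mu      sync.Mutex
	samples []float64
}

func newSampleCollector(tb testing.TB) *sampleCollector {
	return &sampleCollector{tb: tb}
}

// run samples until the context is cancelled, mirroring the collector's
// run(ctx) loop; with the TB handle available, anomalies could be reported
// right here via c.tb.Errorf, which is safe to call from this goroutine.
func (c *sampleCollector) run(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.mu.Lock()
			c.samples = append(c.samples, 1)
			c.mu.Unlock()
		}
	}
}

func (c *sampleCollector) collect() []float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]float64(nil), c.samples...)
}

func TestCollectorWiring(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	c := newSampleCollector(t) // t flows into the collector, as b does in runWorkload
	go func() {
		defer wg.Done()
		c.run(ctx)
	}()
	time.Sleep(50 * time.Millisecond)
	cancel()
	wg.Wait()
	if len(c.collect()) == 0 {
		t.Error("expected at least one sample")
	}
}

Accepting testing.TB rather than *testing.B keeps such a collector usable from both tests and benchmarks.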
@@ -306,28 +306,71 @@ func (tc *throughputCollector) run(ctx context.Context) {
 		klog.Fatalf("%v", err)
 	}
 	lastScheduledCount := len(podsScheduled)
-	ticker := time.NewTicker(throughputSampleFrequency)
+	ticker := time.NewTicker(throughputSampleInterval)
 	defer ticker.Stop()
+	lastSampleTime := time.Now()
+	started := false
+	skipped := 0
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			now := time.Now()
 			podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...)
 			if err != nil {
 				klog.Fatalf("%v", err)
 			}
 
 			scheduled := len(podsScheduled)
-			// Only do sampling if number of scheduled pods is greater than zero
-			if scheduled > 0 {
-				samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
-				throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
-				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			// Only do sampling if number of scheduled pods is greater than zero.
+			if scheduled == 0 {
+				continue
+			}
+			if !started {
+				started = true
+				// Skip the initial sample. It's likely to be an outlier because
+				// sampling and creating pods get started independently.
 				lastScheduledCount = scheduled
-				klog.Infof("%d pods scheduled", lastScheduledCount)
+				lastSampleTime = now
+				continue
 			}
+
+			newScheduled := scheduled - lastScheduledCount
+			if newScheduled == 0 {
+				// Throughput would be zero for the interval.
+				// Instead of recording 0 pods/s, keep waiting
+				// until we see at least one additional pod
+				// being scheduled.
+				skipped++
+				continue
+			}
+
+			// This should be roughly equal to
+			// throughputSampleInterval * (skipped + 1), but we
+			// don't count on that because the goroutine might not
+			// be scheduled immediately when the timer
+			// triggers. Instead we track the actual time stamps.
+			duration := now.Sub(lastSampleTime)
+			durationInSeconds := duration.Seconds()
+			throughput := float64(newScheduled) / durationInSeconds
+			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
+			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
+			if math.Abs(errorMargin) > 5 {
+				// This might affect the result, report it.
+				tc.tb.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
+			}
+
+			// To keep percentiles accurate, we have to record multiple samples with the same
+			// throughput value if we skipped some intervals.
+			for i := 0; i <= skipped; i++ {
+				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
+			}
+			lastScheduledCount = scheduled
+			klog.Infof("%d pods scheduled", lastScheduledCount)
+			skipped = 0
+			lastSampleTime = now
 		}
 	}
 }
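The rewritten run loop above computes throughput from measured timestamps rather than assuming the ticker fired exactly on schedule, skips intervals in which nothing got scheduled, and back-fills one sample per skipped interval. A rough, standalone sketch of just that arithmetic (recordSamples and the literal numbers below are made up for illustration; the real logic is in throughputCollector.run above):

package main

import (
	"fmt"
	"time"
)

// recordSamples mirrors the sampling arithmetic of the updated collector:
// throughput is the number of newly scheduled pods divided by the measured
// wall-clock duration, and the same value is appended once per skipped
// interval so that percentiles computed later are not biased toward the
// intervals in which pods happened to get scheduled.
func recordSamples(samples []float64, newScheduled, skipped int, duration time.Duration) []float64 {
	throughput := float64(newScheduled) / duration.Seconds()
	for i := 0; i <= skipped; i++ {
		samples = append(samples, throughput)
	}
	return samples
}

func main() {
	// Example: 50 pods were scheduled across three one-second sampling
	// intervals (two of them empty and therefore skipped), and those three
	// intervals actually took 3.1s of wall clock instead of the ideal 3s.
	duration := 3100 * time.Millisecond
	expected := time.Duration(2+1) * time.Second
	errorMargin := (duration - expected).Seconds() / expected.Seconds() * 100
	fmt.Printf("deviation from expected sampling duration: %+.1f%%\n", errorMargin) // +3.3%, below the 5% reporting threshold

	var samples []float64
	samples = recordSamples(samples, 50, 2, duration)
	fmt.Println(samples) // three identical samples of ~16.13 pods/s
}

Recording the same value once per skipped interval keeps the number of samples proportional to elapsed time, so percentiles over schedulingThroughputs are not skewed by the waiting.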