Merge pull request #127759 from macsko/allow_to_filter_pods_using_labels_while_collecting_metrics_scheduler_perf

Allow filtering pods by labels while collecting metrics in scheduler_perf
Authored by Kubernetes Prow Robot on 2024-09-30 20:37:35 +01:00, committed by GitHub
commit 5e65529ca9
2 changed files with 20 additions and 14 deletions


@@ -871,6 +871,10 @@ type startCollectingMetricsOp struct {
 	Name string
 	// Namespaces for which the scheduling throughput metric is calculated.
 	Namespaces []string
+	// Labels used to filter the pods for which the scheduling throughput metric is collected.
+	// If empty, it will collect the metric for all pods in the selected namespaces.
+	// Optional.
+	LabelSelector map[string]string
 }

 func (scm *startCollectingMetricsOp) isValid(_ bool) error {
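For illustration only, a minimal, runnable sketch of how the new field might be populated; the struct here is reduced to the fields shown in the hunk above (the op's other fields, such as its opcode, are omitted), and the label values are made up:

package main

import "fmt"

// startCollectingMetricsOp mirrors the struct from the hunk above, trimmed
// to the fields relevant for this sketch.
type startCollectingMetricsOp struct {
	Name          string
	Namespaces    []string
	LabelSelector map[string]string
}

func main() {
	// Only pods carrying this label would count toward the throughput metric;
	// a nil/empty selector keeps the old behaviour of counting every pod in
	// the selected namespaces.
	op := startCollectingMetricsOp{
		Name:          "measured-pods",
		Namespaces:    []string{"test-ns"},
		LabelSelector: map[string]string{"workload": "measured"},
	}
	fmt.Printf("%+v\n", op)
}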
@@ -1227,12 +1231,12 @@ func checkEmptyInFlightEvents() error {
 	return nil
 }

-func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string) (ktesting.TContext, []testDataCollector) {
+func startCollectingMetrics(tCtx ktesting.TContext, collectorWG *sync.WaitGroup, podInformer coreinformers.PodInformer, mcc *metricsCollectorConfig, throughputErrorMargin float64, opIndex int, name string, namespaces []string, labelSelector map[string]string) (ktesting.TContext, []testDataCollector) {
 	collectorCtx := ktesting.WithCancel(tCtx)
 	workloadName := tCtx.Name()
 	// The first part is the same for each workload, therefore we can strip it.
 	workloadName = workloadName[strings.Index(name, "/")+1:]
-	collectors := getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", workloadName, name), namespaces, mcc, throughputErrorMargin)
+	collectors := getTestDataCollectors(podInformer, fmt.Sprintf("%s/%s", workloadName, name), namespaces, labelSelector, mcc, throughputErrorMargin)
 	for _, collector := range collectors {
 		// Need loop-local variable for function below.
 		collector := collector
@@ -1373,7 +1377,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 			if collectorCtx != nil {
 				tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 			}
-			collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace})
+			collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, namespace, []string{namespace}, nil)
 			defer collectorCtx.Cancel("cleaning up")
 		}
 		if err := createPodsRapidly(tCtx, namespace, concreteOp); err != nil {
@@ -1584,7 +1588,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 		if collectorCtx != nil {
 			tCtx.Fatalf("op %d: Metrics collection is overlapping. Probably second collector was started before stopping a previous one", opIndex)
 		}
-		collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces)
+		collectorCtx, collectors = startCollectingMetrics(tCtx, &collectorWG, podInformer, tc.MetricsCollectorConfig, throughputErrorMargin, opIndex, concreteOp.Name, concreteOp.Namespaces, concreteOp.LabelSelector)
 		defer collectorCtx.Cancel("cleaning up")

 	case *stopCollectingMetricsOp:
@@ -1633,12 +1637,12 @@ type testDataCollector interface {
 	collect() []DataItem
 }

-func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
+func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, labelSelector map[string]string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
 	if mcc == nil {
 		mcc = &defaultMetricsCollectorConfig
 	}
 	return []testDataCollector{
-		newThroughputCollector(podInformer, map[string]string{"Name": name}, namespaces, throughputErrorMargin),
+		newThroughputCollector(podInformer, map[string]string{"Name": name}, labelSelector, namespaces, throughputErrorMargin),
 		newMetricsCollector(mcc, map[string]string{"Name": name}),
 	}
 }


@@ -405,7 +405,8 @@ func collectHistogramVec(metric string, labels map[string]string, lvMap map[stri
 type throughputCollector struct {
 	podInformer           coreinformers.PodInformer
 	schedulingThroughputs []float64
-	labels                map[string]string
+	labelSelector         map[string]string
+	resultLabels          map[string]string
 	namespaces            sets.Set[string]
 	errorMargin           float64
@@ -413,12 +414,13 @@ type throughputCollector struct {
 	start time.Time
 }

-func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
+func newThroughputCollector(podInformer coreinformers.PodInformer, resultLabels map[string]string, labelSelector map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
 	return &throughputCollector{
-		podInformer: podInformer,
-		labels:      labels,
-		namespaces:  sets.New(namespaces...),
-		errorMargin: errorMargin,
+		podInformer:   podInformer,
+		labelSelector: labelSelector,
+		resultLabels:  resultLabels,
+		namespaces:    sets.New(namespaces...),
+		errorMargin:   errorMargin,
 	}
 }
@@ -451,7 +453,7 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
 				return
 			}
-			if !tc.namespaces.Has(newPod.Namespace) {
+			if !tc.namespaces.Has(newPod.Namespace) || !labelsMatch(newPod.Labels, tc.labelSelector) {
 				return
 			}
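The hunk above relies on a labelsMatch helper that is not shown in this diff. As a rough, self-contained sketch of the semantics implied by the field comment (a pod matches when it carries every key/value pair of the selector; an empty selector matches everything), such a helper could look like the following; the exact implementation merged by this PR may differ:

package main

import "fmt"

// labelsMatch reports whether podLabels contains every key/value pair in
// labelSelector. A nil or empty selector matches any pod, preserving the
// previous "collect for all pods" behaviour. This is a sketch of the helper
// referenced in the hunk above, not the code merged by this PR.
func labelsMatch(podLabels, labelSelector map[string]string) bool {
	for k, v := range labelSelector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

func main() {
	pod := map[string]string{"workload": "measured", "app": "pause"}
	fmt.Println(labelsMatch(pod, map[string]string{"workload": "measured"})) // true
	fmt.Println(labelsMatch(pod, map[string]string{"workload": "other"}))    // false
	fmt.Println(labelsMatch(pod, nil))                                       // true: empty selector matches all
}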
@@ -577,7 +579,7 @@ func (tc *throughputCollector) run(tCtx ktesting.TContext) {
 func (tc *throughputCollector) collect() []DataItem {
 	throughputSummary := DataItem{
-		Labels:   tc.labels,
+		Labels:   tc.resultLabels,
 		progress: tc.progress,
 		start:    tc.start,
 	}