diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index f24a6060eb4..017620ea02d 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -823,6 +823,10 @@ type barrierOp struct { // Namespaces to block on. Empty array or not specifying this field signifies // that the barrier should block on all namespaces. Namespaces []string + // Labels used to filter the pods to block on. + // If empty, it won't filter the labels. + // Optional. + LabelSelector map[string]string // Determines what stage of pods scheduling the barrier should wait for. // If empty, it is interpreted as "Scheduled". // Optional @@ -1410,7 +1414,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact tCtx.Fatalf("op %d: %v", opIndex, err) } default: - if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil { + if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil { tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err) } } @@ -1584,14 +1588,14 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact } switch concreteOp.StageRequirement { case Attempted: - if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { tCtx.Fatalf("op %d: %v", opIndex, err) } case Scheduled: // Default should be treated like "Scheduled", so handling both in the same way. fallthrough default: - if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { + if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil { tCtx.Fatalf("op %d: %v", opIndex, err) } // At the end of the barrier, we can be sure that there are no pods @@ -1863,7 +1867,7 @@ func createPodsSteadily(tCtx ktesting.TContext, namespace string, podInformer co // waitUntilPodsScheduledInNamespace blocks until all pods in the given // namespace are scheduled. Times out after 10 minutes because even at the // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete. -func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { +func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespace string, wantCount int) error { var pendingPod *v1.Pod err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { @@ -1872,7 +1876,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei return true, ctx.Err() default: } - scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace) + scheduled, attempted, unattempted, err := getScheduledPods(podInformer, labelSelector, namespace) if err != nil { return false, err } @@ -1900,7 +1904,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei // waitUntilPodsAttemptedInNamespace blocks until all pods in the given // namespace at least once went through a schedyling cycle. // Times out after 10 minutes similarly to waitUntilPodsScheduledInNamespace. -func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error { +func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespace string, wantCount int) error { var pendingPod *v1.Pod err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { @@ -1909,7 +1913,7 @@ func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer corei return true, ctx.Err() default: } - scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace) + scheduled, attempted, unattempted, err := getScheduledPods(podInformer, labelSelector, namespace) if err != nil { return false, err } @@ -1934,7 +1938,7 @@ func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer corei // waitUntilPodsScheduled blocks until the all pods in the given namespaces are // scheduled. -func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { +func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { // If unspecified, default to all known namespaces. if len(namespaces) == 0 { for namespace := range numPodsScheduledPerNamespace { @@ -1951,7 +1955,7 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po if !ok { return fmt.Errorf("unknown namespace %s", namespace) } - if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, wantCount); err != nil { + if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, labelSelector, namespace, wantCount); err != nil { return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) } } @@ -1960,7 +1964,7 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po // waitUntilPodsAttempted blocks until the all pods in the given namespaces are // attempted (at least once went through a schedyling cycle). -func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { +func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespaces []string, numPodsScheduledPerNamespace map[string]int) error { // If unspecified, default to all known namespaces. if len(namespaces) == 0 { for namespace := range numPodsScheduledPerNamespace { @@ -1977,7 +1981,7 @@ func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.Po if !ok { return fmt.Errorf("unknown namespace %s", namespace) } - if err := waitUntilPodsAttemptedInNamespace(tCtx, podInformer, namespace, wantCount); err != nil { + if err := waitUntilPodsAttemptedInNamespace(tCtx, podInformer, labelSelector, namespace, wantCount); err != nil { return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err) } } diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index 884287646a6..879372af6f1 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -159,8 +159,9 @@ func isAttempted(pod *v1.Pod) bool { // getScheduledPods returns the list of scheduled, attempted but unschedulable // and unattempted pods in the specified namespaces. +// Label selector can be used to filter the pods. // Note that no namespaces specified matches all namespaces. -func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, []*v1.Pod, error) { +func getScheduledPods(podInformer coreinformers.PodInformer, labelSelector map[string]string, namespaces ...string) ([]*v1.Pod, []*v1.Pod, []*v1.Pod, error) { pods, err := podInformer.Lister().List(labels.Everything()) if err != nil { return nil, nil, nil, err @@ -172,7 +173,7 @@ func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...strin unattempted := make([]*v1.Pod, 0, len(pods)) for i := range pods { pod := pods[i] - if len(s) == 0 || s.Has(pod.Namespace) { + if (len(s) == 0 || s.Has(pod.Namespace)) && labelsMatch(pod.Labels, labelSelector) { if len(pod.Spec.NodeName) > 0 { scheduled = append(scheduled, pod) } else if isAttempted(pod) {