Allow to filter pods using labels on barrier in scheduler_perf

This commit is contained in:
Maciej Skoczeń 2024-10-01 08:48:37 +00:00
parent 22a30e7cbb
commit 5e2552c2b0
2 changed files with 18 additions and 13 deletions

View File

@ -823,6 +823,10 @@ type barrierOp struct {
// Namespaces to block on. Empty array or not specifying this field signifies
// that the barrier should block on all namespaces.
Namespaces []string
// Labels used to filter the pods to block on.
// If empty, it won't filter the labels.
// Optional.
LabelSelector map[string]string
// Determines what stage of pods scheduling the barrier should wait for.
// If empty, it is interpreted as "Scheduled".
// Optional
@ -1410,7 +1414,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
tCtx.Fatalf("op %d: %v", opIndex, err)
}
default:
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil {
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, nil, namespace, concreteOp.Count); err != nil {
tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
}
}
@ -1584,14 +1588,14 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
}
switch concreteOp.StageRequirement {
case Attempted:
if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
if err := waitUntilPodsAttempted(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
case Scheduled:
// Default should be treated like "Scheduled", so handling both in the same way.
fallthrough
default:
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.LabelSelector, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
tCtx.Fatalf("op %d: %v", opIndex, err)
}
// At the end of the barrier, we can be sure that there are no pods
@ -1863,7 +1867,7 @@ func createPodsSteadily(tCtx ktesting.TContext, namespace string, podInformer co
// waitUntilPodsScheduledInNamespace blocks until all pods in the given
// namespace are scheduled. Times out after 10 minutes because even at the
// lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespace string, wantCount int) error {
var pendingPod *v1.Pod
err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
@ -1872,7 +1876,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
return true, ctx.Err()
default:
}
scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace)
scheduled, attempted, unattempted, err := getScheduledPods(podInformer, labelSelector, namespace)
if err != nil {
return false, err
}
@ -1900,7 +1904,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
// waitUntilPodsAttemptedInNamespace blocks until all pods in the given
// namespace at least once went through a schedyling cycle.
// Times out after 10 minutes similarly to waitUntilPodsScheduledInNamespace.
func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespace string, wantCount int) error {
var pendingPod *v1.Pod
err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
@ -1909,7 +1913,7 @@ func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer corei
return true, ctx.Err()
default:
}
scheduled, attempted, unattempted, err := getScheduledPods(podInformer, namespace)
scheduled, attempted, unattempted, err := getScheduledPods(podInformer, labelSelector, namespace)
if err != nil {
return false, err
}
@ -1934,7 +1938,7 @@ func waitUntilPodsAttemptedInNamespace(tCtx ktesting.TContext, podInformer corei
// waitUntilPodsScheduled blocks until the all pods in the given namespaces are
// scheduled.
func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
// If unspecified, default to all known namespaces.
if len(namespaces) == 0 {
for namespace := range numPodsScheduledPerNamespace {
@ -1951,7 +1955,7 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po
if !ok {
return fmt.Errorf("unknown namespace %s", namespace)
}
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, wantCount); err != nil {
if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, labelSelector, namespace, wantCount); err != nil {
return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
}
}
@ -1960,7 +1964,7 @@ func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.Po
// waitUntilPodsAttempted blocks until the all pods in the given namespaces are
// attempted (at least once went through a schedyling cycle).
func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, labelSelector map[string]string, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
// If unspecified, default to all known namespaces.
if len(namespaces) == 0 {
for namespace := range numPodsScheduledPerNamespace {
@ -1977,7 +1981,7 @@ func waitUntilPodsAttempted(tCtx ktesting.TContext, podInformer coreinformers.Po
if !ok {
return fmt.Errorf("unknown namespace %s", namespace)
}
if err := waitUntilPodsAttemptedInNamespace(tCtx, podInformer, namespace, wantCount); err != nil {
if err := waitUntilPodsAttemptedInNamespace(tCtx, podInformer, labelSelector, namespace, wantCount); err != nil {
return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
}
}

View File

@ -159,8 +159,9 @@ func isAttempted(pod *v1.Pod) bool {
// getScheduledPods returns the list of scheduled, attempted but unschedulable
// and unattempted pods in the specified namespaces.
// Label selector can be used to filter the pods.
// Note that no namespaces specified matches all namespaces.
func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, []*v1.Pod, error) {
func getScheduledPods(podInformer coreinformers.PodInformer, labelSelector map[string]string, namespaces ...string) ([]*v1.Pod, []*v1.Pod, []*v1.Pod, error) {
pods, err := podInformer.Lister().List(labels.Everything())
if err != nil {
return nil, nil, nil, err
@ -172,7 +173,7 @@ func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...strin
unattempted := make([]*v1.Pod, 0, len(pods))
for i := range pods {
pod := pods[i]
if len(s) == 0 || s.Has(pod.Namespace) {
if (len(s) == 0 || s.Has(pod.Namespace)) && labelsMatch(pod.Labels, labelSelector) {
if len(pod.Spec.NodeName) > 0 {
scheduled = append(scheduled, pod)
} else if isAttempted(pod) {