diff --git a/test/integration/scheduler_perf/config/performance-config.yaml b/test/integration/scheduler_perf/config/performance-config.yaml
index cb7649a6e71..82a0abc9e87 100644
--- a/test/integration/scheduler_perf/config/performance-config.yaml
+++ b/test/integration/scheduler_perf/config/performance-config.yaml
@@ -1333,7 +1333,13 @@
   # Create pods that will be gradually deleted after being scheduled.
   - opcode: createPods
     countParam: $deletingPods
+  # Delete scheduled pods, which will generate many AssignedPodDelete events.
+  # Each of them will be processed by the scheduling queue.
+  # However, the scheduling throughput should only be minimally impacted by the number of gated Pods.
+  - opcode: deletePods
+    namespace: namespace-3
     deletePodsPerSecond: 50
+    skipWaitToCompletion: true
   - opcode: createPods
     countParam: $measurePods
     collectMetrics: true
@@ -1402,7 +1408,10 @@
     countParam: $deletingPods
     podTemplatePath: config/templates/pod-with-finalizer.yaml
     skipWaitToCompletion: true
+  - opcode: deletePods
+    namespace: namespace-1
     deletePodsPerSecond: 100
+    skipWaitToCompletion: true
   - opcode: createPods
     countParam: $measurePods
     collectMetrics: true
diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go
index 47c2c9d6dca..eb75dd5a3e7 100644
--- a/test/integration/scheduler_perf/scheduler_perf.go
+++ b/test/integration/scheduler_perf/scheduler_perf.go
@@ -77,6 +77,7 @@ const (
 	createNamespacesOpcode     operationCode = "createNamespaces"
 	createPodsOpcode           operationCode = "createPods"
 	createPodSetsOpcode        operationCode = "createPodSets"
+	deletePodsOpcode           operationCode = "deletePods"
 	createResourceClaimsOpcode operationCode = "createResourceClaims"
 	createResourceDriverOpcode operationCode = "createResourceDriver"
 	churnOpcode                operationCode = "churn"
@@ -407,6 +408,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
 		&createNamespacesOp{},
 		&createPodsOp{},
 		&createPodSetsOp{},
+		&deletePodsOp{},
 		&createResourceClaimsOp{},
 		&createResourceDriverOp{},
 		&churnOp{},
@@ -586,9 +588,6 @@ type createPodsOp struct {
 	// Optional
 	PersistentVolumeTemplatePath      *string
 	PersistentVolumeClaimTemplatePath *string
-	// Number of pods to be deleted per second after they were scheduled. If set to 0, pods are not deleted.
-	// Optional
-	DeletePodsPerSecond int
 }
 
 func (cpo *createPodsOp) isValid(allowParameterization bool) error {
@@ -604,9 +603,6 @@
 		// use-cases right now.
 		return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
 	}
-	if cpo.DeletePodsPerSecond < 0 {
-		return fmt.Errorf("invalid DeletePodsPerSecond=%d; should be non-negative", cpo.DeletePodsPerSecond)
-	}
 	return nil
 }
 
@@ -665,6 +661,46 @@ func (cpso createPodSetsOp) patchParams(w *workload) (realOp, error) {
 	return &cpso, (&cpso).isValid(true)
 }
 
+// deletePodsOp defines an op where previously created pods are deleted.
+// The test can block on the completion of this op before moving forward or
+// continue asynchronously.
+type deletePodsOp struct {
+	// Must be "deletePods".
+	Opcode operationCode
+	// Namespace the pods should be deleted from.
+	Namespace string
+	// Labels used to filter the pods to delete.
+	// If empty, it will delete all Pods in the namespace.
+	// Optional.
+	LabelSelector map[string]string
+	// Whether or not to wait for all pods in this op to be deleted.
+	// Defaults to false if not specified.
+	// Optional
+	SkipWaitToCompletion bool
+	// Number of pods to be deleted per second.
+	// If zero, all pods are deleted at once.
+	// Optional
+	DeletePodsPerSecond int
+}
+
+func (dpo *deletePodsOp) isValid(allowParameterization bool) error {
+	if dpo.Opcode != deletePodsOpcode {
+		return fmt.Errorf("invalid opcode %q; expected %q", dpo.Opcode, deletePodsOpcode)
+	}
+	if dpo.DeletePodsPerSecond < 0 {
+		return fmt.Errorf("invalid DeletePodsPerSecond=%d; should be non-negative", dpo.DeletePodsPerSecond)
+	}
+	return nil
+}
+
+func (dpo *deletePodsOp) collectsMetrics() bool {
+	return false
+}
+
+func (dpo deletePodsOp) patchParams(w *workload) (realOp, error) {
+	return &dpo, nil
+}
+
 // churnOp defines an op where services are created as a part of a workload.
 type churnOp struct {
 	// Must be "churnOp".
@@ -1227,32 +1263,53 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
 				mu.Unlock()
 			}
 
-			if concreteOp.DeletePodsPerSecond > 0 {
-				pods, err := podInformer.Lister().Pods(namespace).List(labels.Everything())
-				if err != nil {
-					tCtx.Fatalf("op %d: error in listing scheduled pods in the namespace: %v", opIndex, err)
-				}
+		case *deletePodsOp:
+			labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
 
-				ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
-				defer ticker.Stop()
+			podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
+			if err != nil {
+				tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
+			}
 
-				wg.Add(1)
-				go func(opIndex int) {
-					defer wg.Done()
-					for i := 0; i < len(pods); i++ {
+			deletePods := func(opIndex int) {
+				if concreteOp.DeletePodsPerSecond > 0 {
+					ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
+					defer ticker.Stop()
+
+					for i := 0; i < len(podsToDelete); i++ {
 						select {
 						case <-ticker.C:
-							if err := tCtx.Client().CoreV1().Pods(namespace).Delete(tCtx, pods[i].Name, metav1.DeleteOptions{}); err != nil {
+							if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
 								if errors.Is(err, context.Canceled) {
 									return
 								}
-								tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, pods[i].Name, err)
+								tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
 							}
 						case <-tCtx.Done():
 							return
 						}
 					}
+					return
+				}
+				listOpts := metav1.ListOptions{
+					LabelSelector: labelSelector.String(),
+				}
+				if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
+					if errors.Is(err, context.Canceled) {
+						return
+					}
+					tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
+				}
+			}
+
+			if concreteOp.SkipWaitToCompletion {
+				wg.Add(1)
+				go func(opIndex int) {
+					defer wg.Done()
+					deletePods(opIndex)
 				}(opIndex)
+			} else {
+				deletePods(opIndex)
 			}
 
 		case *churnOp:
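
For context, a minimal workload snippet showing how the new deletePods op composes with a createPods step. This is an illustrative sketch, not a config taken from the patch: the labelSelector stanza in particular is inferred from the LabelSelector map[string]string field above (none of the updated configs exercise it), and the type: deletable label is hypothetical.

  - opcode: createPods
    namespace: namespace-1
    countParam: $deletingPods
  - opcode: deletePods
    namespace: namespace-1
    # Hypothetical label filter; when omitted, every Pod in the namespace is deleted.
    labelSelector:
      type: deletable
    # Throttle deletion to 100 pods per second via per-pod Delete calls.
    # When omitted or zero, a single DeleteCollection call removes all matching pods at once.
    deletePodsPerSecond: 100
    # Return immediately; deletion continues in the background.
    skipWaitToCompletion: true

Compared with the old deletePodsPerSecond field on createPods, the standalone op can target pods created by any earlier step, filter them by label, and choose between throttled and bulk deletion.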