Add deletePodsOp to scheduler_perf

This commit is contained in:
Maciej Skoczeń 2024-09-19 10:41:29 +00:00
parent f0415a65b4
commit 287b61918a
2 changed files with 85 additions and 19 deletions

View File

@ -1333,7 +1333,13 @@
# Create pods that will be gradually deleted after being scheduled.
- opcode: createPods
countParam: $deletingPods
# Delete scheduled pods, which will generate many AssignedPodDelete events.
# Each of them will be processed by the scheduling queue.
# But, the scheduling throughput should only be minimally impacted by the number of gated Pods.
- opcode: deletePods
namespace: namespace-3
deletePodsPerSecond: 50
skipWaitToCompletion: true
- opcode: createPods
countParam: $measurePods
collectMetrics: true
@ -1402,7 +1408,10 @@
countParam: $deletingPods
podTemplatePath: config/templates/pod-with-finalizer.yaml
skipWaitToCompletion: true
- opcode: deletePods
namespace: namespace-1
deletePodsPerSecond: 100
skipWaitToCompletion: true
- opcode: createPods
countParam: $measurePods
collectMetrics: true

View File

@ -77,6 +77,7 @@ const (
createNamespacesOpcode operationCode = "createNamespaces"
createPodsOpcode operationCode = "createPods"
createPodSetsOpcode operationCode = "createPodSets"
deletePodsOpcode operationCode = "deletePods"
createResourceClaimsOpcode operationCode = "createResourceClaims"
createResourceDriverOpcode operationCode = "createResourceDriver"
churnOpcode operationCode = "churn"
@ -407,6 +408,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
&createNamespacesOp{},
&createPodsOp{},
&createPodSetsOp{},
&deletePodsOp{},
&createResourceClaimsOp{},
&createResourceDriverOp{},
&churnOp{},
@ -586,9 +588,6 @@ type createPodsOp struct {
// Optional
PersistentVolumeTemplatePath *string
PersistentVolumeClaimTemplatePath *string
// Number of pods to be deleted per second after they were scheduled. If set to 0, pods are not deleted.
// Optional
DeletePodsPerSecond int
}
func (cpo *createPodsOp) isValid(allowParameterization bool) error {
@ -604,9 +603,6 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error {
// use-cases right now.
return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
}
if cpo.DeletePodsPerSecond < 0 {
return fmt.Errorf("invalid DeletePodsPerSecond=%d; should be non-negative", cpo.DeletePodsPerSecond)
}
return nil
}
@ -665,6 +661,46 @@ func (cpso createPodSetsOp) patchParams(w *workload) (realOp, error) {
return &cpso, (&cpso).isValid(true)
}
// deletePodsOp defines an op where previously created pods are deleted.
// The test can block on the completion of this op before moving forward or
// continue asynchronously.
type deletePodsOp struct {
// Must be "deletePods".
Opcode operationCode
// Namespace the pods should be deleted from.
Namespace string
// Labels used to filter the pods to delete.
// If empty, it will delete all Pods in the namespace.
// Optional.
LabelSelector map[string]string
// Whether or not to wait for all pods in this op to be deleted.
// Defaults to false if not specified.
// Optional
SkipWaitToCompletion bool
// Number of pods to be deleted per second.
// If zero, all pods are deleted at once.
// Optional
DeletePodsPerSecond int
}
func (dpo *deletePodsOp) isValid(allowParameterization bool) error {
if dpo.Opcode != deletePodsOpcode {
return fmt.Errorf("invalid opcode %q; expected %q", dpo.Opcode, deletePodsOpcode)
}
if dpo.DeletePodsPerSecond < 0 {
return fmt.Errorf("invalid DeletePodsPerSecond=%d; should be non-negative", dpo.DeletePodsPerSecond)
}
return nil
}
func (dpo *deletePodsOp) collectsMetrics() bool {
return false
}
func (dpo deletePodsOp) patchParams(w *workload) (realOp, error) {
return &dpo, nil
}
// churnOp defines an op where services are created as a part of a workload.
type churnOp struct {
// Must be "churnOp".
@ -1227,32 +1263,53 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFact
mu.Unlock()
}
if concreteOp.DeletePodsPerSecond > 0 {
pods, err := podInformer.Lister().Pods(namespace).List(labels.Everything())
if err != nil {
tCtx.Fatalf("op %d: error in listing scheduled pods in the namespace: %v", opIndex, err)
}
case *deletePodsOp:
labelSelector := labels.ValidatedSetSelector(concreteOp.LabelSelector)
ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
defer ticker.Stop()
podsToDelete, err := podInformer.Lister().Pods(concreteOp.Namespace).List(labelSelector)
if err != nil {
tCtx.Fatalf("op %d: error in listing pods in the namespace %s: %v", opIndex, concreteOp.Namespace, err)
}
wg.Add(1)
go func(opIndex int) {
defer wg.Done()
for i := 0; i < len(pods); i++ {
deletePods := func(opIndex int) {
if concreteOp.DeletePodsPerSecond > 0 {
ticker := time.NewTicker(time.Second / time.Duration(concreteOp.DeletePodsPerSecond))
defer ticker.Stop()
for i := 0; i < len(podsToDelete); i++ {
select {
case <-ticker.C:
if err := tCtx.Client().CoreV1().Pods(namespace).Delete(tCtx, pods[i].Name, metav1.DeleteOptions{}); err != nil {
if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).Delete(tCtx, podsToDelete[i].Name, metav1.DeleteOptions{}); err != nil {
if errors.Is(err, context.Canceled) {
return
}
tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, pods[i].Name, err)
tCtx.Errorf("op %d: unable to delete pod %v: %v", opIndex, podsToDelete[i].Name, err)
}
case <-tCtx.Done():
return
}
}
return
}
listOpts := metav1.ListOptions{
LabelSelector: labelSelector.String(),
}
if err := tCtx.Client().CoreV1().Pods(concreteOp.Namespace).DeleteCollection(tCtx, metav1.DeleteOptions{}, listOpts); err != nil {
if errors.Is(err, context.Canceled) {
return
}
tCtx.Errorf("op %d: unable to delete pods in namespace %v: %v", opIndex, concreteOp.Namespace, err)
}
}
if concreteOp.SkipWaitToCompletion {
wg.Add(1)
go func(opIndex int) {
defer wg.Done()
deletePods(opIndex)
}(opIndex)
} else {
deletePods(opIndex)
}
case *churnOp: