diff --git a/pkg/scheduler/framework/parallelize/parallelism.go b/pkg/scheduler/framework/parallelize/parallelism.go index 94441b97d9c..2ac14289a0c 100644 --- a/pkg/scheduler/framework/parallelize/parallelism.go +++ b/pkg/scheduler/framework/parallelize/parallelism.go @@ -51,15 +51,28 @@ func chunkSizeFor(n, parallelism int) int { return s } +// numWorkersForChunkSize returns number of workers (goroutines) +// that will be created in workqueue.ParallelizeUntil +// for given parallelism, pieces and chunkSize values. +func numWorkersForChunkSize(parallelism, pieces, chunkSize int) int { + chunks := (pieces + chunkSize - 1) / chunkSize + if chunks < parallelism { + return chunks + } + return parallelism +} + // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. // A given operation will be a label that is recorded in the goroutine metric. func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) { - goroutinesMetric := metrics.Goroutines.WithLabelValues(operation) - withMetrics := func(piece int) { - goroutinesMetric.Inc() - doWorkPiece(piece) - goroutinesMetric.Dec() - } + chunkSize := chunkSizeFor(pieces, p.parallelism) + workers := numWorkersForChunkSize(p.parallelism, pieces, chunkSize) - workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism))) + goroutinesMetric := metrics.Goroutines.WithLabelValues(operation) + // Calling single Add with workers' count is more efficient than calling Inc or Dec per each work piece. + // This approach improves performance of some plugins (affinity, topology spreading) as well as preemption. + goroutinesMetric.Add(float64(workers)) + defer goroutinesMetric.Add(float64(-workers)) + + workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSize)) }