Improve Goroutines metric calls in parallelizer.Until

This commit is contained in:
Maciej Skoczeń
2025-01-27 12:01:17 +00:00
parent 8770bd58d0
commit bd8dee9637

View File

@@ -51,15 +51,28 @@ func chunkSizeFor(n, parallelism int) int {
return s
}
// numWorkersForChunkSize returns number of workers (goroutines)
// that will be created in workqueue.ParallelizeUntil
// for given parallelism, pieces and chunkSize values.
func numWorkersForChunkSize(parallelism, pieces, chunkSize int) int {
chunks := (pieces + chunkSize - 1) / chunkSize
if chunks < parallelism {
return chunks
}
return parallelism
}
// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
// A given operation will be a label that is recorded in the goroutine metric.
func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) {
goroutinesMetric := metrics.Goroutines.WithLabelValues(operation)
withMetrics := func(piece int) {
goroutinesMetric.Inc()
doWorkPiece(piece)
goroutinesMetric.Dec()
}
chunkSize := chunkSizeFor(pieces, p.parallelism)
workers := numWorkersForChunkSize(p.parallelism, pieces, chunkSize)
workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism)))
goroutinesMetric := metrics.Goroutines.WithLabelValues(operation)
// Calling single Add with workers' count is more efficient than calling Inc or Dec per each work piece.
// This approach improves performance of some plugins (affinity, topology spreading) as well as preemption.
goroutinesMetric.Add(float64(workers))
defer goroutinesMetric.Add(float64(-workers))
workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSize))
}