Merge pull request #128999 from macsko/improve_goroutines_metric_writes_in_parallelizer_until

Improve Goroutines metric calls in parallelizer.Until
This commit is contained in:
Kubernetes Prow Robot 2025-02-03 07:30:57 -08:00 committed by GitHub
commit 1b7a059187
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -51,15 +51,28 @@ func chunkSizeFor(n, parallelism int) int {
return s return s
} }
// numWorkersForChunkSize returns number of workers (goroutines)
// that will be created in workqueue.ParallelizeUntil
// for given parallelism, pieces and chunkSize values.
func numWorkersForChunkSize(parallelism, pieces, chunkSize int) int {
chunks := (pieces + chunkSize - 1) / chunkSize
if chunks < parallelism {
return chunks
}
return parallelism
}
// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
// A given operation will be a label that is recorded in the goroutine metric. // A given operation will be a label that is recorded in the goroutine metric.
func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) { func (p Parallelizer) Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc, operation string) {
goroutinesMetric := metrics.Goroutines.WithLabelValues(operation) chunkSize := chunkSizeFor(pieces, p.parallelism)
withMetrics := func(piece int) { workers := numWorkersForChunkSize(p.parallelism, pieces, chunkSize)
goroutinesMetric.Inc()
doWorkPiece(piece)
goroutinesMetric.Dec()
}
workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, withMetrics, workqueue.WithChunkSize(chunkSizeFor(pieces, p.parallelism))) goroutinesMetric := metrics.Goroutines.WithLabelValues(operation)
// Calling single Add with workers' count is more efficient than calling Inc or Dec per each work piece.
// This approach improves performance of some plugins (affinity, topology spreading) as well as preemption.
goroutinesMetric.Add(float64(workers))
defer goroutinesMetric.Add(float64(-workers))
workqueue.ParallelizeUntil(ctx, p.parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSize))
} }