Add chunk size option to ParallelizeUntil

Signed-off-by: Aldo Culquicondor <acondor@google.com>
Aldo Culquicondor 2020-03-11 17:13:55 -04:00
parent c441a1a7dc
commit 36efa035e2
5 changed files with 175 additions and 17 deletions

View File

@@ -19,6 +19,7 @@ package interpodaffinity
import (
"context"
"fmt"
"math"
"sync"
"k8s.io/api/core/v1"
@@ -36,6 +37,8 @@ const (
// preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Filtering.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
// minChunkSize is the minimum number of work items sent to a routine when parallelizing.
minChunkSize = 8
// ErrReasonExistingAntiAffinityRulesNotMatch is used for ExistingPodsAntiAffinityRulesNotMatch predicate error.
ErrReasonExistingAntiAffinityRulesNotMatch = "node(s) didn't satisfy existing pods anti-affinity rules"
@@ -240,7 +243,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.Node
}
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode, chunkSizeFor(len(allNodes)))
if err := errCh.ReceiveError(); err != nil {
return nil, err
@@ -304,7 +307,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*nodei
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
}
}
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode, chunkSizeFor(len(allNodes)))
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
}
@@ -545,3 +548,11 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy
return nil
}
// chunkSizeFor returns a chunk size for the given number of nodes, with a minimum of minChunkSize.
func chunkSizeFor(n int) workqueue.Options {
s := int(math.Sqrt(float64(n)))
if s < minChunkSize {
s = minChunkSize
}
return workqueue.WithChunkSize(s)
}
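
The heuristic above hands each goroutine roughly √n nodes at a time and never fewer than 8. The following standalone sketch is illustrative only (not part of the commit); it returns a plain int instead of a workqueue.Options so the values can be printed directly.

package main

import (
	"fmt"
	"math"
)

// minChunkSize mirrors the constant added in this commit.
const minChunkSize = 8

// chunkSizeFor reproduces the sqrt-based heuristic from the diff above; the
// real helper wraps the result in workqueue.WithChunkSize.
func chunkSizeFor(n int) int {
	s := int(math.Sqrt(float64(n)))
	if s < minChunkSize {
		s = minChunkSize
	}
	return s
}

func main() {
	// Anything below 64 nodes clamps to the minimum of 8; larger clusters grow
	// with the square root: 100 -> 10, 500 -> 22, 5000 -> 70.
	for _, nodes := range []int{10, 100, 500, 5000} {
		fmt.Printf("%d nodes -> chunk size %d\n", nodes, chunkSizeFor(nodes))
	}
}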

View File

@@ -14,6 +14,7 @@ require (
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.3.0
github.com/google/gofuzz v1.1.0
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.1.0

View File

@@ -13,6 +13,7 @@ go_test(
"delaying_queue_test.go",
"main_test.go",
"metrics_test.go",
"parallelizer_test.go",
"queue_test.go",
"rate_limiting_queue_test.go",
],
@@ -20,6 +21,7 @@ go_test(
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)

View File

@@ -25,39 +25,77 @@ import (
type DoWorkPieceFunc func(piece int)
type options struct {
chunkSize int
}
type Options func(*options)
// WithChunkSize allows setting the size of the chunks of work items handed to the
// workers, rather than having them process one item at a time.
// This option is recommended when the number of pieces is significantly higher
// than the number of workers and the work done for each item is small.
func WithChunkSize(c int) func(*options) {
return func(o *options) {
o.chunkSize = c
}
}
// ParallelizeUntil runs N independent pieces of work in parallel until all
// pieces are done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
if pieces == 0 {
return
}
o := options{}
for _, opt := range opts {
opt(&o)
}
chunkSize := o.chunkSize
if chunkSize < 1 {
chunkSize = 1
}
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
chunks := ceilDiv(pieces, chunkSize)
toProcess := make(chan int, chunks)
for i := 0; i < chunks; i++ {
toProcess <- i
}
close(toProcess)
if pieces < workers {
workers = pieces
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}
if chunks < workers {
workers = chunks
}
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
for chunk := range toProcess {
start := chunk * chunkSize
end := start + chunkSize
if end > pieces {
end = pieces
}
for p := start; p < end; p++ {
select {
case <-stop:
return
default:
doWorkPiece(piece)
doWorkPiece(p)
}
}
}
}()
}
wg.Wait()
}
// ceilDiv returns a divided by b, rounded up to the nearest integer.
func ceilDiv(a, b int) int {
return (a + b - 1) / b
}
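
As a worked example of the chunking math: with pieces=999 and chunkSize=13, ceilDiv yields 77 chunks, and the last chunk (index 76) covers pieces 988 through 998 once end is clamped to pieces. Below is a minimal usage sketch from a caller's point of view; it assumes the workqueue package is imported from k8s.io/client-go/util/workqueue, and the 16 workers and chunk size of 50 are arbitrary illustration values.

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	const pieces = 1000
	var processed int64

	// Each worker now pulls a chunk of 50 indices from the channel instead of a
	// single index, which reduces channel operations when per-piece work is cheap.
	workqueue.ParallelizeUntil(context.TODO(), 16, pieces, func(i int) {
		atomic.AddInt64(&processed, 1)
	}, workqueue.WithChunkSize(50))

	fmt.Println("processed pieces:", atomic.LoadInt64(&processed))
}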

View File

@@ -0,0 +1,106 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/google/go-cmp/cmp"
)
type testCase struct {
pieces int
workers int
chunkSize int
}
func (c testCase) String() string {
return fmt.Sprintf("pieces:%d,workers:%d,chunkSize:%d", c.pieces, c.workers, c.chunkSize)
}
var cases = []testCase{
{
pieces: 1000,
workers: 10,
chunkSize: 1,
},
{
pieces: 1000,
workers: 10,
chunkSize: 10,
},
{
pieces: 1000,
workers: 10,
chunkSize: 100,
},
{
pieces: 999,
workers: 10,
chunkSize: 13,
},
}
func TestParallelizeUntil(t *testing.T) {
for _, tc := range cases {
t.Run(tc.String(), func(t *testing.T) {
seen := make([]int32, tc.pieces)
ctx := context.Background()
ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
atomic.AddInt32(&seen[p], 1)
}, WithChunkSize(tc.chunkSize))
wantSeen := make([]int32, tc.pieces)
for i := 0; i < tc.pieces; i++ {
wantSeen[i] = 1
}
if diff := cmp.Diff(wantSeen, seen); diff != "" {
t.Errorf("bad number of visits (-want,+got):\n%s", diff)
}
})
}
}
func BenchmarkParallelizeUntil(b *testing.B) {
for _, tc := range cases {
b.Run(tc.String(), func(b *testing.B) {
ctx := context.Background()
isPrime := make([]bool, tc.pieces)
for c := 0; c < b.N; c++ {
ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
if p <= 1 {
return
}
isPrime[p] = true
for i := 2; i*i <= p; i++ {
if p%i == 0 {
isPrime[p] = false
return
}
}
}, WithChunkSize(tc.chunkSize))
}
want := []bool{false, false, true, true, false, true, false, true, false, false, false, true}
if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" {
b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff)
}
})
}
}