From 36efa035e292480d4b0ccc7a4ffbf9aa579fbf30 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 11 Mar 2020 17:13:55 -0400 Subject: [PATCH] Add chunk size option to ParallelizeUntil Signed-off-by: Aldo Culquicondor --- .../plugins/interpodaffinity/filtering.go | 15 ++- staging/src/k8s.io/client-go/go.mod | 1 + .../src/k8s.io/client-go/util/workqueue/BUILD | 2 + .../client-go/util/workqueue/parallelizer.go | 68 ++++++++--- .../util/workqueue/parallelizer_test.go | 106 ++++++++++++++++++ 5 files changed, 175 insertions(+), 17 deletions(-) create mode 100644 staging/src/k8s.io/client-go/util/workqueue/parallelizer_test.go diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index af035260623..0f0e9b37ac1 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -19,6 +19,7 @@ package interpodaffinity import ( "context" "fmt" + "math" "sync" "k8s.io/api/core/v1" @@ -36,6 +37,8 @@ const ( // preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Filtering. // Using the name of the plugin will likely help us avoid collisions with other plugins. preFilterStateKey = "PreFilter" + Name + // minChunkSize is the minimum number of work items sent to a routine when parallelizing. + minChunkSize = 8 // ErrReasonExistingAntiAffinityRulesNotMatch is used for ExistingPodsAntiAffinityRulesNotMatch predicate error. ErrReasonExistingAntiAffinityRulesNotMatch = "node(s) didn't satisfy existing pods anti-affinity rules" @@ -240,7 +243,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.Node } } } - workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) + workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode, chunkSizeFor(len(allNodes))) if err := errCh.ReceiveError(); err != nil { return nil, err @@ -304,7 +307,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*nodei appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap) } } - workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode) + workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode, chunkSizeFor(len(allNodes))) return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil } @@ -545,3 +548,11 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy return nil } + +func chunkSizeFor(n int) workqueue.Options { + s := int(math.Sqrt(float64(n))) + if s < minChunkSize { + s = minChunkSize + } + return workqueue.WithChunkSize(s) +} diff --git a/staging/src/k8s.io/client-go/go.mod b/staging/src/k8s.io/client-go/go.mod index 5f2761c0723..1d60130fbee 100644 --- a/staging/src/k8s.io/client-go/go.mod +++ b/staging/src/k8s.io/client-go/go.mod @@ -14,6 +14,7 @@ require ( github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect + github.com/google/go-cmp v0.3.0 github.com/google/gofuzz v1.1.0 github.com/google/uuid v1.1.1 github.com/googleapis/gnostic v0.1.0 diff --git a/staging/src/k8s.io/client-go/util/workqueue/BUILD b/staging/src/k8s.io/client-go/util/workqueue/BUILD index 538ad31cde9..302d16a930c 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/BUILD +++ b/staging/src/k8s.io/client-go/util/workqueue/BUILD @@ -13,6 +13,7 @@ go_test( 
"delaying_queue_test.go", "main_test.go", "metrics_test.go", + "parallelizer_test.go", "queue_test.go", "rate_limiting_queue_test.go", ], @@ -20,6 +21,7 @@ go_test( deps = [ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go index 5928a0c5b7d..366bf20a312 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go +++ b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go @@ -25,39 +25,77 @@ import ( type DoWorkPieceFunc func(piece int) +type options struct { + chunkSize int +} + +type Options func(*options) + +// WithChunkSize allows to set chunks of work items to the workers, rather than +// processing one by one. +// It is recommended to use this option if the number of pieces significantly +// higher than the number of workers and the work done for each item is small. +func WithChunkSize(c int) func(*options) { + return func(o *options) { + o.chunkSize = c + } +} + // ParallelizeUntil is a framework that allows for parallelizing N // independent pieces of work until done or the context is canceled. -func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) { - var stop <-chan struct{} - if ctx != nil { - stop = ctx.Done() +func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) { + if pieces == 0 { + return + } + o := options{} + for _, opt := range opts { + opt(&o) + } + chunkSize := o.chunkSize + if chunkSize < 1 { + chunkSize = 1 } - toProcess := make(chan int, pieces) - for i := 0; i < pieces; i++ { + chunks := ceilDiv(pieces, chunkSize) + toProcess := make(chan int, chunks) + for i := 0; i < chunks; i++ { toProcess <- i } close(toProcess) - if pieces < workers { - workers = pieces + var stop <-chan struct{} + if ctx != nil { + stop = ctx.Done() + } + if chunks < workers { + workers = chunks } - wg := sync.WaitGroup{} wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer utilruntime.HandleCrash() defer wg.Done() - for piece := range toProcess { - select { - case <-stop: - return - default: - doWorkPiece(piece) + for chunk := range toProcess { + start := chunk * chunkSize + end := start + chunkSize + if end > pieces { + end = pieces + } + for p := start; p < end; p++ { + select { + case <-stop: + return + default: + doWorkPiece(p) + } } } }() } wg.Wait() } + +func ceilDiv(a, b int) int { + return (a + b - 1) / b +} diff --git a/staging/src/k8s.io/client-go/util/workqueue/parallelizer_test.go b/staging/src/k8s.io/client-go/util/workqueue/parallelizer_test.go new file mode 100644 index 00000000000..07362268204 --- /dev/null +++ b/staging/src/k8s.io/client-go/util/workqueue/parallelizer_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package workqueue + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + + "github.com/google/go-cmp/cmp" +) + +type testCase struct { + pieces int + workers int + chunkSize int +} + +func (c testCase) String() string { + return fmt.Sprintf("pieces:%d,workers:%d,chunkSize:%d", c.pieces, c.workers, c.chunkSize) +} + +var cases = []testCase{ + { + pieces: 1000, + workers: 10, + chunkSize: 1, + }, + { + pieces: 1000, + workers: 10, + chunkSize: 10, + }, + { + pieces: 1000, + workers: 10, + chunkSize: 100, + }, + { + pieces: 999, + workers: 10, + chunkSize: 13, + }, +} + +func TestParallelizeUntil(t *testing.T) { + for _, tc := range cases { + t.Run(tc.String(), func(t *testing.T) { + seen := make([]int32, tc.pieces) + ctx := context.Background() + ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { + atomic.AddInt32(&seen[p], 1) + }, WithChunkSize(tc.chunkSize)) + + wantSeen := make([]int32, tc.pieces) + for i := 0; i < tc.pieces; i++ { + wantSeen[i] = 1 + } + if diff := cmp.Diff(wantSeen, seen); diff != "" { + t.Errorf("bad number of visits (-want,+got):\n%s", diff) + } + }) + } +} + +func BenchmarkParallelizeUntil(b *testing.B) { + for _, tc := range cases { + b.Run(tc.String(), func(b *testing.B) { + ctx := context.Background() + isPrime := make([]bool, tc.pieces) + for c := 0; c < b.N; c++ { + ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { + if p <= 1 { + return + } + isPrime[p] = true + for i := 2; i*i <= p; i++ { + if p%i == 0 { + isPrime[p] = false + return + } + } + }, WithChunkSize(tc.chunkSize)) + } + want := []bool{false, false, true, true, false, true, false, true, false, false, false, true} + if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" { + b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff) + } + }) + } +}
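
For illustration only (not part of the patch): a minimal sketch of how a caller outside the scheduler could adopt the new WithChunkSize option. The names allNodes, processNode and results below are hypothetical stand-ins for the caller's own data and per-piece work function; only ParallelizeUntil and WithChunkSize come from the patched workqueue package.

package main

import (
	"context"
	"fmt"
	"math"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Hypothetical work items and a per-piece result slice; each piece writes
	// only its own index, so no extra locking is needed.
	allNodes := make([]string, 1000)
	results := make([]int, len(allNodes))

	processNode := func(i int) {
		results[i] = len(allNodes[i]) // cheap per-item work
	}

	// Hand out work in sqrt(n)-sized chunks with a lower bound, mirroring the
	// chunkSizeFor helper this patch adds to the inter-pod-affinity plugin.
	chunkSize := int(math.Sqrt(float64(len(allNodes))))
	if chunkSize < 8 {
		chunkSize = 8
	}
	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode,
		workqueue.WithChunkSize(chunkSize))

	fmt.Println("processed", len(results), "pieces")
}

The sqrt(n) heuristic, bounded below by minChunkSize, balances the two costs the patch is concerned with: larger chunks mean fewer channel receives and cancellation checks per worker when each piece is cheap, while keeping enough chunks that work still spreads across workers (ParallelizeUntil caps the worker count at the number of chunks).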