From 326ffd93416c45a2ff2a44f3717e147a9cf29197 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 11 Mar 2020 17:13:55 -0400 Subject: [PATCH 1/3] Add chunk size option to ParallelizeUntil Signed-off-by: Aldo Culquicondor Kubernetes-commit: 36efa035e292480d4b0ccc7a4ffbf9aa579fbf30 --- go.mod | 10 +-- go.sum | 2 - util/workqueue/parallelizer.go | 68 ++++++++++++++---- util/workqueue/parallelizer_test.go | 106 ++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+), 21 deletions(-) create mode 100644 util/workqueue/parallelizer_test.go diff --git a/go.mod b/go.mod index 4d6410a1..1d60130f 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect + github.com/google/go-cmp v0.3.0 github.com/google/gofuzz v1.1.0 github.com/google/uuid v1.1.1 github.com/googleapis/gnostic v0.1.0 @@ -27,8 +28,8 @@ require ( golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 google.golang.org/appengine v1.5.0 // indirect - k8s.io/api v0.0.0-20200320042356-1fc28ea2498c - k8s.io/apimachinery v0.0.0-20200324202305-1aec6bc431a9 + k8s.io/api v0.0.0 + k8s.io/apimachinery v0.0.0 k8s.io/klog v1.0.0 k8s.io/utils v0.0.0-20200322164244-327a8059b905 sigs.k8s.io/yaml v1.2.0 @@ -37,6 +38,7 @@ require ( replace ( golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13 golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13 - k8s.io/api => k8s.io/api v0.0.0-20200320042356-1fc28ea2498c - k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200324202305-1aec6bc431a9 + k8s.io/api => ../api + k8s.io/apimachinery => ../apimachinery + k8s.io/client-go => ../client-go ) diff --git a/go.sum b/go.sum index 3b04fdae..01257fca 100644 --- a/go.sum +++ b/go.sum @@ -187,8 +187,6 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -k8s.io/api v0.0.0-20200320042356-1fc28ea2498c/go.mod h1:5nMyHS4bWX496fulniJ+Sws3P6GLvaP43GadMObLf58= -k8s.io/apimachinery v0.0.0-20200324202305-1aec6bc431a9/go.mod h1:yKN3QjQfKl8UdUL9RQ+/1VkR7nIUs7w02zC5CXhD+G0= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= diff --git a/util/workqueue/parallelizer.go b/util/workqueue/parallelizer.go index 5928a0c5..366bf20a 100644 --- a/util/workqueue/parallelizer.go +++ b/util/workqueue/parallelizer.go @@ -25,39 +25,77 @@ import ( type DoWorkPieceFunc func(piece int) +type options struct { + chunkSize int +} + +type Options func(*options) + +// WithChunkSize allows to set chunks of work items to the workers, rather than +// processing one by one. +// It is recommended to use this option if the number of pieces significantly +// higher than the number of workers and the work done for each item is small. +func WithChunkSize(c int) func(*options) { + return func(o *options) { + o.chunkSize = c + } +} + // ParallelizeUntil is a framework that allows for parallelizing N // independent pieces of work until done or the context is canceled. -func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) { - var stop <-chan struct{} - if ctx != nil { - stop = ctx.Done() +func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) { + if pieces == 0 { + return + } + o := options{} + for _, opt := range opts { + opt(&o) + } + chunkSize := o.chunkSize + if chunkSize < 1 { + chunkSize = 1 } - toProcess := make(chan int, pieces) - for i := 0; i < pieces; i++ { + chunks := ceilDiv(pieces, chunkSize) + toProcess := make(chan int, chunks) + for i := 0; i < chunks; i++ { toProcess <- i } close(toProcess) - if pieces < workers { - workers = pieces + var stop <-chan struct{} + if ctx != nil { + stop = ctx.Done() + } + if chunks < workers { + workers = chunks } - wg := sync.WaitGroup{} wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer utilruntime.HandleCrash() defer wg.Done() - for piece := range toProcess { - select { - case <-stop: - return - default: - doWorkPiece(piece) + for chunk := range toProcess { + start := chunk * chunkSize + end := start + chunkSize + if end > pieces { + end = pieces + } + for p := start; p < end; p++ { + select { + case <-stop: + return + default: + doWorkPiece(p) + } } } }() } wg.Wait() } + +func ceilDiv(a, b int) int { + return (a + b - 1) / b +} diff --git a/util/workqueue/parallelizer_test.go b/util/workqueue/parallelizer_test.go new file mode 100644 index 00000000..07362268 --- /dev/null +++ b/util/workqueue/parallelizer_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + + "github.com/google/go-cmp/cmp" +) + +type testCase struct { + pieces int + workers int + chunkSize int +} + +func (c testCase) String() string { + return fmt.Sprintf("pieces:%d,workers:%d,chunkSize:%d", c.pieces, c.workers, c.chunkSize) +} + +var cases = []testCase{ + { + pieces: 1000, + workers: 10, + chunkSize: 1, + }, + { + pieces: 1000, + workers: 10, + chunkSize: 10, + }, + { + pieces: 1000, + workers: 10, + chunkSize: 100, + }, + { + pieces: 999, + workers: 10, + chunkSize: 13, + }, +} + +func TestParallelizeUntil(t *testing.T) { + for _, tc := range cases { + t.Run(tc.String(), func(t *testing.T) { + seen := make([]int32, tc.pieces) + ctx := context.Background() + ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { + atomic.AddInt32(&seen[p], 1) + }, WithChunkSize(tc.chunkSize)) + + wantSeen := make([]int32, tc.pieces) + for i := 0; i < tc.pieces; i++ { + wantSeen[i] = 1 + } + if diff := cmp.Diff(wantSeen, seen); diff != "" { + t.Errorf("bad number of visits (-want,+got):\n%s", diff) + } + }) + } +} + +func BenchmarkParallelizeUntil(b *testing.B) { + for _, tc := range cases { + b.Run(tc.String(), func(b *testing.B) { + ctx := context.Background() + isPrime := make([]bool, tc.pieces) + for c := 0; c < b.N; c++ { + ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { + if p <= 1 { + return + } + isPrime[p] = true + for i := 2; i*i <= p; i++ { + if p%i == 0 { + isPrime[p] = false + return + } + } + }, WithChunkSize(tc.chunkSize)) + } + want := []bool{false, false, true, true, false, true, false, true, false, false, false, true} + if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" { + b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff) + } + }) + } +} From 7ee24064f0ea80bc727beb5a75ee3b94311abfd9 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Mon, 16 Mar 2020 14:12:11 -0400 Subject: [PATCH 2/3] Use sqrt(n) chunk size in pod affinity and core scheduler Kubernetes-commit: e902e70d0d9bca1a1c823ce9d04d8fd68e8f3396 --- util/workqueue/parallelizer_test.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/util/workqueue/parallelizer_test.go b/util/workqueue/parallelizer_test.go index 07362268..24438346 100644 --- a/util/workqueue/parallelizer_test.go +++ b/util/workqueue/parallelizer_test.go @@ -83,20 +83,13 @@ func BenchmarkParallelizeUntil(b *testing.B) { b.Run(tc.String(), func(b *testing.B) { ctx := context.Background() isPrime := make([]bool, tc.pieces) + b.ResetTimer() for c := 0; c < b.N; c++ { ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) { - if p <= 1 { - return - } - isPrime[p] = true - for i := 2; i*i <= p; i++ { - if p%i == 0 { - isPrime[p] = false - return - } - } + isPrime[p] = calPrime(p) }, WithChunkSize(tc.chunkSize)) } + b.StopTimer() want := []bool{false, false, true, true, false, true, false, true, false, false, false, true} if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" { b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff) @@ -104,3 +97,15 @@ func BenchmarkParallelizeUntil(b *testing.B) { }) } } + +func calPrime(p int) bool { + if p <= 1 { + return false + } + for i := 2; i*i <= p; i++ { + if p%i == 0 { + return false + } + } + return true +} From cc684433cdc7bafa070005c3d4523c4172a89d5a Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 18 Mar 2020 14:48:06 -0400 Subject: [PATCH 3/3] Add transient dependency to test/e2e/framework/.import-restrictions Signed-off-by: Aldo Culquicondor Kubernetes-commit: b01e3dc394304603146f17785e3a3cb31f991497 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 1d60130f..0b65250e 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect - github.com/google/go-cmp v0.3.0 + github.com/google/go-cmp v0.3.1 github.com/google/gofuzz v1.1.0 github.com/google/uuid v1.1.1 github.com/googleapis/gnostic v0.1.0