diff --git a/go.mod b/go.mod
index 4d6410a1..d5aac9d9 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@ require (
 	github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
 	github.com/golang/protobuf v1.3.2
 	github.com/google/btree v1.0.0 // indirect
+	github.com/google/go-cmp v0.3.1
 	github.com/google/gofuzz v1.1.0
 	github.com/google/uuid v1.1.1
 	github.com/googleapis/gnostic v0.1.0
diff --git a/util/workqueue/parallelizer.go b/util/workqueue/parallelizer.go
index 5928a0c5..366bf20a 100644
--- a/util/workqueue/parallelizer.go
+++ b/util/workqueue/parallelizer.go
@@ -25,39 +25,77 @@ import (
 
 type DoWorkPieceFunc func(piece int)
 
+type options struct {
+	chunkSize int
+}
+
+type Options func(*options)
+
+// WithChunkSize lets the workers receive work items in chunks of the given
+// size, rather than one by one.
+// It is recommended when the number of pieces is significantly higher than
+// the number of workers and the work done per item is small.
+func WithChunkSize(c int) func(*options) {
+	return func(o *options) {
+		o.chunkSize = c
+	}
+}
+
 // ParallelizeUntil is a framework that allows for parallelizing N
 // independent pieces of work until done or the context is canceled.
-func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
-	var stop <-chan struct{}
-	if ctx != nil {
-		stop = ctx.Done()
+func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
+	if pieces == 0 {
+		return
+	}
+	o := options{}
+	for _, opt := range opts {
+		opt(&o)
+	}
+	chunkSize := o.chunkSize
+	if chunkSize < 1 {
+		chunkSize = 1
 	}
 
-	toProcess := make(chan int, pieces)
-	for i := 0; i < pieces; i++ {
+	chunks := ceilDiv(pieces, chunkSize)
+	toProcess := make(chan int, chunks)
+	for i := 0; i < chunks; i++ {
 		toProcess <- i
 	}
 	close(toProcess)
 
-	if pieces < workers {
-		workers = pieces
+	var stop <-chan struct{}
+	if ctx != nil {
+		stop = ctx.Done()
+	}
+	if chunks < workers {
+		workers = chunks
 	}
-
 	wg := sync.WaitGroup{}
 	wg.Add(workers)
 	for i := 0; i < workers; i++ {
 		go func() {
 			defer utilruntime.HandleCrash()
 			defer wg.Done()
-			for piece := range toProcess {
-				select {
-				case <-stop:
-					return
-				default:
-					doWorkPiece(piece)
+			for chunk := range toProcess {
+				start := chunk * chunkSize
+				end := start + chunkSize
+				if end > pieces {
+					end = pieces
+				}
+				for p := start; p < end; p++ {
+					select {
+					case <-stop:
+						return
+					default:
+						doWorkPiece(p)
+					}
 				}
 			}
 		}()
 	}
 	wg.Wait()
 }
+
+func ceilDiv(a, b int) int {
+	return (a + b - 1) / b
+}
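
A minimal usage sketch of the new option, for reviewers. The worker count,
piece count, and squaring work function are made up for illustration, and the
import path assumes the package is consumed as k8s.io/client-go/util/workqueue:

	package main

	import (
		"context"
		"fmt"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		// 10,000 cheap pieces handed out in chunks of 100: each worker
		// pulls a chunk index off the internal channel and processes 100
		// pieces locally, so the channel carries 100 values instead of
		// 10,000, cutting channel contention.
		squares := make([]int, 10000)
		workqueue.ParallelizeUntil(context.Background(), 16, len(squares), func(i int) {
			squares[i] = i * i // illustrative per-piece work
		}, workqueue.WithChunkSize(100))
		fmt.Println(squares[12]) // prints 144
	}

Note that callers which omit the option, or pass a chunk size below 1, fall
back to the previous one-piece-at-a-time behavior, since ParallelizeUntil
clamps chunkSize to a minimum of 1.
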
diff --git a/util/workqueue/parallelizer_test.go b/util/workqueue/parallelizer_test.go
new file mode 100644
index 00000000..24438346
--- /dev/null
+++ b/util/workqueue/parallelizer_test.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+type testCase struct {
+	pieces    int
+	workers   int
+	chunkSize int
+}
+
+func (c testCase) String() string {
+	return fmt.Sprintf("pieces:%d,workers:%d,chunkSize:%d", c.pieces, c.workers, c.chunkSize)
+}
+
+var cases = []testCase{
+	{
+		pieces:    1000,
+		workers:   10,
+		chunkSize: 1,
+	},
+	{
+		pieces:    1000,
+		workers:   10,
+		chunkSize: 10,
+	},
+	{
+		pieces:    1000,
+		workers:   10,
+		chunkSize: 100,
+	},
+	{
+		pieces:    999,
+		workers:   10,
+		chunkSize: 13,
+	},
+}
+
+func TestParallelizeUntil(t *testing.T) {
+	for _, tc := range cases {
+		t.Run(tc.String(), func(t *testing.T) {
+			seen := make([]int32, tc.pieces)
+			ctx := context.Background()
+			ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
+				atomic.AddInt32(&seen[p], 1)
+			}, WithChunkSize(tc.chunkSize))
+
+			wantSeen := make([]int32, tc.pieces)
+			for i := 0; i < tc.pieces; i++ {
+				wantSeen[i] = 1
+			}
+			if diff := cmp.Diff(wantSeen, seen); diff != "" {
+				t.Errorf("bad number of visits (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
+func BenchmarkParallelizeUntil(b *testing.B) {
+	for _, tc := range cases {
+		b.Run(tc.String(), func(b *testing.B) {
+			ctx := context.Background()
+			isPrime := make([]bool, tc.pieces)
+			b.ResetTimer()
+			for c := 0; c < b.N; c++ {
+				ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
+					isPrime[p] = calPrime(p)
+				}, WithChunkSize(tc.chunkSize))
+			}
+			b.StopTimer()
+			want := []bool{false, false, true, true, false, true, false, true, false, false, false, true}
+			if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" {
+				b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
+func calPrime(p int) bool {
+	if p <= 1 {
+		return false
+	}
+	for i := 2; i*i <= p; i++ {
+		if p%i == 0 {
+			return false
+		}
+	}
+	return true
+}
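
A worked check of the chunk arithmetic for the pieces:999, workers:10,
chunkSize:13 case in the test table: ceilDiv(999, 13) = (999+12)/13 = 77
chunks, and the last chunk (index 76) starts at 76*13 = 988 with its end
clamped from 1001 down to 999, so it covers the 11 remaining pieces and every
index is visited exactly once, which is what TestParallelizeUntil asserts.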