Merge pull request #89070 from alculquicondor/static-partitioning

Add chunk size option to ParallelizeUntil
This commit is contained in:
Kubernetes Prow Robot 2020-03-24 15:27:02 -07:00 committed by GitHub
commit f898f45b20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 256 additions and 35 deletions

View File

@ -123,6 +123,7 @@ filegroup(
"//pkg/scheduler/framework:all-srcs",
"//pkg/scheduler/internal/cache:all-srcs",
"//pkg/scheduler/internal/heap:all-srcs",
"//pkg/scheduler/internal/parallelize:all-srcs",
"//pkg/scheduler/internal/queue:all-srcs",
"//pkg/scheduler/listers:all-srcs",
"//pkg/scheduler/metrics:all-srcs",

View File

@ -13,6 +13,7 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
@ -28,7 +29,6 @@ go_library(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/trace:go_default_library",

View File

@ -28,6 +28,7 @@ import (
"time"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
@ -35,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/client-go/util/workqueue"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@ -479,7 +479,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
parallelize.Until(ctx, len(allNodes), checkNode)
processedNodes := int(filteredLen) + len(statuses)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
@ -870,7 +870,7 @@ func (g *genericScheduler) selectNodesForPreemption(
resultLock.Unlock()
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
parallelize.Until(ctx, len(potentialNodes), checkNode)
return nodeToVictims, nil
}

View File

@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
@ -20,7 +21,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],

View File

@ -25,9 +25,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -240,7 +240,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.Node
}
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
parallelize.Until(ctx, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
@ -304,7 +304,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*nodei
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
}
}
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
parallelize.Until(context.Background(), len(allNodes), processNode)
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
}

View File

@ -22,9 +22,9 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -259,7 +259,7 @@ func (pl *InterPodAffinity) PreScore(
pl.Unlock()
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
parallelize.Until(ctx, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}

View File

@ -13,6 +13,7 @@ go_library(
deps = [
"//pkg/scheduler/framework/plugins/helper:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -25,7 +26,6 @@ go_library(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -24,10 +24,10 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -267,7 +267,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
addTopologyPairMatchNum(pair, matchTotal)
}
}
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
parallelize.Until(context.Background(), len(allNodes), processNode)
// calculate min match for each topology pair
for i := 0; i < len(constraints); i++ {

View File

@ -25,10 +25,10 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)
const preScoreStateKey = "PreScore" + Name
@ -153,7 +153,7 @@ func (pl *PodTopologySpread) PreScore(
atomic.AddInt64(state.TopologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
parallelize.Until(ctx, len(allNodes), processAllNode)
cycleState.Write(preScoreStateKey, state)
return nil

View File

@ -15,6 +15,7 @@ go_library(
deps = [
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
@ -26,7 +27,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",

View File

@ -28,10 +28,10 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -513,7 +513,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
errCh := schedutil.NewErrorChannel()
// Run Score method for each node in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
parallelize.Until(ctx, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
nodeName := nodes[index].Name
s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
@ -534,7 +534,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
}
// Run NormalizeScore method for each ScorePlugin in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
nodeScoreList := pluginToNodeScores[pl.Name()]
if pl.ScoreExtensions() == nil {
@ -554,7 +554,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
}
// Apply score defaultWeights for each ScorePlugin in parallel.
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["parallelism.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/parallelize",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = ["//staging/src/k8s.io/client-go/util/workqueue:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,43 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package parallelize
import (
"context"
"math"
"k8s.io/client-go/util/workqueue"
)
const parallelism = 16
// chunkSizeFor returns a chunk size for the given number of items to use for
// parallel work. The size aims to produce good CPU utilization.
func chunkSizeFor(n int) workqueue.Options {
s := int(math.Sqrt(float64(n)))
if r := n/parallelism + 1; s > r {
s = r
} else if s < 1 {
s = 1
}
return workqueue.WithChunkSize(s)
}
// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, chunkSizeFor(pieces))
}

View File

@ -14,6 +14,7 @@ require (
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.3.1
github.com/google/gofuzz v1.1.0
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.1.0

View File

@ -13,6 +13,7 @@ go_test(
"delaying_queue_test.go",
"main_test.go",
"metrics_test.go",
"parallelizer_test.go",
"queue_test.go",
"rate_limiting_queue_test.go",
],
@ -20,6 +21,7 @@ go_test(
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)

View File

@ -25,39 +25,77 @@ import (
type DoWorkPieceFunc func(piece int)
type options struct {
chunkSize int
}
type Options func(*options)
// WithChunkSize allows to set chunks of work items to the workers, rather than
// processing one by one.
// It is recommended to use this option if the number of pieces significantly
// higher than the number of workers and the work done for each item is small.
func WithChunkSize(c int) func(*options) {
return func(o *options) {
o.chunkSize = c
}
}
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
if pieces == 0 {
return
}
o := options{}
for _, opt := range opts {
opt(&o)
}
chunkSize := o.chunkSize
if chunkSize < 1 {
chunkSize = 1
}
toProcess := make(chan int, pieces)
for i := 0; i < pieces; i++ {
chunks := ceilDiv(pieces, chunkSize)
toProcess := make(chan int, chunks)
for i := 0; i < chunks; i++ {
toProcess <- i
}
close(toProcess)
if pieces < workers {
workers = pieces
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}
if chunks < workers {
workers = chunks
}
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
select {
case <-stop:
return
default:
doWorkPiece(piece)
for chunk := range toProcess {
start := chunk * chunkSize
end := start + chunkSize
if end > pieces {
end = pieces
}
for p := start; p < end; p++ {
select {
case <-stop:
return
default:
doWorkPiece(p)
}
}
}
}()
}
wg.Wait()
}
func ceilDiv(a, b int) int {
return (a + b - 1) / b
}

View File

@ -0,0 +1,111 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/google/go-cmp/cmp"
)
type testCase struct {
pieces int
workers int
chunkSize int
}
func (c testCase) String() string {
return fmt.Sprintf("pieces:%d,workers:%d,chunkSize:%d", c.pieces, c.workers, c.chunkSize)
}
var cases = []testCase{
{
pieces: 1000,
workers: 10,
chunkSize: 1,
},
{
pieces: 1000,
workers: 10,
chunkSize: 10,
},
{
pieces: 1000,
workers: 10,
chunkSize: 100,
},
{
pieces: 999,
workers: 10,
chunkSize: 13,
},
}
func TestParallelizeUntil(t *testing.T) {
for _, tc := range cases {
t.Run(tc.String(), func(t *testing.T) {
seen := make([]int32, tc.pieces)
ctx := context.Background()
ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
atomic.AddInt32(&seen[p], 1)
}, WithChunkSize(tc.chunkSize))
wantSeen := make([]int32, tc.pieces)
for i := 0; i < tc.pieces; i++ {
wantSeen[i] = 1
}
if diff := cmp.Diff(wantSeen, seen); diff != "" {
t.Errorf("bad number of visits (-want,+got):\n%s", diff)
}
})
}
}
func BenchmarkParallelizeUntil(b *testing.B) {
for _, tc := range cases {
b.Run(tc.String(), func(b *testing.B) {
ctx := context.Background()
isPrime := make([]bool, tc.pieces)
b.ResetTimer()
for c := 0; c < b.N; c++ {
ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
isPrime[p] = calPrime(p)
}, WithChunkSize(tc.chunkSize))
}
b.StopTimer()
want := []bool{false, false, true, true, false, true, false, true, false, false, false, true}
if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" {
b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff)
}
})
}
}
func calPrime(p int) bool {
if p <= 1 {
return false
}
for i := 2; i*i <= p; i++ {
if p%i == 0 {
return false
}
}
return true
}

View File

@ -187,6 +187,7 @@
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports",
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources",
"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize",
"k8s.io/kubernetes/pkg/scheduler/listers",
"k8s.io/kubernetes/pkg/scheduler/metrics",
"k8s.io/kubernetes/pkg/scheduler/nodeinfo",

View File

@ -47,6 +47,7 @@ var (
defaultTests = []struct{ nodes, existingPods, minPods int }{
{nodes: 500, existingPods: 500, minPods: 1000},
{nodes: 600, existingPods: 10000, minPods: 1000},
{nodes: 5000, existingPods: 5000, minPods: 1000},
}
testNamespace = "sched-test"