Use sqrt(n) chunk size in pod affinity and core scheduler

Aldo Culquicondor 2020-03-16 14:12:11 -04:00
parent 36efa035e2
commit e902e70d0d
15 changed files with 103 additions and 41 deletions
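Previously, every parallel loop in the scheduler fanned out to 16 workers through workqueue.ParallelizeUntil, which hands out one index at a time unless a chunk size is supplied; only the inter-pod affinity plugin set one, using sqrt(n) with a floor of 8. This change moves the pattern into a new scheduler-internal parallelize package: chunks of sqrt(n) amortize the work-channel synchronization, and the n/16+1 cap keeps small inputs spread across all 16 workers. The standalone sketch below works through the arithmetic; the formula matches chunkSizeFor in the new package further down, while the surrounding driver is illustrative only.

package main

import (
	"fmt"
	"math"
)

const parallelism = 16 // fixed worker count used by the scheduler

// chunkSize mirrors chunkSizeFor in the new parallelize package:
// sqrt(n), capped at n/parallelism+1 and floored at 1.
func chunkSize(n int) int {
	s := int(math.Sqrt(float64(n)))
	if r := n/parallelism + 1; s > r {
		s = r
	} else if s < 1 {
		s = 1
	}
	return s
}

func main() {
	for _, n := range []int{16, 500, 5000} {
		fmt.Printf("%d pieces -> chunks of %d\n", n, chunkSize(n))
	}
	// Output: 16 pieces -> chunks of 2, 500 -> 22, 5000 -> 70.
	// The cap (n/16+1) only bites for small n, where sqrt(n) chunks
	// would otherwise leave some of the 16 workers idle.
}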


@@ -123,6 +123,7 @@ filegroup(
         "//pkg/scheduler/framework:all-srcs",
         "//pkg/scheduler/internal/cache:all-srcs",
         "//pkg/scheduler/internal/heap:all-srcs",
+        "//pkg/scheduler/internal/parallelize:all-srcs",
         "//pkg/scheduler/internal/queue:all-srcs",
         "//pkg/scheduler/listers:all-srcs",
         "//pkg/scheduler/metrics:all-srcs",


@@ -13,6 +13,7 @@ go_library(
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
@@ -28,7 +29,6 @@ go_library(
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
-        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
         "//vendor/k8s.io/utils/trace:go_default_library",


@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 
 	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1beta1"
@@ -35,7 +36,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	policylisters "k8s.io/client-go/listers/policy/v1beta1"
-	"k8s.io/client-go/util/workqueue"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@@ -479,7 +479,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
 
 	// Stops searching for more nodes once the configured number of feasible nodes
 	// are found.
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)
+	parallelize.Until(ctx, len(allNodes), checkNode)
 	processedNodes := int(filteredLen) + len(statuses)
 	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
@@ -870,7 +870,7 @@ func (g *genericScheduler) selectNodesForPreemption(
 			resultLock.Unlock()
 		}
 	}
-	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
+	parallelize.Until(ctx, len(potentialNodes), checkNode)
 	return nodeToVictims, nil
 }


@@ -11,6 +11,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/util:go_default_library",
@@ -20,7 +21,6 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
-        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
         "//vendor/k8s.io/utils/pointer:go_default_library",
     ],


@@ -19,16 +19,15 @@ package interpodaffinity
 import (
 	"context"
 	"fmt"
-	"math"
 	"sync"
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
@@ -37,8 +36,6 @@ const (
 	// preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data for Filtering.
 	// Using the name of the plugin will likely help us avoid collisions with other plugins.
 	preFilterStateKey = "PreFilter" + Name
-	// minChunkSize is the minimum number of work items sent to a routine when parallelizing.
-	minChunkSize = 8
 
 	// ErrReasonExistingAntiAffinityRulesNotMatch is used for ExistingPodsAntiAffinityRulesNotMatch predicate error.
 	ErrReasonExistingAntiAffinityRulesNotMatch = "node(s) didn't satisfy existing pods anti-affinity rules"
@@ -243,7 +240,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.Node
 			}
 		}
 	}
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode, chunkSizeFor(len(allNodes)))
+	parallelize.Until(ctx, len(allNodes), processNode)
 
 	if err := errCh.ReceiveError(); err != nil {
 		return nil, err
@@ -307,7 +304,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*nodei
 			appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
 		}
 	}
-	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode, chunkSizeFor(len(allNodes)))
+	parallelize.Until(context.Background(), len(allNodes), processNode)
 
 	return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
 }
@@ -548,11 +545,3 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy
 	return nil
 }
-
-func chunkSizeFor(n int) workqueue.Options {
-	s := int(math.Sqrt(float64(n)))
-	if s < minChunkSize {
-		s = minChunkSize
-	}
-	return workqueue.WithChunkSize(s)
-}


@@ -22,9 +22,9 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 )
@@ -259,7 +259,7 @@ func (pl *InterPodAffinity) PreScore(
 			pl.Unlock()
 		}
 	}
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
+	parallelize.Until(ctx, len(allNodes), processNode)
 	if err := errCh.ReceiveError(); err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
 	}


@@ -13,6 +13,7 @@ go_library(
     deps = [
         "//pkg/scheduler/framework/plugins/helper:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -25,7 +26,6 @@ go_library(
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )


@@ -24,10 +24,10 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
@@ -267,7 +267,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
 			addTopologyPairMatchNum(pair, matchTotal)
 		}
 	}
-	workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
+	parallelize.Until(context.Background(), len(allNodes), processNode)
 
 	// calculate min match for each topology pair
 	for i := 0; i < len(constraints); i++ {


@@ -25,10 +25,10 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 )
 
 const preScoreStateKey = "PreScore" + Name
@@ -153,7 +153,7 @@ func (pl *PodTopologySpread) PreScore(
 			atomic.AddInt64(state.TopologyPairToPodCounts[pair], matchSum)
 		}
 	}
-	workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processAllNode)
+	parallelize.Until(ctx, len(allNodes), processAllNode)
 
 	cycleState.Write(preScoreStateKey, state)
 	return nil


@@ -15,6 +15,7 @@ go_library(
     deps = [
         "//pkg/controller/volume/scheduling:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/listers:go_default_library",
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
@@ -26,7 +27,6 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
-        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/component-base/metrics:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
         "//vendor/sigs.k8s.io/yaml:go_default_library",


@@ -28,10 +28,10 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -513,7 +513,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 	errCh := schedutil.NewErrorChannel()
 
 	// Run Score method for each node in parallel.
-	workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
+	parallelize.Until(ctx, len(nodes), func(index int) {
 		for _, pl := range f.scorePlugins {
 			nodeName := nodes[index].Name
 			s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
@@ -534,7 +534,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 	}
 
 	// Run NormalizeScore method for each ScorePlugin in parallel.
-	workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
+	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
 		pl := f.scorePlugins[index]
 		nodeScoreList := pluginToNodeScores[pl.Name()]
 		if pl.ScoreExtensions() == nil {
@@ -554,7 +554,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 	}
 
 	// Apply score defaultWeights for each ScorePlugin in parallel.
-	workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
+	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
 		pl := f.scorePlugins[index]
 		// Score plugins' weight has been checked when they are initialized.
 		weight := f.pluginNameToWeightMap[pl.Name()]


@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["parallelism.go"],
+    importpath = "k8s.io/kubernetes/pkg/scheduler/internal/parallelize",
+    visibility = ["//pkg/scheduler:__subpackages__"],
+    deps = ["//staging/src/k8s.io/client-go/util/workqueue:go_default_library"],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)


@@ -0,0 +1,43 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package parallelize
+
+import (
+	"context"
+	"math"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+const parallelism = 16
+
+// chunkSizeFor returns a chunk size for the given number of items to use for
+// parallel work. The size aims to produce good CPU utilization.
+func chunkSizeFor(n int) workqueue.Options {
+	s := int(math.Sqrt(float64(n)))
+	if r := n/parallelism + 1; s > r {
+		s = r
+	} else if s < 1 {
+		s = 1
+	}
+	return workqueue.WithChunkSize(s)
+}
+
+// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
+func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
+	workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, chunkSizeFor(pieces))
+}
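For reference, a minimal sketch of a call site written against the Until signature above. The wrapper name markFeasible and its arguments are hypothetical, and the BUILD visibility above limits real importers to //pkg/scheduler subpackages:

package example // hypothetical scheduler-internal package

import (
	"context"

	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)

// markFeasible evaluates check for every index in parallel. Until fans the
// indices out to 16 workers in sqrt(n)-sized chunks and stops handing out
// new chunks once ctx is cancelled. Writing to distinct slice elements from
// different workers is safe, as in the call sites above.
func markFeasible(ctx context.Context, feasible []bool, check func(i int) bool) {
	parallelize.Until(ctx, len(feasible), func(i int) {
		feasible[i] = check(i)
	})
}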


@@ -83,20 +83,13 @@ func BenchmarkParallelizeUntil(b *testing.B) {
 		b.Run(tc.String(), func(b *testing.B) {
 			ctx := context.Background()
 			isPrime := make([]bool, tc.pieces)
 			b.ResetTimer()
 			for c := 0; c < b.N; c++ {
 				ParallelizeUntil(ctx, tc.workers, tc.pieces, func(p int) {
-					if p <= 1 {
-						return
-					}
-					isPrime[p] = true
-					for i := 2; i*i <= p; i++ {
-						if p%i == 0 {
-							isPrime[p] = false
-							return
-						}
-					}
+					isPrime[p] = calPrime(p)
 				}, WithChunkSize(tc.chunkSize))
 			}
 			b.StopTimer()
 			want := []bool{false, false, true, true, false, true, false, true, false, false, false, true}
 			if diff := cmp.Diff(want, isPrime[:len(want)]); diff != "" {
 				b.Errorf("miscalculated isPrime (-want,+got):\n%s", diff)
@@ -104,3 +97,15 @@ func BenchmarkParallelizeUntil(b *testing.B) {
 		})
 	}
 }
+
+func calPrime(p int) bool {
+	if p <= 1 {
+		return false
+	}
+	for i := 2; i*i <= p; i++ {
+		if p%i == 0 {
+			return false
+		}
+	}
+	return true
+}


@@ -47,6 +47,7 @@ var (
 	defaultTests = []struct{ nodes, existingPods, minPods int }{
 		{nodes: 500, existingPods: 500, minPods: 1000},
 		{nodes: 600, existingPods: 10000, minPods: 1000},
+		{nodes: 5000, existingPods: 5000, minPods: 1000},
 	}
 	testNamespace = "sched-test"