Merge pull request #35204 from gmarek/node-affinity

Automatic merge from submit-queue

Add node affinity test to scheduler benchmark

cc @wojtek-t
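The test splits the 100 nodes into 10 labeled groups and creates 300 pods per group, each carrying an affinity annotation (api.AffinityAnnotationKey, i.e. scheduler.alpha.kubernetes.io/affinity at the time of this PR) that pins it to a single group. For illustration, the rendered annotation value for group 0 looks like:

```json
{
  "nodeAffinity": {
    "requiredDuringSchedulingIgnoredDuringExecution": {
      "nodeSelectorTerms": [{
        "matchExpressions": [{
          "key": "kubernetes.io/sched-perf-node-affinity",
          "operator": "In",
          "values": ["0"]
        }]
      }]
    }
  }
}
```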
Kubernetes Submit Queue authored 2016-11-03 05:09:12 -07:00, committed by GitHub
commit a8a8660415
3 changed files with 159 additions and 26 deletions

View File

@@ -38,8 +38,11 @@ go_test(
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",
"//test/integration/framework:go_default_library",
"//test/utils:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/renstrom/dedent",
],
)

View File

@@ -22,10 +22,13 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
"github.com/golang/glog"
"github.com/renstrom/dedent"
)
const (
@@ -39,19 +42,86 @@ func TestSchedule100Node3KPods(t *testing.T) {
if testing.Short() {
t.Skip("Skipping because we want to run short tests")
}
-if min := schedulePods(100, 3000); min < threshold3K {
+config := defaultSchedulerBenchmarkConfig(100, 3000)
+if min := schedulePods(config); min < threshold3K {
t.Errorf("Too small pod scheduling throughput for 3k pods. Expected %v got %v", threshold3K, min)
} else {
fmt.Printf("Minimal observed throughput for 3k pod test: %v\n", min)
}
}
// TestSchedule100Node3KNodeAffinityPods schedules 3k pods using node affinity on 100 nodes.
func TestSchedule100Node3KNodeAffinityPods(t *testing.T) {
if testing.Short() {
t.Skip("Skipping because we want to run short tests")
}
config := baseConfig()
config.numNodes = 100
config.numPods = 3000
// numGroups is the number of node-pod groups; pods in each group have a
// NodeAffinity that matches only the nodes in the same group.
numGroups := 10
nodeAffinityKey := "kubernetes.io/sched-perf-node-affinity"
nodeStrategies := make([]testutils.CountToStrategy, 0, numGroups)
for i := 0; i < numGroups; i++ {
nodeStrategies = append(nodeStrategies, testutils.CountToStrategy{
Count: config.numNodes / numGroups,
Strategy: testutils.NewLabelNodePrepareStrategy(nodeAffinityKey, fmt.Sprintf("%v", i)),
})
}
config.nodePreparer = framework.NewIntegrationTestNodePreparer(
config.schedulerConfigFactory.Client,
nodeStrategies,
"scheduler-perf-",
)
affinityTemplate := dedent.Dedent(`
{
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "` + nodeAffinityKey + `",
"operator": "In",
"values": ["%v"]
}]
}]
}
}
}`)
podCreatorConfig := testutils.NewTestPodCreatorConfig()
for i := 0; i < numGroups; i++ {
podCreatorConfig.AddStrategy("sched-perf-node-affinity", config.numPods/numGroups,
testutils.NewCustomCreatePodStrategy(&api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "sched-perf-node-affinity-pod-",
Annotations: map[string]string{api.AffinityAnnotationKey: fmt.Sprintf(affinityTemplate, i)},
},
Spec: testutils.MakePodSpec(),
}),
)
}
config.podCreator = testutils.NewTestPodCreator(config.schedulerConfigFactory.Client, podCreatorConfig)
if min := schedulePods(config); min < threshold30K {
t.Errorf("Too small pod scheduling throughput for 3k pods with node affinity. Expected %v got %v", threshold30K, min)
} else {
fmt.Printf("Minimal observed throughput for 3k pod node-affinity test: %v\n", min)
}
}
// TestSchedule1000Node30KPods schedules 30k pods on 1000 nodes.
func TestSchedule1000Node30KPods(t *testing.T) {
if testing.Short() {
t.Skip("Skipping because we want to run short tests")
}
-if min := schedulePods(1000, 30000); min < threshold30K {
+config := defaultSchedulerBenchmarkConfig(1000, 30000)
+if min := schedulePods(config); min < threshold30K {
t.Errorf("Too small pod scheduling throughput for 30k pods. Expected %v got %v", threshold30K, min)
} else {
fmt.Printf("Minimal observed throughput for 30k pod test: %v\n", min)
@@ -64,37 +134,64 @@ func TestSchedule1000Node30KPods(t *testing.T) {
// if testing.Short() {
// t.Skip("Skipping because we want to run short tests")
// }
-// if min := schedulePods(2000, 60000); min < threshold60K {
+// config := defaultSchedulerBenchmarkConfig(2000, 60000)
+// if min := schedulePods(config); min < threshold60K {
// t.Errorf("Too small pod scheduling throughput for 60k pods. Expected %v got %v", threshold60K, min)
// } else {
// fmt.Printf("Minimal observed throughput for 60k pod test: %v\n", min)
// }
// }
type testConfig struct {
numPods int
numNodes int
nodePreparer testutils.TestNodePreparer
podCreator *testutils.TestPodCreator
schedulerConfigFactory *factory.ConfigFactory
destroyFunc func()
}
func baseConfig() *testConfig {
schedulerConfigFactory, destroyFunc := mustSetupScheduler()
return &testConfig{
schedulerConfigFactory: schedulerConfigFactory,
destroyFunc: destroyFunc,
}
}
func defaultSchedulerBenchmarkConfig(numNodes, numPods int) *testConfig {
baseConfig := baseConfig()
nodePreparer := framework.NewIntegrationTestNodePreparer(
baseConfig.schedulerConfigFactory.Client,
[]testutils.CountToStrategy{{Count: numNodes, Strategy: &testutils.TrivialNodePrepareStrategy{}}},
"scheduler-perf-",
)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", numPods, testutils.NewSimpleWithControllerCreatePodStrategy("rc1"))
podCreator := testutils.NewTestPodCreator(baseConfig.schedulerConfigFactory.Client, config)
baseConfig.nodePreparer = nodePreparer
baseConfig.podCreator = podCreator
baseConfig.numPods = numPods
baseConfig.numNodes = numNodes
return baseConfig
}
// schedulePods schedules a specific number of pods on a specific number of nodes.
// It is used to learn the scheduling throughput on various
// sizes of cluster, and how it changes as more and more pods are scheduled.
// It won't stop until all pods are scheduled.
// It returns the minimum throughput observed over the whole run.
-func schedulePods(numNodes, numPods int) int32 {
-schedulerConfigFactory, destroyFunc := mustSetupScheduler()
-defer destroyFunc()
-c := schedulerConfigFactory.Client
-nodePreparer := framework.NewIntegrationTestNodePreparer(
-c,
-[]testutils.CountToStrategy{{Count: numNodes, Strategy: &testutils.TrivialNodePrepareStrategy{}}},
-"scheduler-perf-",
-)
-if err := nodePreparer.PrepareNodes(); err != nil {
+func schedulePods(config *testConfig) int32 {
+defer config.destroyFunc()
+if err := config.nodePreparer.PrepareNodes(); err != nil {
glog.Fatalf("%v", err)
}
-defer nodePreparer.CleanupNodes()
-config := testutils.NewTestPodCreatorConfig()
-config.AddStrategy("sched-test", numPods, testutils.NewSimpleWithControllerCreatePodStrategy("rc1"))
-podCreator := testutils.NewTestPodCreator(c, config)
-podCreator.CreatePods()
+defer config.nodePreparer.CleanupNodes()
+config.podCreator.CreatePods()
prev := 0
minQps := int32(math.MaxInt32)
@@ -103,10 +200,10 @@ func schedulePods(numNodes, numPods int) int32 {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
// TODO: Setup watch on apiserver and wait until all pods scheduled.
-scheduled := schedulerConfigFactory.ScheduledPodLister.Indexer.List()
-if len(scheduled) >= numPods {
+scheduled := config.schedulerConfigFactory.ScheduledPodLister.Indexer.List()
+if len(scheduled) >= config.numPods {
fmt.Printf("Scheduled %v Pods in %v seconds (%v per second on average).\n",
-numPods, int(time.Since(start)/time.Second), numPods/int(time.Since(start)/time.Second))
+config.numPods, int(time.Since(start)/time.Second), config.numPods/int(time.Since(start)/time.Second))
return minQps
}
// There's no point in printing it for the last iteration, as the value is random
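For readers skimming the diff: the refactored schedulePods keeps the same measurement loop as before, polling the scheduled-pod lister and tracking the worst per-interval rate. A minimal, self-contained sketch of that loop, assuming a one-second polling interval (minObservedQPS and scheduledCount are hypothetical names standing in for the real lister-based code):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// minObservedQPS polls scheduledCount once per second, treats each
// per-interval delta as the instantaneous scheduling QPS, and returns
// the minimum observed once all numPods pods are scheduled.
func minObservedQPS(scheduledCount func() int, numPods int) int32 {
	prev := 0
	minQps := int32(math.MaxInt32)
	for {
		time.Sleep(1 * time.Second)
		scheduled := scheduledCount()
		if qps := int32(scheduled - prev); qps < minQps {
			minQps = qps
		}
		if scheduled >= numPods {
			return minQps
		}
		prev = scheduled
	}
}

func main() {
	n := 0
	// Toy stand-in that "schedules" 50 pods per interval.
	fmt.Println(minObservedQPS(func() int { n += 50; return n }, 200))
}
```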

View File

@@ -635,6 +635,39 @@ func (*TrivialNodePrepareStrategy) CleanupNode(node *api.Node) *api.Node {
return &nodeCopy
}
type LabelNodePrepareStrategy struct {
labelKey string
labelValue string
}
func NewLabelNodePrepareStrategy(labelKey string, labelValue string) *LabelNodePrepareStrategy {
return &LabelNodePrepareStrategy{
labelKey: labelKey,
labelValue: labelValue,
}
}
func (s *LabelNodePrepareStrategy) PreparePatch(*api.Node) []byte {
labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.labelKey, s.labelValue)
patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
return []byte(patch)
}
func (s *LabelNodePrepareStrategy) CleanupNode(node *api.Node) *api.Node {
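// Note: DeepCopy must receive the pointer; copying the dereferenced value
// would make the *api.Node type assertion below always fail.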
objCopy, err := api.Scheme.DeepCopy(node)
if err != nil {
return &api.Node{}
}
nodeCopy, ok := (objCopy).(*api.Node)
if !ok {
return &api.Node{}
}
if node.Labels != nil && len(node.Labels[s.labelKey]) != 0 {
delete(nodeCopy.Labels, s.labelKey)
}
return nodeCopy
}
func DoPrepareNode(client clientset.Interface, node *api.Node, strategy PrepareNodeStrategy) error {
var err error
patch := strategy.PreparePatch(node)
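To make the new strategy concrete, here is a standalone sketch of the strategic-merge patch it emits, using the label key from the benchmark (an illustration, not the library code; the api.Node parameter is dropped because PreparePatch ignores it):

```go
package main

import "fmt"

// Trimmed copy of LabelNodePrepareStrategy, just enough to show the patch shape.
type labelStrategy struct {
	labelKey   string
	labelValue string
}

func (s *labelStrategy) preparePatch() []byte {
	labelString := fmt.Sprintf(`{"%v":"%v"}`, s.labelKey, s.labelValue)
	return []byte(fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString))
}

func main() {
	s := &labelStrategy{labelKey: "kubernetes.io/sched-perf-node-affinity", labelValue: "0"}
	// Prints: {"metadata":{"labels":{"kubernetes.io/sched-perf-node-affinity":"0"}}}
	fmt.Println(string(s.preparePatch()))
}
```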
@@ -717,7 +750,7 @@ func (c *TestPodCreator) CreatePods() error {
return nil
}
-func makePodSpec() api.PodSpec {
+func MakePodSpec() api.PodSpec {
return api.PodSpec{
Containers: []api.Container{{
Name: "pause",
@@ -804,7 +837,7 @@ func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
ObjectMeta: api.ObjectMeta{
GenerateName: "simple-pod-",
},
-Spec: makePodSpec(),
+Spec: MakePodSpec(),
}
return NewCustomCreatePodStrategy(basePod)
}
@@ -816,7 +849,7 @@ func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCrea
GenerateName: controllerName + "-pod-",
Labels: map[string]string{"name": controllerName},
},
-Spec: makePodSpec(),
+Spec: MakePodSpec(),
}
if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
return err
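The makePodSpec to MakePodSpec rename above is what lets the scheduler benchmark build its custom affinity pods. A short usage sketch, assuming the same imports as the test file earlier in this diff (the "example-pod-" name is illustrative):

```go
// Build a pod around the standard "pause" container spec and wrap it in a
// create strategy, as the new node-affinity test does.
basePod := &api.Pod{
	ObjectMeta: api.ObjectMeta{GenerateName: "example-pod-"},
	Spec:       testutils.MakePodSpec(),
}
strategy := testutils.NewCustomCreatePodStrategy(basePod)
_ = strategy // register via TestPodCreatorConfig.AddStrategy, as shown above
```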