Generalize Pod creation across e2e and integration tests

This commit is contained in:
gmarek 2016-10-20 11:51:06 +02:00
parent b1d8961fe4
commit be57ca5015
7 changed files with 169 additions and 68 deletions

View File

@ -204,6 +204,7 @@ func GetPauseImageName(c clientset.Interface) string {
}
// GetPauseImageNameForHostArch fetches the pause image name for the same architecture the test is running on.
// TODO: move this function to the test/utils
func GetPauseImageNameForHostArch() string {
return currentPodInfraContainerImageName + "-" + goRuntime.GOARCH + ":" + currentPodInfraContainerImageVersion
}

View File

@ -16,17 +16,14 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/resource:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/unversioned:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/util/workqueue:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
"//plugin/pkg/scheduler/algorithmprovider:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor:github.com/golang/glog",
],

View File

@ -67,7 +67,11 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
glog.Fatalf("%v", err)
}
defer nodePreparer.CleanupNodes()
makePodsFromRC(c, "rc1", numScheduledPods)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", numScheduledPods, testutils.NewSimpleWithControllerCreatePodStrategy("rc1"))
podCreator := testutils.NewTestPodCreator(c, config)
podCreator.CreatePods()
for {
scheduled := schedulerConfigFactory.ScheduledPodLister.Indexer.List()
@ -78,7 +82,10 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
}
// start benchmark
b.ResetTimer()
makePodsFromRC(c, "rc2", b.N)
config = testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", b.N, testutils.NewSimpleWithControllerCreatePodStrategy("rc2"))
podCreator = testutils.NewTestPodCreator(c, config)
podCreator.CreatePods()
for {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// TODO: Setup watch on apiserver and wait until all pods scheduled.

View File

@ -90,7 +90,11 @@ func schedulePods(numNodes, numPods int) int32 {
glog.Fatalf("%v", err)
}
defer nodePreparer.CleanupNodes()
makePodsFromRC(c, "rc1", numPods)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", numPods, testutils.NewSimpleWithControllerCreatePodStrategy("rc1"))
podCreator := testutils.NewTestPodCreator(c, config)
podCreator.CreatePods()
prev := 0
minQps := int32(math.MaxInt32)

View File

@ -22,17 +22,14 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/apimachinery/registered"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
e2e "k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/integration/framework"
)
@ -79,62 +76,3 @@ func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destro
}
return
}
func makePodSpec() api.PodSpec {
return api.PodSpec{
Containers: []api.Container{{
Name: "pause",
Image: e2e.GetPauseImageNameForHostArch(),
Ports: []api.ContainerPort{{ContainerPort: 80}},
Resources: api.ResourceRequirements{
Limits: api.ResourceList{
api.ResourceCPU: resource.MustParse("100m"),
api.ResourceMemory: resource.MustParse("500Mi"),
},
Requests: api.ResourceList{
api.ResourceCPU: resource.MustParse("100m"),
api.ResourceMemory: resource.MustParse("500Mi"),
},
},
}},
}
}
// makePodsFromRC will create a ReplicationController object and
// a given number of pods (imitating the controller).
func makePodsFromRC(c clientset.Interface, name string, podCount int) {
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.ReplicationControllerSpec{
Replicas: int32(podCount),
Selector: map[string]string{"name": name},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": name},
},
Spec: makePodSpec(),
},
},
}
if _, err := c.Core().ReplicationControllers("default").Create(rc); err != nil {
glog.Fatalf("unexpected error: %v", err)
}
basePod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "scheduler-test-pod-",
Labels: map[string]string{"name": name},
},
Spec: makePodSpec(),
}
createPod := func(i int) {
for {
if _, err := c.Core().Pods("default").Create(basePod); err == nil {
break
}
}
}
workqueue.Parallelize(30, podCount, createPod)
}

View File

@ -33,6 +33,7 @@ go_library(
"//pkg/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/uuid:go_default_library",
"//pkg/util/workqueue:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math"
"os"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
@ -32,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/workqueue"
"github.com/golang/glog"
)
@ -671,3 +673,154 @@ func DoCleanupNode(client clientset.Interface, nodeName string, strategy Prepare
}
return fmt.Errorf("To many conflicts when trying to cleanup Node %v", nodeName)
}
type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error
type CountToPodStrategy struct {
Count int
Strategy TestPodCreateStrategy
}
type TestPodCreatorConfig map[string][]CountToPodStrategy
func NewTestPodCreatorConfig() *TestPodCreatorConfig {
config := make(TestPodCreatorConfig)
return &config
}
func (c *TestPodCreatorConfig) AddStrategy(
namespace string, podCount int, strategy TestPodCreateStrategy) {
(*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
}
type TestPodCreator struct {
Client clientset.Interface
// namespace -> count -> strategy
Config *TestPodCreatorConfig
}
func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
return &TestPodCreator{
Client: client,
Config: config,
}
}
func (c *TestPodCreator) CreatePods() error {
for ns, v := range *(c.Config) {
for _, countToStrategy := range v {
if err := countToStrategy.Strategy(c.Client, ns, countToStrategy.Count); err != nil {
return err
}
}
}
return nil
}
func makePodSpec() api.PodSpec {
return api.PodSpec{
Containers: []api.Container{{
Name: "pause",
Image: "kubernetes/pause",
Ports: []api.ContainerPort{{ContainerPort: 80}},
Resources: api.ResourceRequirements{
Limits: api.ResourceList{
api.ResourceCPU: resource.MustParse("100m"),
api.ResourceMemory: resource.MustParse("500Mi"),
},
Requests: api.ResourceList{
api.ResourceCPU: resource.MustParse("100m"),
api.ResourceMemory: resource.MustParse("500Mi"),
},
},
}},
}
}
func makeCreatePod(client clientset.Interface, namespace string, podTemplate *api.Pod) error {
var err error
for attempt := 0; attempt < retries; attempt++ {
if _, err := client.Core().Pods(namespace).Create(podTemplate); err == nil {
return nil
}
glog.Errorf("Error while creating pod, maybe retry: %v", err)
}
return fmt.Errorf("Terminal error while creating pod, won't retry: %v", err)
}
func createPod(client clientset.Interface, namespace string, podCount int, podTemplate *api.Pod) error {
var createError error
lock := sync.Mutex{}
createPodFunc := func(i int) {
if err := makeCreatePod(client, namespace, podTemplate); err != nil {
lock.Lock()
defer lock.Unlock()
createError = err
}
}
if podCount < 30 {
workqueue.Parallelize(podCount, podCount, createPodFunc)
} else {
workqueue.Parallelize(30, podCount, createPodFunc)
}
return createError
}
func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *api.Pod) error {
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: controllerName,
},
Spec: api.ReplicationControllerSpec{
Replicas: int32(podCount),
Selector: map[string]string{"name": controllerName},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": controllerName},
},
Spec: podTemplate.Spec,
},
},
}
var err error
for attempt := 0; attempt < retries; attempt++ {
if _, err := client.Core().ReplicationControllers(namespace).Create(rc); err == nil {
return nil
}
glog.Errorf("Error while creating rc, maybe retry: %v", err)
}
return fmt.Errorf("Terminal error while creating rc, won't retry: %v", err)
}
func NewCustomCreatePodStrategy(podTemplate *api.Pod) TestPodCreateStrategy {
return func(client clientset.Interface, namespace string, podCount int) error {
return createPod(client, namespace, podCount, podTemplate)
}
}
func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
basePod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "simple-pod-",
},
Spec: makePodSpec(),
}
return NewCustomCreatePodStrategy(basePod)
}
func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
return func(client clientset.Interface, namespace string, podCount int) error {
basePod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: controllerName + "-pod-",
Labels: map[string]string{"name": controllerName},
},
Spec: makePodSpec(),
}
if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
return err
}
return createPod(client, namespace, podCount, basePod)
}
}