Make it possible to run Load test using Deployments or ReplicaSets

This commit is contained in:
gmarek 2016-11-25 18:15:00 +01:00
parent 8e8599fcd7
commit 070f0979c2
7 changed files with 392 additions and 183 deletions

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@ -35,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/sets"
utiluuid "k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/workqueue"
@ -56,11 +58,13 @@ const (
var MaxContainerFailures = 0
type DensityTestConfig struct {
Configs []testutils.RCConfig
Configs []testutils.RunObjectConfig
ClientSet clientset.Interface
InternalClientset internalclientset.Interface
PollInterval time.Duration
PodCount int
// What kind of resource we want to create
kind schema.GroupKind
}
func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint {
@ -193,13 +197,13 @@ func runDensityTest(dtc DensityTestConfig) time.Duration {
wg := sync.WaitGroup{}
wg.Add(len(dtc.Configs))
for i := range dtc.Configs {
rcConfig := dtc.Configs[i]
config := dtc.Configs[i]
go func() {
defer GinkgoRecover()
// Call wg.Done() in defer to avoid blocking whole test
// in case of error from RunRC.
defer wg.Done()
framework.ExpectNoError(framework.RunRC(rcConfig))
framework.ExpectNoError(config.Run())
}()
}
logStopCh := make(chan struct{})
@ -236,21 +240,21 @@ func runDensityTest(dtc DensityTestConfig) time.Duration {
func cleanupDensityTest(dtc DensityTestConfig) {
defer GinkgoRecover()
By("Deleting ReplicationController")
By("Deleting created Collections")
// We explicitly delete all pods to have API calls necessary for deletion accounted in metrics.
for i := range dtc.Configs {
rcName := dtc.Configs[i].Name
rc, err := dtc.ClientSet.Core().ReplicationControllers(dtc.Configs[i].Namespace).Get(rcName)
if err == nil && *(rc.Spec.Replicas) != 0 {
if framework.TestContext.GarbageCollectorEnabled {
By("Cleaning up only the replication controller, garbage collector will clean up the pods")
err := framework.DeleteRCAndWaitForGC(dtc.ClientSet, dtc.Configs[i].Namespace, rcName)
framework.ExpectNoError(err)
} else {
By("Cleaning up the replication controller and pods")
err := framework.DeleteRCAndPods(dtc.ClientSet, dtc.InternalClientset, dtc.Configs[i].Namespace, rcName)
framework.ExpectNoError(err)
}
name := dtc.Configs[i].GetName()
namespace := dtc.Configs[i].GetNamespace()
kind := dtc.Configs[i].GetKind()
// TODO: Remove Deployment guard once GC is implemented for Deployments.
if framework.TestContext.GarbageCollectorEnabled && kind != extensions.Kind("Deployment") {
By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind))
err := framework.DeleteResourceAndWaitForGC(dtc.ClientSet, kind, namespace, name)
framework.ExpectNoError(err)
} else {
By(fmt.Sprintf("Cleaning up the %v and pods", kind))
err := framework.DeleteResourceAndPods(dtc.ClientSet, dtc.InternalClientset, kind, dtc.Configs[i].GetNamespace(), name)
framework.ExpectNoError(err)
}
}
}
@ -265,7 +269,7 @@ func cleanupDensityTest(dtc DensityTestConfig) {
var _ = framework.KubeDescribe("Density", func() {
var c clientset.Interface
var nodeCount int
var RCName string
var name string
var additionalPodsPrefix string
var ns string
var uuid string
@ -352,27 +356,31 @@ var _ = framework.KubeDescribe("Density", func() {
podsPerNode int
// Controls how often the apiserver is polled for pods
interval time.Duration
// What kind of resource we should be creating. Default: ReplicationController
kind schema.GroupKind
}
densityTests := []Density{
// TODO: Expose runLatencyTest as ginkgo flag.
{podsPerNode: 3, runLatencyTest: false},
{podsPerNode: 30, runLatencyTest: true},
{podsPerNode: 50, runLatencyTest: false},
{podsPerNode: 95, runLatencyTest: true},
{podsPerNode: 100, runLatencyTest: false},
{podsPerNode: 3, runLatencyTest: false, kind: api.Kind("ReplicationController")},
{podsPerNode: 30, runLatencyTest: true, kind: api.Kind("ReplicationController")},
{podsPerNode: 50, runLatencyTest: false, kind: api.Kind("ReplicationController")},
{podsPerNode: 95, runLatencyTest: true, kind: api.Kind("ReplicationController")},
{podsPerNode: 100, runLatencyTest: false, kind: api.Kind("ReplicationController")},
}
for _, testArg := range densityTests {
feature := "ManualPerformance"
switch testArg.podsPerNode {
case 30:
feature = "Performance"
if testArg.kind == api.Kind("ReplicationController") {
feature = "Performance"
}
case 95:
feature = "HighDensityPerformance"
}
name := fmt.Sprintf("[Feature:%s] should allow starting %d pods per node", feature, testArg.podsPerNode)
name := fmt.Sprintf("[Feature:%s] should allow starting %d pods per node using %v", feature, testArg.podsPerNode, testArg.kind)
itArg := testArg
It(name, func() {
nodePreparer := framework.NewE2ETestNodePreparer(
@ -392,44 +400,55 @@ var _ = framework.KubeDescribe("Density", func() {
defer fileHndl.Close()
// nodeCountPerNamespace and CreateNamespaces are defined in load.go
numberOfRCs := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace
namespaces, err := CreateNamespaces(f, numberOfRCs, fmt.Sprintf("density-%v", testArg.podsPerNode))
numberOfCollections := (nodeCount + nodeCountPerNamespace - 1) / nodeCountPerNamespace
namespaces, err := CreateNamespaces(f, numberOfCollections, fmt.Sprintf("density-%v", testArg.podsPerNode))
framework.ExpectNoError(err)
RCConfigs := make([]testutils.RCConfig, numberOfRCs)
configs := make([]testutils.RunObjectConfig, numberOfCollections)
// Since all RCs are created at the same time, timeout for each config
// has to assume that it will be run at the very end.
podThroughput := 20
timeout := time.Duration(totalPods/podThroughput)*time.Second + 3*time.Minute
// createClients is defined in load.go
clients, internalClients, err := createClients(numberOfRCs)
for i := 0; i < numberOfRCs; i++ {
RCName := fmt.Sprintf("density%v-%v-%v", totalPods, i, uuid)
clients, internalClients, err := createClients(numberOfCollections)
for i := 0; i < numberOfCollections; i++ {
name := fmt.Sprintf("density%v-%v-%v", totalPods, i, uuid)
nsName := namespaces[i].Name
RCConfigs[i] = testutils.RCConfig{
baseConfig := &testutils.RCConfig{
Client: clients[i],
InternalClient: internalClients[i],
Image: framework.GetPauseImageName(f.ClientSet),
Name: RCName,
Name: name,
Namespace: nsName,
Labels: map[string]string{"type": "densityPod"},
PollInterval: DensityPollInterval,
Timeout: timeout,
PodStatusFile: fileHndl,
Replicas: (totalPods + numberOfRCs - 1) / numberOfRCs,
Replicas: (totalPods + numberOfCollections - 1) / numberOfCollections,
CpuRequest: nodeCpuCapacity / 100,
MemRequest: nodeMemCapacity / 100,
MaxContainerFailures: &MaxContainerFailures,
Silent: true,
}
switch itArg.kind {
case api.Kind("ReplicationController"):
configs[i] = baseConfig
case extensions.Kind("ReplicaSet"):
configs[i] = &testutils.ReplicaSetConfig{RCConfig: *baseConfig}
case extensions.Kind("Deployment"):
configs[i] = &testutils.DeploymentConfig{RCConfig: *baseConfig}
default:
framework.Failf("Unsupported kind: %v", itArg.kind)
}
}
dConfig := DensityTestConfig{
ClientSet: f.ClientSet,
InternalClientset: f.InternalClientset,
Configs: RCConfigs,
Configs: configs,
PodCount: totalPods,
PollInterval: DensityPollInterval,
kind: itArg.kind,
}
e2eStartupTime = runDensityTest(dConfig)
if itArg.runLatencyTest {
@ -657,29 +676,29 @@ var _ = framework.KubeDescribe("Density", func() {
fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
framework.ExpectNoError(err)
defer fileHndl.Close()
rcCnt := 1
RCConfigs := make([]testutils.RCConfig, rcCnt)
podsPerRC := int(totalPods / rcCnt)
for i := 0; i < rcCnt; i++ {
if i == rcCnt-1 {
podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt)))
collectionCount := 1
configs := make([]testutils.RunObjectConfig, collectionCount)
podsPerCollection := int(totalPods / collectionCount)
for i := 0; i < collectionCount; i++ {
if i == collectionCount-1 {
podsPerCollection += int(math.Mod(float64(totalPods), float64(collectionCount)))
}
RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
RCConfigs[i] = testutils.RCConfig{Client: c,
name = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
configs[i] = &testutils.RCConfig{Client: c,
Image: framework.GetPauseImageName(f.ClientSet),
Name: RCName,
Name: name,
Namespace: ns,
Labels: map[string]string{"type": "densityPod"},
PollInterval: DensityPollInterval,
PodStatusFile: fileHndl,
Replicas: podsPerRC,
Replicas: podsPerCollection,
MaxContainerFailures: &MaxContainerFailures,
Silent: true,
}
}
dConfig := DensityTestConfig{
ClientSet: f.ClientSet,
Configs: RCConfigs,
Configs: configs,
PodCount: totalPods,
PollInterval: DensityPollInterval,
}
@ -727,6 +746,6 @@ func createRunningPodFromRC(wg *sync.WaitGroup, c clientset.Interface, name, ns,
}
_, err := c.Core().ReplicationControllers(ns).Create(rc)
framework.ExpectNoError(err)
framework.ExpectNoError(framework.WaitForRCPodsRunning(c, ns, name))
framework.ExpectNoError(framework.WaitForControlledPodsRunning(c, ns, name, api.Kind("ReplicationController")))
framework.Logf("Found pod '%s' running", name)
}

View File

@ -21,6 +21,7 @@ import (
"path/filepath"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/labels"
@ -90,7 +91,7 @@ var _ = framework.KubeDescribe("ClusterDns [Feature:Example]", func() {
// wait for objects
for _, ns := range namespaces {
framework.WaitForRCPodsRunning(c, ns.Name, backendRcName)
framework.WaitForControlledPodsRunning(c, ns.Name, backendRcName, api.Kind("ReplicationController"))
framework.WaitForService(c, ns.Name, backendSvcName, true, framework.Poll, framework.ServiceStartTimeout)
}
// it is not enough that pods are running because they may be set to running, but

View File

@ -2622,9 +2622,29 @@ func RemoveTaintOffNode(c clientset.Interface, nodeName string, taint v1.Taint)
}
}
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error {
By(fmt.Sprintf("Scaling replication controller %s in namespace %s to %d", name, ns, size))
scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), internalClientset)
func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) {
switch kind {
case api.Kind("ReplicationController"):
return kubectl.ScalerFor(api.Kind("ReplicationController"), internalClientset)
case extensionsinternal.Kind("ReplicaSet"):
return kubectl.ScalerFor(extensionsinternal.Kind("ReplicaSet"), internalClientset)
case extensionsinternal.Kind("Deployment"):
return kubectl.ScalerFor(extensionsinternal.Kind("Deployment"), internalClientset)
default:
return nil, fmt.Errorf("Unsupported kind for getting Scaler: %v", kind)
}
}
func ScaleResource(
clientset clientset.Interface,
internalClientset internalclientset.Interface,
ns, name string,
size uint,
wait bool,
kind schema.GroupKind,
) error {
By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
scaler, err := getScalerForKind(internalClientset, kind)
if err != nil {
return err
}
@ -2636,51 +2656,32 @@ func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.
if !wait {
return nil
}
return WaitForRCPodsRunning(clientset, ns, name)
return WaitForControlledPodsRunning(clientset, ns, name, kind)
}
// Wait up to 10 minutes for pods to become Running.
func WaitForRCPodsRunning(c clientset.Interface, ns, rcName string) error {
rc, err := c.Core().ReplicationControllers(ns).Get(rcName)
func WaitForControlledPodsRunning(c clientset.Interface, ns, name string, kind schema.GroupKind) error {
rtObject, err := getRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
return err
}
selector, err := getSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
selector := labels.SelectorFromSet(labels.Set(rc.Spec.Selector))
err = testutils.WaitForPodsWithLabelRunning(c, ns, selector)
if err != nil {
return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", rcName, err)
return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %v", name, err)
}
return nil
}
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, ns, name, size, wait, api.Kind("ReplicationController"))
}
func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error {
By(fmt.Sprintf("Scaling Deployment %s in namespace %s to %d", name, ns, size))
scaler, err := kubectl.ScalerFor(extensionsinternal.Kind("Deployment"), internalClientset)
if err != nil {
return err
}
waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
if err = scaler.Scale(ns, name, size, nil, waitForScale, waitForReplicas); err != nil {
return fmt.Errorf("error while scaling Deployment %s to %d replicas: %v", name, size, err)
}
if !wait {
return nil
}
return WaitForDeploymentPodsRunning(clientset, ns, name)
}
func WaitForDeploymentPodsRunning(c clientset.Interface, ns, name string) error {
deployment, err := c.Extensions().Deployments(ns).Get(name)
if err != nil {
return err
}
selector := labels.SelectorFromSet(labels.Set(deployment.Spec.Selector.MatchLabels))
err = testutils.WaitForPodsWithLabelRunning(c, ns, selector)
if err != nil {
return fmt.Errorf("Error while waiting for Deployment %s pods to be running: %v", name, err)
}
return nil
return ScaleResource(clientset, internalClientset, ns, name, size, wait, extensionsinternal.Kind("Deployment"))
}
// Returns true if all the specified pods are scheduled, else returns false.
@ -2760,26 +2761,102 @@ func WaitForPodsWithLabelRunningReady(c clientset.Interface, ns string, label la
return pods, err
}
// DeleteRCAndPods a Replication Controller and all pods it spawned
func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error {
By(fmt.Sprintf("deleting replication controller %s in namespace %s", name, ns))
rc, err := clientset.Core().ReplicationControllers(ns).Get(name)
func getRuntimeObjectForKind(c clientset.Interface, kind schema.GroupKind, ns, name string) (runtime.Object, error) {
switch kind {
case api.Kind("ReplicationController"):
return c.Core().ReplicationControllers(ns).Get(name)
case extensionsinternal.Kind("ReplicaSet"):
return c.Extensions().ReplicaSets(ns).Get(name)
case extensionsinternal.Kind("Deployment"):
return c.Extensions().Deployments(ns).Get(name)
default:
return nil, fmt.Errorf("Unsupported kind when getting runtime object: %v", kind)
}
}
func deleteResource(c clientset.Interface, kind schema.GroupKind, ns, name string, deleteOption *v1.DeleteOptions) error {
switch kind {
case api.Kind("ReplicationController"):
return c.Core().ReplicationControllers(ns).Delete(name, deleteOption)
case extensionsinternal.Kind("ReplicaSet"):
return c.Extensions().ReplicaSets(ns).Delete(name, deleteOption)
case extensionsinternal.Kind("Deployment"):
return c.Extensions().Deployments(ns).Delete(name, deleteOption)
default:
return fmt.Errorf("Unsupported kind when deleting: %v", kind)
}
}
func getSelectorFromRuntimeObject(obj runtime.Object) (labels.Selector, error) {
switch typed := obj.(type) {
case *v1.ReplicationController:
return labels.SelectorFromSet(typed.Spec.Selector), nil
case *extensions.ReplicaSet:
return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
case *extensions.Deployment:
return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
default:
return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj)
}
}
func getReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
switch typed := obj.(type) {
case *v1.ReplicationController:
if typed.Spec.Replicas != nil {
return *typed.Spec.Replicas, nil
}
return 0, nil
case *extensions.ReplicaSet:
if typed.Spec.Replicas != nil {
return *typed.Spec.Replicas, nil
}
return 0, nil
case *extensions.Deployment:
if typed.Spec.Replicas != nil {
return *typed.Spec.Replicas, nil
}
return 0, nil
default:
return -1, fmt.Errorf("Unsupported kind when getting number of replicas: %v", obj)
}
}
func getReaperForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Reaper, error) {
switch kind {
case api.Kind("ReplicationController"):
return kubectl.ReaperFor(api.Kind("ReplicationController"), internalClientset)
case extensionsinternal.Kind("ReplicaSet"):
return kubectl.ReaperFor(extensionsinternal.Kind("ReplicaSet"), internalClientset)
case extensionsinternal.Kind("Deployment"):
return kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClientset)
default:
return nil, fmt.Errorf("Unsupported kind: %v", kind)
}
}
// DeleteResourceAndPods deletes a given resource and all pods it spawned
func DeleteResourceAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, kind schema.GroupKind, ns, name string) error {
By(fmt.Sprintf("deleting %v %s in namespace %s", kind, name, ns))
rtObject, err := getRuntimeObjectForKind(clientset, kind, ns, name)
if err != nil {
if apierrs.IsNotFound(err) {
Logf("RC %s was already deleted: %v", name, err)
Logf("%v %s not found: %v", kind, name, err)
return nil
}
return err
}
reaper, err := kubectl.ReaperForReplicationController(internalClientset.Core(), 10*time.Minute)
selector, err := getSelectorFromRuntimeObject(rtObject)
if err != nil {
if apierrs.IsNotFound(err) {
Logf("RC %s was already deleted: %v", name, err)
return nil
}
return err
}
ps, err := podStoreForRC(clientset, rc)
reaper, err := getReaperForKind(internalClientset, kind)
if err != nil {
return err
}
ps, err := podStoreForSelector(clientset, ns, selector)
if err != nil {
return err
}
@ -2787,20 +2864,20 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl
startTime := time.Now()
err = reaper.Stop(ns, name, 0, nil)
if apierrs.IsNotFound(err) {
Logf("RC %s was already deleted: %v", name, err)
Logf("%v %s was already deleted: %v", kind, name, err)
return nil
}
if err != nil {
return fmt.Errorf("error while stopping RC: %s: %v", name, err)
return fmt.Errorf("error while stopping %v: %s: %v", kind, name, err)
}
deleteRCTime := time.Now().Sub(startTime)
Logf("Deleting RC %s took: %v", name, deleteRCTime)
deleteTime := time.Now().Sub(startTime)
Logf("Deleting %v %s took: %v", kind, name, deleteTime)
err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
terminatePodTime := time.Now().Sub(startTime) - deleteTime
Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime)
// this is to relieve namespace controller's pressure when deleting the
// namespace after a test.
err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
@ -2810,57 +2887,75 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl
return nil
}
// DeleteRCAndWaitForGC deletes only the Replication Controller and waits for GC to delete the pods.
func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error {
By(fmt.Sprintf("deleting replication controller %s in namespace %s, will wait for the garbage collector to delete the pods", name, ns))
rc, err := c.Core().ReplicationControllers(ns).Get(name)
func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error {
return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name)
}
// DeleteResourceAndWaitForGC deletes only given resource and waits for GC to delete the pods.
func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns, name string) error {
By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns))
rtObject, err := getRuntimeObjectForKind(c, kind, ns, name)
if err != nil {
if apierrs.IsNotFound(err) {
Logf("RC %s was already deleted: %v", name, err)
Logf("%v %s not found: %v", kind, name, err)
return nil
}
return err
}
ps, err := podStoreForRC(c, rc)
selector, err := getSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
replicas, err := getReplicasFromRuntimeObject(rtObject)
if err != nil {
return err
}
ps, err := podStoreForSelector(c, ns, selector)
if err != nil {
return err
}
defer ps.Stop()
startTime := time.Now()
falseVar := false
deleteOption := &v1.DeleteOptions{OrphanDependents: &falseVar}
err = c.Core().ReplicationControllers(ns).Delete(name, deleteOption)
err = deleteResource(c, kind, ns, name, deleteOption)
if err != nil && apierrs.IsNotFound(err) {
Logf("RC %s was already deleted: %v", name, err)
Logf("%v %s was already deleted: %v", kind, name, err)
return nil
}
if err != nil {
return err
}
deleteRCTime := time.Now().Sub(startTime)
Logf("Deleting RC %s took: %v", name, deleteRCTime)
deleteTime := time.Now().Sub(startTime)
Logf("Deleting %v %s took: %v", kind, name, deleteTime)
var interval, timeout time.Duration
switch {
case *(rc.Spec.Replicas) < 100:
case replicas < 100:
interval = 100 * time.Millisecond
case *(rc.Spec.Replicas) < 1000:
case replicas < 1000:
interval = 1 * time.Second
default:
interval = 10 * time.Second
}
if *(rc.Spec.Replicas) < 5000 {
if replicas < 5000 {
timeout = 10 * time.Minute
} else {
timeout = time.Duration(*(rc.Spec.Replicas)/gcThroughput) * time.Second
timeout = time.Duration(replicas/gcThroughput) * time.Second
// gcThroughput is pretty strict now, add a bit more to it
timeout = timeout + 3*time.Minute
}
err = waitForPodsInactive(ps, interval, timeout)
if err != nil {
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
terminatePodTime := time.Now().Sub(startTime) - deleteTime
Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime)
err = waitForPodsGone(ps, interval, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
@ -2868,11 +2963,15 @@ func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error {
return nil
}
// podStoreForRC creates a PodStore that monitors pods belong to the rc. It
// waits until the reflector does a List() before returning.
func podStoreForRC(c clientset.Interface, rc *v1.ReplicationController) (*testutils.PodStore, error) {
labels := labels.SelectorFromSet(rc.Spec.Selector)
ps := testutils.NewPodStore(c, rc.Namespace, labels, fields.Everything())
// DeleteRCAndWaitForGC deletes only the Replication Controller and waits for GC to delete the pods.
func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error {
return DeleteResourceAndWaitForGC(c, api.Kind("ReplicationController"), ns, name)
}
// podStoreForSelector creates a PodStore that monitors pods from given namespace matching given selector.
// It waits until the reflector does a List() before returning.
func podStoreForSelector(c clientset.Interface, ns string, selector labels.Selector) (*testutils.PodStore, error) {
ps := testutils.NewPodStore(c, ns, selector, fields.Everything())
err := wait.Poll(1*time.Second, 2*time.Minute, func() (bool, error) {
if len(ps.Reflector.LastSyncResourceVersion()) != 0 {
return true, nil
@ -4312,7 +4411,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl
return err
}
if replicas == 0 {
ps, err := podStoreForRC(clientset, rc)
ps, err := podStoreForSelector(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector))
if err != nil {
return err
}

View File

@ -27,12 +27,15 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/transport"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/test/e2e/framework"
@ -43,15 +46,15 @@ import (
)
const (
smallRCSize = 5
mediumRCSize = 30
bigRCSize = 250
smallRCGroupName = "load-small-rc"
mediumRCGroupName = "load-medium-rc"
bigRCGroupName = "load-big-rc"
smallRCBatchSize = 30
mediumRCBatchSize = 5
bigRCBatchSize = 1
smallGroupSize = 5
mediumGroupSize = 30
bigGroupSize = 250
smallGroupName = "load-small"
mediumGroupName = "load-medium"
bigGroupName = "load-big"
smallGroupBatchSize = 30
mediumGroupBatchSize = 5
bigGroupBatchSize = 1
// We start RCs/Services/pods/... in different namespace in this test.
// nodeCountPerNamespace determines how many namespaces we will be using
// depending on the number of nodes in the underlying cluster.
@ -66,7 +69,7 @@ var _ = framework.KubeDescribe("Load capacity", func() {
var clientset clientset.Interface
var nodeCount int
var ns string
var configs []*testutils.RCConfig
var configs []testutils.RunObjectConfig
// Gathers metrics before teardown
// TODO add flag that allows to skip cleanup on failure
@ -117,20 +120,22 @@ var _ = framework.KubeDescribe("Load capacity", func() {
podsPerNode int
image string
command []string
// What kind of resource we want to create
kind schema.GroupKind
}
loadTests := []Load{
// The container will consume 1 cpu and 512mb of memory.
{podsPerNode: 3, image: "jess/stress", command: []string{"stress", "-c", "1", "-m", "2"}},
{podsPerNode: 30, image: "gcr.io/google_containers/serve_hostname:v1.4"},
{podsPerNode: 3, image: "jess/stress", command: []string{"stress", "-c", "1", "-m", "2"}, kind: api.Kind("ReplicationController")},
{podsPerNode: 30, image: "gcr.io/google_containers/serve_hostname:v1.4", kind: api.Kind("ReplicationController")},
}
for _, testArg := range loadTests {
feature := "ManualPerformance"
if testArg.podsPerNode == 30 {
if testArg.podsPerNode == 30 && testArg.kind == api.Kind("ReplicationController") {
feature = "Performance"
}
name := fmt.Sprintf("[Feature:%s] should be able to handle %v pods per node", feature, testArg.podsPerNode)
name := fmt.Sprintf("[Feature:%s] should be able to handle %v pods per node %v", feature, testArg.podsPerNode, testArg.kind)
itArg := testArg
It(name, func() {
@ -140,7 +145,7 @@ var _ = framework.KubeDescribe("Load capacity", func() {
framework.ExpectNoError(err)
totalPods := itArg.podsPerNode * nodeCount
configs = generateRCConfigs(totalPods, itArg.image, itArg.command, namespaces)
configs = generateConfigs(totalPods, itArg.image, itArg.command, namespaces, itArg.kind)
var services []*v1.Service
// Read the environment variable to see if we want to create services
createServices := os.Getenv("CREATE_SERVICES")
@ -173,7 +178,7 @@ var _ = framework.KubeDescribe("Load capacity", func() {
// We may want to revisit it in the future.
framework.Logf("Starting to create ReplicationControllers...")
creatingTime := time.Duration(totalPods/throughput) * time.Second
createAllRC(configs, creatingTime)
createAllResources(configs, creatingTime)
By("============================================================================")
// We would like to spread scaling replication controllers over time
@ -182,11 +187,11 @@ var _ = framework.KubeDescribe("Load capacity", func() {
// The expected number of created/deleted pods is less than totalPods/3.
scalingTime := time.Duration(totalPods/(3*throughput)) * time.Second
framework.Logf("Starting to scale ReplicationControllers first time...")
scaleAllRC(configs, scalingTime)
scaleAllResources(configs, scalingTime)
By("============================================================================")
framework.Logf("Starting to scale ReplicationControllers second time...")
scaleAllRC(configs, scalingTime)
scaleAllResources(configs, scalingTime)
By("============================================================================")
// Cleanup all created replication controllers.
@ -194,7 +199,7 @@ var _ = framework.KubeDescribe("Load capacity", func() {
// We may want to revisit it in the future.
deletingTime := time.Duration(totalPods/throughput) * time.Second
framework.Logf("Starting to delete ReplicationControllers...")
deleteAllRC(configs, deletingTime)
deleteAllResources(configs, deletingTime)
if createServices == "true" {
framework.Logf("Starting to delete services...")
for _, service := range services {
@ -259,27 +264,27 @@ func createClients(numberOfClients int) ([]*clientset.Clientset, []*internalclie
return clients, internalClients, nil
}
func computeRCCounts(total int) (int, int, int) {
func computePodCounts(total int) (int, int, int) {
// Small RCs owns ~0.5 of total number of pods, medium and big RCs ~0.25 each.
// For example for 3000 pods (100 nodes, 30 pods per node) there are:
// - 300 small RCs each 5 pods
// - 25 medium RCs each 30 pods
// - 3 big RCs each 250 pods
bigRCCount := total / 4 / bigRCSize
total -= bigRCCount * bigRCSize
mediumRCCount := total / 3 / mediumRCSize
total -= mediumRCCount * mediumRCSize
smallRCCount := total / smallRCSize
bigRCCount := total / 4 / bigGroupSize
total -= bigRCCount * bigGroupSize
mediumRCCount := total / 3 / mediumGroupSize
total -= mediumRCCount * mediumGroupSize
smallRCCount := total / smallGroupSize
return smallRCCount, mediumRCCount, bigRCCount
}
func generateRCConfigs(totalPods int, image string, command []string, nss []*v1.Namespace) []*testutils.RCConfig {
configs := make([]*testutils.RCConfig, 0)
func generateConfigs(totalPods int, image string, command []string, nss []*v1.Namespace, kind schema.GroupKind) []testutils.RunObjectConfig {
configs := make([]testutils.RunObjectConfig, 0)
smallRCCount, mediumRCCount, bigRCCount := computeRCCounts(totalPods)
configs = append(configs, generateRCConfigsForGroup(nss, smallRCGroupName, smallRCSize, smallRCCount, image, command)...)
configs = append(configs, generateRCConfigsForGroup(nss, mediumRCGroupName, mediumRCSize, mediumRCCount, image, command)...)
configs = append(configs, generateRCConfigsForGroup(nss, bigRCGroupName, bigRCSize, bigRCCount, image, command)...)
smallRCCount, mediumRCCount, bigRCCount := computePodCounts(totalPods)
configs = append(configs, generateConfigsForGroup(nss, smallGroupName, smallGroupSize, smallRCCount, image, command, kind)...)
configs = append(configs, generateConfigsForGroup(nss, mediumGroupName, mediumGroupSize, mediumRCCount, image, command, kind)...)
configs = append(configs, generateConfigsForGroup(nss, bigGroupName, bigGroupSize, bigRCCount, image, command, kind)...)
// Create a number of clients to better simulate real usecase
// where not everyone is using exactly the same client.
@ -288,18 +293,18 @@ func generateRCConfigs(totalPods int, image string, command []string, nss []*v1.
framework.ExpectNoError(err)
for i := 0; i < len(configs); i++ {
configs[i].Client = clients[i%len(clients)]
configs[i].InternalClient = internalClients[i%len(internalClients)]
configs[i].SetClient(clients[i%len(clients)])
configs[i].SetInternalClient(internalClients[i%len(internalClients)])
}
return configs
}
func generateRCConfigsForGroup(
nss []*v1.Namespace, groupName string, size, count int, image string, command []string) []*testutils.RCConfig {
configs := make([]*testutils.RCConfig, 0, count)
func generateConfigsForGroup(
nss []*v1.Namespace, groupName string, size, count int, image string, command []string, kind schema.GroupKind) []testutils.RunObjectConfig {
configs := make([]testutils.RunObjectConfig, 0, count)
for i := 1; i <= count; i++ {
config := &testutils.RCConfig{
baseConfig := &testutils.RCConfig{
Client: nil, // this will be overwritten later
InternalClient: nil, // this will be overwritten later
Name: groupName + "-" + strconv.Itoa(i),
@ -311,20 +316,31 @@ func generateRCConfigsForGroup(
CpuRequest: 10, // 0.01 core
MemRequest: 26214400, // 25MB
}
var config testutils.RunObjectConfig
switch kind {
case api.Kind("ReplicationController"):
config = baseConfig
case extensions.Kind("ReplicaSet"):
config = &testutils.ReplicaSetConfig{RCConfig: *baseConfig}
case extensions.Kind("Deployment"):
config = &testutils.DeploymentConfig{RCConfig: *baseConfig}
default:
framework.Failf("Unsupported kind for config creation: %v", kind)
}
configs = append(configs, config)
}
return configs
}
func generateServicesForConfigs(configs []*testutils.RCConfig) []*v1.Service {
func generateServicesForConfigs(configs []testutils.RunObjectConfig) []*v1.Service {
services := make([]*v1.Service, 0, len(configs))
for _, config := range configs {
serviceName := config.Name + "-svc"
labels := map[string]string{"name": config.Name}
serviceName := config.GetName() + "-svc"
labels := map[string]string{"name": config.GetName()}
service := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: serviceName,
Namespace: config.Namespace,
Namespace: config.GetNamespace(),
},
Spec: v1.ServiceSpec{
Selector: labels,
@ -343,69 +359,75 @@ func sleepUpTo(d time.Duration) {
time.Sleep(time.Duration(rand.Int63n(d.Nanoseconds())))
}
func createAllRC(configs []*testutils.RCConfig, creatingTime time.Duration) {
func createAllResources(configs []testutils.RunObjectConfig, creatingTime time.Duration) {
var wg sync.WaitGroup
wg.Add(len(configs))
for _, config := range configs {
go createRC(&wg, config, creatingTime)
go createResource(&wg, config, creatingTime)
}
wg.Wait()
}
func createRC(wg *sync.WaitGroup, config *testutils.RCConfig, creatingTime time.Duration) {
func createResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, creatingTime time.Duration) {
defer GinkgoRecover()
defer wg.Done()
sleepUpTo(creatingTime)
framework.ExpectNoError(framework.RunRC(*config), fmt.Sprintf("creating rc %s", config.Name))
framework.ExpectNoError(config.Run(), fmt.Sprintf("creating %v %s", config.GetKind(), config.GetName()))
}
func scaleAllRC(configs []*testutils.RCConfig, scalingTime time.Duration) {
func scaleAllResources(configs []testutils.RunObjectConfig, scalingTime time.Duration) {
var wg sync.WaitGroup
wg.Add(len(configs))
for _, config := range configs {
go scaleRC(&wg, config, scalingTime)
go scaleResource(&wg, config, scalingTime)
}
wg.Wait()
}
// Scales RC to a random size within [0.5*size, 1.5*size] and lists all the pods afterwards.
// Scaling happens always based on original size, not the current size.
func scaleRC(wg *sync.WaitGroup, config *testutils.RCConfig, scalingTime time.Duration) {
func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scalingTime time.Duration) {
defer GinkgoRecover()
defer wg.Done()
sleepUpTo(scalingTime)
newSize := uint(rand.Intn(config.Replicas) + config.Replicas/2)
framework.ExpectNoError(framework.ScaleRC(config.Client, config.InternalClient, config.Namespace, config.Name, newSize, true),
fmt.Sprintf("scaling rc %s for the first time", config.Name))
selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2)
framework.ExpectNoError(framework.ScaleResource(
config.GetClient(), config.GetInternalClient(), config.GetNamespace(), config.GetName(), newSize, true, config.GetKind()),
fmt.Sprintf("scaling rc %s for the first time", config.GetName()))
selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.GetName()}))
options := v1.ListOptions{
LabelSelector: selector.String(),
ResourceVersion: "0",
}
_, err := config.Client.Core().Pods(config.Namespace).List(options)
framework.ExpectNoError(err, fmt.Sprintf("listing pods from rc %v", config.Name))
_, err := config.GetClient().Core().Pods(config.GetNamespace()).List(options)
framework.ExpectNoError(err, fmt.Sprintf("listing pods from rc %v", config.GetName()))
}
func deleteAllRC(configs []*testutils.RCConfig, deletingTime time.Duration) {
func deleteAllResources(configs []testutils.RunObjectConfig, deletingTime time.Duration) {
var wg sync.WaitGroup
wg.Add(len(configs))
for _, config := range configs {
go deleteRC(&wg, config, deletingTime)
go deleteResource(&wg, config, deletingTime)
}
wg.Wait()
}
func deleteRC(wg *sync.WaitGroup, config *testutils.RCConfig, deletingTime time.Duration) {
func deleteResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, deletingTime time.Duration) {
defer GinkgoRecover()
defer wg.Done()
sleepUpTo(deletingTime)
if framework.TestContext.GarbageCollectorEnabled {
framework.ExpectNoError(framework.DeleteRCAndWaitForGC(config.Client, config.Namespace, config.Name), fmt.Sprintf("deleting rc %s", config.Name))
if framework.TestContext.GarbageCollectorEnabled && config.GetKind() != extensions.Kind("Deployment") {
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(
config.GetClient(), config.GetKind(), config.GetNamespace(), config.GetName()),
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
} else {
framework.ExpectNoError(framework.DeleteRCAndPods(config.Client, config.InternalClient, config.Namespace, config.Name), fmt.Sprintf("deleting rc %s", config.Name))
framework.ExpectNoError(framework.DeleteResourceAndPods(
config.GetClient(), config.GetInternalClient(), config.GetKind(), config.GetNamespace(), config.GetName()),
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
}
}

View File

@ -104,7 +104,7 @@ func (h *haproxyControllerTester) start(namespace string) (err error) {
if err != nil {
return
}
if err = framework.WaitForRCPodsRunning(h.client, namespace, rc.Name); err != nil {
if err = framework.WaitForControlledPodsRunning(h.client, namespace, rc.Name, api.Kind("ReplicationController")); err != nil {
return
}
h.rcName = rc.Name
@ -171,7 +171,7 @@ func (s *ingManager) start(namespace string) (err error) {
if err != nil {
return
}
if err = framework.WaitForRCPodsRunning(s.client, rc.Namespace, rc.Name); err != nil {
if err = framework.WaitForControlledPodsRunning(s.client, rc.Namespace, rc.Name, api.Kind("ReplicationController")); err != nil {
return
}
}

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/api/errors:go_default_library",
"//pkg/api/resource:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/cache:go_default_library",
@ -33,6 +34,7 @@ go_library(
"//pkg/fields:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/uuid:go_default_library",
"//pkg/util/workqueue:go_default_library",

View File

@ -27,12 +27,14 @@ import (
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/v1"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/workqueue"
@ -45,6 +47,18 @@ const (
nonExist = "NonExist"
)
type RunObjectConfig interface {
Run() error
GetName() string
GetNamespace() string
GetKind() schema.GroupKind
GetClient() clientset.Interface
GetInternalClient() internalclientset.Interface
SetClient(clientset.Interface)
SetInternalClient(internalclientset.Interface)
GetReplicas() int
}
type RCConfig struct {
Client clientset.Interface
InternalClient internalclientset.Interface
@ -193,6 +207,14 @@ func RunDeployment(config DeploymentConfig) error {
return config.start()
}
func (config *DeploymentConfig) Run() error {
return RunDeployment(*config)
}
func (config *DeploymentConfig) GetKind() schema.GroupKind {
return extensionsinternal.Kind("Deployment")
}
func (config *DeploymentConfig) create() error {
deployment := &extensions.Deployment{
ObjectMeta: v1.ObjectMeta{
@ -245,6 +267,14 @@ func RunReplicaSet(config ReplicaSetConfig) error {
return config.start()
}
func (config *ReplicaSetConfig) Run() error {
return RunReplicaSet(*config)
}
func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
return extensionsinternal.Kind("ReplicaSet")
}
func (config *ReplicaSetConfig) create() error {
rs := &extensions.ReplicaSet{
ObjectMeta: v1.ObjectMeta{
@ -297,6 +327,42 @@ func RunRC(config RCConfig) error {
return config.start()
}
func (config *RCConfig) Run() error {
return RunRC(*config)
}
func (config *RCConfig) GetName() string {
return config.Name
}
func (config *RCConfig) GetNamespace() string {
return config.Namespace
}
func (config *RCConfig) GetKind() schema.GroupKind {
return api.Kind("ReplicationController")
}
func (config *RCConfig) GetClient() clientset.Interface {
return config.Client
}
func (config *RCConfig) GetInternalClient() internalclientset.Interface {
return config.InternalClient
}
func (config *RCConfig) SetClient(c clientset.Interface) {
config.Client = c
}
func (config *RCConfig) SetInternalClient(c internalclientset.Interface) {
config.InternalClient = c
}
func (config *RCConfig) GetReplicas() int {
return config.Replicas
}
func (config *RCConfig) create() error {
dnsDefault := v1.DNSDefault
if config.DNSPolicy == nil {