added RunDeployment to allow horizontal pod autoscaling e2e tests to use deployments

added deployment-based e2e tests for horizontal pod autoscaling
adjusted to changes from PR #16330
changed test titles according to PR comments & merged in a change from PR #16895
Alexander Vollschwitz 2015-10-30 14:48:32 +01:00
parent 4566e039bf
commit 108cb4121d
3 changed files with 207 additions and 79 deletions
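
The change threads a workload kind through the resource-consumer helpers, so the same autoscaling test logic can drive either a replication controller or a deployment. A minimal sketch of the resulting call pattern, using only names introduced in this diff (test framework setup assumed):

rc := NewDynamicResourceConsumer("rc", kindDeployment, 1, 250, 0, 500, 100, f)
defer rc.CleanUp()
createCPUHorizontalPodAutoscaler(rc, 20)
rc.WaitForReplicas(3)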


@@ -40,6 +40,8 @@ const (
startServiceInterval = 5 * time.Second
resourceConsumerImage = "gcr.io/google_containers/resource_consumer:beta"
rcIsNil = "ERROR: replicationController = nil"
deploymentIsNil = "ERROR: deployment = nil"
invalidKind = "ERROR: invalid workload kind for resource consumer"
)
/*
@@ -52,6 +54,7 @@ rc.ConsumeCPU(300)
*/
type ResourceConsumer struct {
name string
kind string
framework *Framework
cpu chan int
mem chan int
@@ -63,12 +66,13 @@ type ResourceConsumer struct {
requestSizeInMegabytes int
}
func NewDynamicResourceConsumer(name string, replicas, initCPUTotal, initMemoryTotal int, cpuLimit, memLimit int64, framework *Framework) *ResourceConsumer {
return newResourceConsumer(name, replicas, initCPUTotal, initMemoryTotal, dynamicConsumptionTimeInSeconds, dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, cpuLimit, memLimit, framework)
func NewDynamicResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTotal int, cpuLimit, memLimit int64, framework *Framework) *ResourceConsumer {
return newResourceConsumer(name, kind, replicas, initCPUTotal, initMemoryTotal, dynamicConsumptionTimeInSeconds, dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, cpuLimit, memLimit, framework)
}
// TODO: this still defaults to a replication controller
func NewStaticResourceConsumer(name string, replicas, initCPUTotal, initMemoryTotal int, cpuLimit, memLimit int64, framework *Framework) *ResourceConsumer {
return newResourceConsumer(name, replicas, initCPUTotal, initMemoryTotal, staticConsumptionTimeInSeconds, initCPUTotal/replicas, initMemoryTotal/replicas, cpuLimit, memLimit, framework)
return newResourceConsumer(name, kindRC, replicas, initCPUTotal, initMemoryTotal, staticConsumptionTimeInSeconds, initCPUTotal/replicas, initMemoryTotal/replicas, cpuLimit, memLimit, framework)
}
/*
@@ -78,10 +82,11 @@ initMemoryTotal argument is in megabytes
memLimit argument is in megabytes, memLimit is a maximum amount of memory that can be consumed by a single pod
cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
*/
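// Illustrative call only (argument values mirror what NewDynamicResourceConsumer
// forwards; "framework" stands for the caller's *Framework):
//   newResourceConsumer("rc", kindDeployment, 1, 250, 0,
//       dynamicConsumptionTimeInSeconds, dynamicRequestSizeInMillicores,
//       dynamicRequestSizeInMegabytes, 500, 100, framework)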
func newResourceConsumer(name string, replicas, initCPUTotal, initMemoryTotal, consumptionTimeInSeconds, requestSizeInMillicores, requestSizeInMegabytes int, cpuLimit, memLimit int64, framework *Framework) *ResourceConsumer {
runServiceAndRCForResourceConsumer(framework.Client, framework.Namespace.Name, name, replicas, cpuLimit, memLimit)
func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTotal, consumptionTimeInSeconds, requestSizeInMillicores, requestSizeInMegabytes int, cpuLimit, memLimit int64, framework *Framework) *ResourceConsumer {
runServiceAndWorkloadForResourceConsumer(framework.Client, framework.Namespace.Name, name, kind, replicas, cpuLimit, memLimit)
rc := &ResourceConsumer{
name: name,
kind: kind,
framework: framework,
cpu: make(chan int),
mem: make(chan int),
@@ -210,22 +215,35 @@ func (rc *ResourceConsumer) sendOneConsumeMemRequest(megabytes int, durationSec
}
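// GetReplicas returns the replica count currently reported by the consumer's
// backing workload, dispatching on rc.kind (replication controller or deployment).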
func (rc *ResourceConsumer) GetReplicas() int {
replicationController, err := rc.framework.Client.ReplicationControllers(rc.framework.Namespace.Name).Get(rc.name)
expectNoError(err)
if replicationController == nil {
Failf(rcIsNil)
switch rc.kind {
case kindRC:
replicationController, err := rc.framework.Client.ReplicationControllers(rc.framework.Namespace.Name).Get(rc.name)
expectNoError(err)
if replicationController == nil {
Failf(rcIsNil)
}
return replicationController.Status.Replicas
case kindDeployment:
deployment, err := rc.framework.Client.Deployments(rc.framework.Namespace.Name).Get(rc.name)
expectNoError(err)
if deployment == nil {
Failf(deploymentIsNil)
}
return deployment.Status.Replicas
default:
Failf(invalidKind)
}
return replicationController.Status.Replicas
return 0 // not reached: Failf ends the test above, but the compiler needs a return
}
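// WaitForReplicas polls GetReplicas every 20 seconds until the desired count
// is observed, failing the test after a 10-minute timeout.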
func (rc *ResourceConsumer) WaitForReplicas(desiredReplicas int) {
timeout := 10 * time.Minute
for start := time.Now(); time.Since(start) < timeout; time.Sleep(20 * time.Second) {
if desiredReplicas == rc.GetReplicas() {
Logf("Replication Controller current replicas number is equal to desired replicas number: %d", desiredReplicas)
Logf("%s: current replicas number is equal to desired replicas number: %d", rc.kind, desiredReplicas)
return
} else {
Logf("Replication Controller current replicas number %d waiting to be %d", rc.GetReplicas(), desiredReplicas)
Logf("%s: current replicas number %d waiting to be %d", rc.kind, rc.GetReplicas(), desiredReplicas)
}
}
Failf("timeout waiting %v for pods size to be %d", timeout, desiredReplicas)
@@ -252,8 +270,8 @@ func (rc *ResourceConsumer) CleanUp() {
expectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.name))
}
func runServiceAndRCForResourceConsumer(c *client.Client, ns, name string, replicas int, cpuLimitMillis, memLimitMb int64) {
By(fmt.Sprintf("Running consuming RC %s with %v replicas", name, replicas))
func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind string, replicas int, cpuLimitMillis, memLimitMb int64) {
By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
_, err := c.Services(ns).Create(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: name,
@@ -270,7 +288,8 @@ func runServiceAndRCForResourceConsumer(c *client.Client, ns, name string, repli
},
})
expectNoError(err)
config := RCConfig{
rcConfig := RCConfig{
Client: c,
Image: resourceConsumerImage,
Name: name,
@@ -282,7 +301,21 @@ func runServiceAndRCForResourceConsumer(c *client.Client, ns, name string, repli
MemRequest: memLimitMb * 1024 * 1024, // MemLimit is in bytes
MemLimit: memLimitMb * 1024 * 1024,
}
expectNoError(RunRC(config))
switch kind {
case kindRC:
expectNoError(RunRC(rcConfig))
case kindDeployment:
dpConfig := DeploymentConfig{
RCConfig: rcConfig,
}
expectNoError(RunDeployment(dpConfig))
default:
Failf(invalidKind)
}
// Make sure endpoints are propagated.
// TODO(piosz): replace sleep with endpoints watch.
time.Sleep(10 * time.Second)


@@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
@@ -26,37 +27,57 @@ import (
)
const (
kind = "replicationController"
kindRC = "replicationController"
kindDeployment = "deployment"
subresource = "scale"
stabilityTimeout = 10 * time.Minute
)
var _ = Describe("Horizontal pod autoscaling", func() {
var rc *ResourceConsumer
f := NewFramework("horizontal-pod-autoscaling")
// CPU tests
It("[Autoscaling Suite] should scale from 1 pod to 3 pods and from 3 to 5 (scale resource: CPU)", func() {
rc = NewDynamicResourceConsumer("rc", 1, 250, 0, 500, 100, f)
defer rc.CleanUp()
createCPUHorizontalPodAutoscaler(rc, 20)
rc.WaitForReplicas(3)
rc.EnsureDesiredReplicas(3, stabilityTimeout)
rc.ConsumeCPU(700)
rc.WaitForReplicas(5)
titleUp := "%s should scale from 1 pod to 3 pods and from 3 to 5 (via %s, with scale resource: CPU)"
titleDown := "%s should scale from 5 pods to 3 pods and from 3 to 1 (via %s, with scale resource: CPU)"
// CPU tests via deployments
It(fmt.Sprintf(titleUp, "[Skipped]", kindDeployment), func() {
scaleUp("deployment", kindDeployment, rc, f)
})
It(fmt.Sprintf(titleDown, "[Skipped]", kindDeployment), func() {
scaleDown("deployment", kindDeployment, rc, f)
})
It("[Autoscaling Suite] should scale from 5 pods to 3 pods and from 3 to 1 (scale resource: CPU)", func() {
rc = NewDynamicResourceConsumer("rc", 5, 400, 0, 500, 100, f)
defer rc.CleanUp()
createCPUHorizontalPodAutoscaler(rc, 30)
rc.WaitForReplicas(3)
rc.EnsureDesiredReplicas(3, stabilityTimeout)
rc.ConsumeCPU(100)
rc.WaitForReplicas(1)
// CPU tests via replication controllers
It(fmt.Sprintf(titleUp, "[Autoscaling Suite]", kindRC), func() {
scaleUp("rc", kindRC, rc, f)
})
It(fmt.Sprintf(titleDown, "[Autoscaling Suite]", kindRC), func() {
scaleDown("rc", kindRC, rc, f)
})
})
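// scaleUp drives the consumer from 1 replica up to 3, verifies the size holds
// stable, then raises CPU consumption to force a further scale-up to 5.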
func scaleUp(name, kind string, rc *ResourceConsumer, f *Framework) {
rc = NewDynamicResourceConsumer(name, kind, 1, 250, 0, 500, 100, f)
defer rc.CleanUp()
createCPUHorizontalPodAutoscaler(rc, 20)
rc.WaitForReplicas(3)
rc.EnsureDesiredReplicas(3, stabilityTimeout)
rc.ConsumeCPU(700)
rc.WaitForReplicas(5)
}
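// scaleDown starts the consumer at 5 replicas, expects a scale-down to 3,
// verifies stability, then lowers consumption until a single replica remains.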
func scaleDown(name, kind string, rc *ResourceConsumer, f *Framework) {
rc = NewDynamicResourceConsumer(name, kind, 5, 400, 0, 500, 100, f)
defer rc.CleanUp()
createCPUHorizontalPodAutoscaler(rc, 30)
rc.WaitForReplicas(3)
rc.EnsureDesiredReplicas(3, stabilityTimeout)
rc.ConsumeCPU(100)
rc.WaitForReplicas(1)
}
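// createCPUHorizontalPodAutoscaler registers an HPA that targets the
// consumer's workload through its scale subresource, with cpu as the target
// CPU value.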
func createCPUHorizontalPodAutoscaler(rc *ResourceConsumer, cpu int) {
minReplicas := 1
hpa := &extensions.HorizontalPodAutoscaler{
@@ -66,7 +87,7 @@ func createCPUHorizontalPodAutoscaler(rc *ResourceConsumer, cpu int) {
},
Spec: extensions.HorizontalPodAutoscalerSpec{
ScaleRef: extensions.SubresourceReference{
Kind: kind,
Kind: rc.kind,
Name: rc.name,
Subresource: subresource,
},


@@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
@@ -204,6 +205,10 @@ type RCConfig struct {
MaxContainerFailures *int
}
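// DeploymentConfig reuses RCConfig wholesale: the embedded fields describe
// the pod template, replica count, and resource settings for the deployment.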
type DeploymentConfig struct {
RCConfig
}
func nowStamp() string {
return time.Now().Format(time.StampMilli)
}
@@ -1251,21 +1256,71 @@ func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
return podInfoMap
}
// RunDeployment launches (and verifies the correctness of) a Deployment
// and waits for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling cleanup).
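// Example (sketch; only fields exercised in this diff are shown):
//   cfg := DeploymentConfig{RCConfig: RCConfig{
//       Client: c, Image: image, Name: "consumer", Namespace: ns, Replicas: 2,
//   }}
//   expectNoError(RunDeployment(cfg))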
func RunDeployment(config DeploymentConfig) error {
err := config.create()
if err != nil {
return err
}
return config.start()
}
func (config *DeploymentConfig) create() error {
By(fmt.Sprintf("creating deployment %s in namespace %s", config.Name, config.Namespace))
deployment := &extensions.Deployment{
ObjectMeta: api.ObjectMeta{
Name: config.Name,
},
Spec: extensions.DeploymentSpec{
Replicas: config.Replicas,
Selector: map[string]string{
"name": config.Name,
},
UniqueLabelKey: "deployment.kubernetes.io/podTemplateHash",
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"name": config.Name},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: config.Name,
Image: config.Image,
Command: config.Command,
Ports: []api.ContainerPort{{ContainerPort: 80}},
},
},
},
},
},
}
config.applyTo(&deployment.Spec.Template)
_, err := config.Client.Deployments(config.Namespace).Create(deployment)
if err != nil {
return fmt.Errorf("Error creating deployment: %v", err)
}
Logf("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, deployment.Spec.Replicas)
return nil
}
// RunRC launches (and verifies the correctness of) a Replication Controller
// and waits for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling cleanup).
func RunRC(config RCConfig) error {
// Don't force tests to fail if they don't care about containers restarting.
var maxContainerFailures int
if config.MaxContainerFailures == nil {
maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
} else {
maxContainerFailures = *config.MaxContainerFailures
err := config.create()
if err != nil {
return err
}
return config.start()
}
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
func (config *RCConfig) create() error {
By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace))
rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
@@ -1293,47 +1348,66 @@ },
},
},
}
if config.Env != nil {
for k, v := range config.Env {
c := &rc.Spec.Template.Spec.Containers[0]
c.Env = append(c.Env, api.EnvVar{Name: k, Value: v})
}
}
if config.Labels != nil {
for k, v := range config.Labels {
rc.Spec.Template.ObjectMeta.Labels[k] = v
}
}
if config.Ports != nil {
for k, v := range config.Ports {
c := &rc.Spec.Template.Spec.Containers[0]
c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: v})
}
}
if config.CpuLimit > 0 || config.MemLimit > 0 {
rc.Spec.Template.Spec.Containers[0].Resources.Limits = api.ResourceList{}
}
if config.CpuLimit > 0 {
rc.Spec.Template.Spec.Containers[0].Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
}
if config.MemLimit > 0 {
rc.Spec.Template.Spec.Containers[0].Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
}
if config.CpuRequest > 0 || config.MemRequest > 0 {
rc.Spec.Template.Spec.Containers[0].Resources.Requests = api.ResourceList{}
}
if config.CpuRequest > 0 {
rc.Spec.Template.Spec.Containers[0].Resources.Requests[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
}
if config.MemRequest > 0 {
rc.Spec.Template.Spec.Containers[0].Resources.Requests[api.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
}
config.applyTo(rc.Spec.Template)
_, err := config.Client.ReplicationControllers(config.Namespace).Create(rc)
if err != nil {
return fmt.Errorf("Error creating replication controller: %v", err)
}
Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, rc.Spec.Replicas)
return nil
}
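// applyTo copies env vars, extra labels, extra ports, and resource
// requests/limits from the config onto the given pod template. It is shared
// by the RC and deployment creation paths.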
func (config *RCConfig) applyTo(template *api.PodTemplateSpec) {
if config.Env != nil {
for k, v := range config.Env {
c := &template.Spec.Containers[0]
c.Env = append(c.Env, api.EnvVar{Name: k, Value: v})
}
}
if config.Labels != nil {
for k, v := range config.Labels {
template.ObjectMeta.Labels[k] = v
}
}
if config.Ports != nil {
for k, v := range config.Ports {
c := &template.Spec.Containers[0]
c.Ports = append(c.Ports, api.ContainerPort{Name: k, ContainerPort: v})
}
}
if config.CpuLimit > 0 || config.MemLimit > 0 {
template.Spec.Containers[0].Resources.Limits = api.ResourceList{}
}
if config.CpuLimit > 0 {
template.Spec.Containers[0].Resources.Limits[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
}
if config.MemLimit > 0 {
template.Spec.Containers[0].Resources.Limits[api.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
}
if config.CpuRequest > 0 || config.MemRequest > 0 {
template.Spec.Containers[0].Resources.Requests = api.ResourceList{}
}
if config.CpuRequest > 0 {
template.Spec.Containers[0].Resources.Requests[api.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
}
if config.MemRequest > 0 {
template.Spec.Containers[0].Resources.Requests[api.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
}
}
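// start waits for the workload's pods to be created and reach Running,
// tolerating a small number of container failures (about 1% of replicas
// unless MaxContainerFailures overrides it).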
func (config *RCConfig) start() error {
// Don't force tests to fail if they don't care about containers restarting.
var maxContainerFailures int
if config.MaxContainerFailures == nil {
maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
} else {
maxContainerFailures = *config.MaxContainerFailures
}
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
podStore := newPodStore(config.Client, config.Namespace, label, fields.Everything())
defer podStore.Stop()
@@ -1406,7 +1480,7 @@ }
}
Logf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
rc.Name, len(pods), config.Replicas, running, pending, waiting, inactive, terminating, unknown, runningButNotReady)
config.Name, len(pods), config.Replicas, running, pending, waiting, inactive, terminating, unknown, runningButNotReady)
promPushRunningPending(running, pending)