diff --git a/pkg/kubectl/resize.go b/pkg/kubectl/resize.go
index 68aaafc886f..4b78bf40bf1 100644
--- a/pkg/kubectl/resize.go
+++ b/pkg/kubectl/resize.go
@@ -105,6 +105,10 @@ type RetryParams struct {
 	interval, timeout time.Duration
 }
 
+func NewRetryParams(interval, timeout time.Duration) *RetryParams {
+	return &RetryParams{interval, timeout}
+}
+
 // ResizeCondition is a closure around Resize that facilitates retries via util.wait
 func ResizeCondition(r Resizer, precondition *ResizePrecondition, namespace, name string, count uint) wait.ConditionFunc {
 	return func() (bool, error) {
diff --git a/test/e2e/load.go b/test/e2e/load.go
new file mode 100644
index 00000000000..529421db5d9
--- /dev/null
+++ b/test/e2e/load.go
@@ -0,0 +1,151 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	image          = "gcr.io/google_containers/serve_hostname:1.1"
+	simulationTime = 20 * time.Minute
+	smallRCSize    = 5
+	mediumRCSize   = 30
+	bigRCSize      = 250
+)
+
+// This test suite can take a long time to run, so by default it is added to
+// the ginkgo.skip list (see driver.go).
+// To run this suite you must explicitly ask for it by setting the
+// -t/--test flag or ginkgo.focus flag.
+var _ = Describe("Load", func() {
+	var c *client.Client
+	var nodeCount int
+	var ns string
+
+	BeforeEach(func() {
+		var err error
+		c, err = loadClient()
+		expectNoError(err)
+		nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
+		expectNoError(err)
+		nodeCount = len(nodes.Items)
+		Expect(nodeCount).NotTo(BeZero())
+		nsForTesting, err := createTestingNS("load", c)
+		ns = nsForTesting.Name
+		expectNoError(err)
+	})
+
+	// TODO add flag that allows to skip cleanup on failure
+	AfterEach(func() {
+		By(fmt.Sprintf("Destroying namespace for this suite %v", ns))
+		if err := c.Namespaces().Delete(ns); err != nil {
+			Failf("Couldn't delete ns %s", err)
+		}
+	})
+
+	type Load struct {
+		skip        bool
+		podsPerNode int
+	}
+
+	loadTests := []Load{
+		{podsPerNode: 30, skip: true},
+	}
+
+	for _, testArg := range loadTests {
+		name := fmt.Sprintf("should be able to handle %v pods per node", testArg.podsPerNode)
+		if testArg.skip {
+			name = "[Skipped] " + name
+		}
+
+		It(name, func() {
+			totalPods := testArg.podsPerNode * nodeCount
+			smallRCCount, mediumRCCount, bigRCCount := computeRCCounts(totalPods)
+			threads := smallRCCount + mediumRCCount + bigRCCount
+
+			var wg sync.WaitGroup
+			wg.Add(threads)
+
+			// Run RC load for all kinds of RC.
+			runRCLoad(c, &wg, ns, smallRCSize, smallRCCount)
+			runRCLoad(c, &wg, ns, mediumRCSize, mediumRCCount)
+			runRCLoad(c, &wg, ns, bigRCSize, bigRCCount)
+
+			// Wait for all the pods from all the RC's to return.
+			wg.Wait()
+			// TODO verify latency metrics
+		})
+	}
+})
+
+func computeRCCounts(total int) (int, int, int) {
+	// Small RCs own ~0.5 of total number of pods, medium and big RCs ~0.25 each.
+	// For example for 3000 pods (100 nodes, 30 pods per node) there are:
+	//  - 300 small RCs each 5 pods
+	//  - 25 medium RCs each 30 pods
+	//  - 3 big RCs each 250 pods
+	bigRCCount := total / 4 / bigRCSize
+	mediumRCCount := (total - bigRCCount*bigRCSize) / 3 / mediumRCSize
+	smallRCCount := (total - bigRCCount*bigRCSize - mediumRCCount*mediumRCSize) / smallRCSize
+	return smallRCCount, mediumRCCount, bigRCCount
+}
+
+// playWithRC creates an RC, then every 1-2 minutes resizes it, and with probability 0.1 deletes it.
+func playWithRC(c *client.Client, wg *sync.WaitGroup, ns string, size int) {
+	defer GinkgoRecover()
+	defer wg.Done()
+	rcExist := false
+	var name string
+	// Once every 1-2 minutes perform resize of RC.
+	for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
+		if !rcExist {
+			name = "load-test-" + string(util.NewUUID())
+			expectNoError(RunRC(c, name, ns, image, size))
+			rcExist = true
+		}
+		// Resize RC to a random size between 0.5x and 1.5x of the original size.
+		newSize := uint(rand.Intn(size+1) + size/2)
+		expectNoError(ResizeRC(c, ns, name, newSize))
+		// With probability 0.1 remove this RC.
+		if rand.Intn(10) == 0 {
+			expectNoError(DeleteRC(c, ns, name))
+			rcExist = false
+		}
+	}
+	if rcExist {
+		expectNoError(DeleteRC(c, ns, name))
+	}
+}
+
+func runRCLoad(c *client.Client, wg *sync.WaitGroup, ns string, size, count int) {
+	By(fmt.Sprintf("Running %v Replication Controllers with size %v and playing with them", count, size))
+	for i := 0; i < count; i++ {
+		go playWithRC(c, wg, ns, size)
+	}
+}
diff --git a/test/e2e/util.go b/test/e2e/util.go
index d6c7bda21d3..29b08e8e726 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -36,9 +36,9 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	"golang.org/x/crypto/ssh"
 
@@ -409,41 +409,6 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
 	}
 }
 
-// Delete a Replication Controller and all pods it spawned
-func DeleteRC(c *client.Client, ns, name string) error {
-	rc, err := c.ReplicationControllers(ns).Get(name)
-	if err != nil {
-		return fmt.Errorf("Failed to find replication controller %s in namespace %s: %v", name, ns, err)
-	}
-
-	rc.Spec.Replicas = 0
-
-	if _, err := c.ReplicationControllers(ns).Update(rc); err != nil {
-		return fmt.Errorf("Failed to resize replication controller %s to zero: %v", name, err)
-	}
-
-	// Wait up to 20 minutes until all replicas are killed.
-	endTime := time.Now().Add(time.Minute * 20)
-	for {
-		if time.Now().After(endTime) {
-			return fmt.Errorf("Timeout while waiting for replication controller %s replicas to 0", name)
-		}
-		remainingTime := endTime.Sub(time.Now())
-		err := wait.Poll(time.Second, remainingTime, client.ControllerHasDesiredReplicas(c, rc))
-		if err != nil {
-			Logf("Error while waiting for replication controller %s replicas to read 0: %v", name, err)
-		} else {
-			break
-		}
-	}
-
-	// Delete the replication controller.
-	if err := c.ReplicationControllers(ns).Delete(name); err != nil {
-		return fmt.Errorf("Failed to delete replication controller %s: %v", name, err)
-	}
-	return nil
-}
-
 // RunRC Launches (and verifies correctness) of a Replication Controller
 // It will waits for all pods it spawns to become "Running".
 // It's the caller's responsibility to clean up externally (i.e. use the
@@ -574,6 +539,54 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
 	return nil
 }
 
+func ResizeRC(c *client.Client, ns, name string, size uint) error {
+	By(fmt.Sprintf("Resizing replication controller %s in namespace %s to %d", name, ns, size))
+	resizer, err := kubectl.ResizerFor("ReplicationController", kubectl.NewResizerClient(c))
+	if err != nil {
+		return err
+	}
+	waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
+	if err = resizer.Resize(ns, name, size, nil, nil, waitForReplicas); err != nil {
+		return err
+	}
+	return waitForRCPodsRunning(c, ns, name)
+}
+
+// Wait up to 10 minutes for pods to become Running.
+func waitForRCPodsRunning(c *client.Client, ns, rcName string) error {
+	running := false
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
+	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
+		pods, err := listPods(c, ns, label, fields.Everything())
+		if err != nil {
+			Logf("Error listing pods: %v", err)
+			continue
+		}
+		running = true
+		for _, p := range pods.Items {
+			running = running && p.Status.Phase == api.PodRunning
+		}
+		if running {
+			break
+		}
+	}
+	if !running {
+		return fmt.Errorf("Timeout while waiting for replication controller %s pods to be running", rcName)
+	}
+	return nil
+}
+
+// Delete a Replication Controller and all pods it spawned
+func DeleteRC(c *client.Client, ns, name string) error {
+	By(fmt.Sprintf("Deleting replication controller %s in namespace %s", name, ns))
+	reaper, err := kubectl.ReaperFor("ReplicationController", c)
+	if err != nil {
+		return err
+	}
+	_, err = reaper.Stop(ns, name, api.NewDeleteOptions(0))
+	return err
+}
+
 // Convenient wrapper around listing pods supporting retries.
 func listPods(c *client.Client, namespace string, label labels.Selector, field fields.Selector) (*api.PodList, error) {
 	maxRetries := 4