Implementation of e2e test that resizes cluster (works on GCE).
parent d9d12fd3f7
commit ef965a60f2
@@ -77,6 +77,12 @@ else
)
fi

if [[ -n "${NODE_INSTANCE_PREFIX:-}" ]]; then
  NODE_INSTANCE_GROUP="${NODE_INSTANCE_PREFIX}-group"
else
  NODE_INSTANCE_GROUP=""
fi

ginkgo_args=()
if [[ ${GINKGO_PARALLEL} =~ ^[yY]$ ]]; then
  ginkgo_args+=("-p")
@@ -93,5 +99,7 @@ fi
  --gce-zone="${ZONE:-}" \
  --kube-master="${KUBE_MASTER:-}" \
  --repo-root="${KUBE_VERSION_ROOT}" \
  --node-instance-group="${NODE_INSTANCE_GROUP:-}" \
  --num-nodes="${NUM_MINIONS:-}" \
  ${E2E_REPORT_DIR+"--report-dir=${E2E_REPORT_DIR}"} \
  "${@:-}"
@@ -84,6 +84,8 @@ func init() {
	flag.StringVar(&cloudConfig.MasterName, "kube-master", "", "Name of the kubernetes master. Only required if provider is gce or gke")
	flag.StringVar(&cloudConfig.ProjectID, "gce-project", "", "The GCE project being used, if applicable")
	flag.StringVar(&cloudConfig.Zone, "gce-zone", "", "GCE zone being used, if applicable")
	flag.StringVar(&cloudConfig.NodeInstanceGroup, "node-instance-group", "", "Name of the managed instance group for nodes. Valid only for gce")
	flag.IntVar(&cloudConfig.NumNodes, "num-nodes", -1, "Number of nodes in the cluster")
}

func TestE2E(t *testing.T) {
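The two new flags plumb the managed-instance-group name and the expected node count into cloudConfig, mirroring the existing gce-project/gce-zone wiring. A minimal, self-contained sketch of that pattern (the struct and main below are illustrative stand-ins, not code from this commit):

```go
package main

import (
	"flag"
	"fmt"
)

// cloudConfig here is a stand-in for the e2e test context's CloudConfig.
type cloudConfig struct {
	NodeInstanceGroup string
	NumNodes          int
}

func main() {
	var cfg cloudConfig
	// Same flag names and defaults as the commit; -1 marks "not set".
	flag.StringVar(&cfg.NodeInstanceGroup, "node-instance-group", "", "Name of the managed instance group for nodes. Valid only for gce")
	flag.IntVar(&cfg.NumNodes, "num-nodes", -1, "Number of nodes in the cluster")
	flag.Parse()
	fmt.Printf("group=%q nodes=%d\n", cfg.NodeInstanceGroup, cfg.NumNodes)
}
```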
@@ -141,7 +141,7 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
	By("Trying to dial each unique pod")
	retryTimeout := 2 * time.Minute
	retryInterval := 5 * time.Second
-	err = wait.Poll(retryInterval, retryTimeout, responseChecker{c, ns, label, name, pods}.checkAllResponses)
+	err = wait.Poll(retryInterval, retryTimeout, podResponseChecker{c, ns, label, name, pods}.checkAllResponses)
	if err != nil {
		Failf("Did not get expected responses within the timeout period of %.2f seconds.", retryTimeout.Seconds())
	}
@@ -155,45 +155,3 @@ func isElementOf(podUID types.UID, pods *api.PodList) bool {
	}
	return false
}
-
-type responseChecker struct {
-	c              *client.Client
-	ns             string
-	label          labels.Selector
-	controllerName string
-	pods           *api.PodList
-}
-
-func (r responseChecker) checkAllResponses() (done bool, err error) {
-	successes := 0
-	currentPods, err := r.c.Pods(r.ns).List(r.label, fields.Everything())
-	Expect(err).NotTo(HaveOccurred())
-	for i, pod := range r.pods.Items {
-		// Check that the replica list remains unchanged, otherwise we have problems.
-		if !isElementOf(pod.UID, currentPods) {
-			return false, fmt.Errorf("Pod with UID %s is no longer a member of the replica set. Must have been restarted for some reason. Current replica set: %v", pod.UID, currentPods)
-		}
-		body, err := r.c.Get().
-			Prefix("proxy").
-			Namespace(api.NamespaceDefault).
-			Resource("pods").
-			Name(string(pod.Name)).
-			Do().
-			Raw()
-		if err != nil {
-			Logf("Controller %s: Failed to GET from replica %d (%s): %v:", r.controllerName, i+1, pod.Name, err)
-			continue
-		}
-		// The body should be the pod name.
-		if string(body) != pod.Name {
-			Logf("Controller %s: Replica %d expected response %s but got %s", r.controllerName, i+1, pod.Name, string(body))
-			continue
-		}
-		successes++
-		Logf("Controller %s: Got expected result from replica %d: %s, %d of %d required successes so far", r.controllerName, i+1, string(body), successes, len(r.pods.Items))
-	}
-	if successes < len(r.pods.Items) {
-		return false, nil
-	}
-	return true, nil
-}
test/e2e/resize_nodes.go (new file, 338 lines)
@@ -0,0 +1,338 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

func resizeNodeInstanceGroup(size int) error {
	// TODO: make this hit the compute API directly instead of shelling out to gcloud.
	output, err := exec.Command("gcloud", "preview", "managed-instance-groups", "--project="+testContext.CloudConfig.ProjectID, "--zone="+testContext.CloudConfig.Zone,
		"resize", testContext.CloudConfig.NodeInstanceGroup, fmt.Sprintf("--new-size=%v", size)).CombinedOutput()
	if err != nil {
		Logf("Failed to resize node instance group: %v", string(output))
	}
	return err
}
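The TODO above flags the gcloud shell-out as temporary. A hypothetical sketch of the direct route through the Compute API, assuming the google.golang.org/api/compute/v1 client and application-default credentials (neither is used by this commit):

```go
package e2esketch // illustrative package, not part of the commit

import (
	"context"

	"golang.org/x/oauth2/google"
	compute "google.golang.org/api/compute/v1"
)

// resizeInstanceGroupViaAPI is the rough equivalent of
// `gcloud preview managed-instance-groups resize --new-size=N`.
func resizeInstanceGroupViaAPI(project, zone, group string, size int64) error {
	ctx := context.Background()
	// Application-default credentials; assumes the environment is already
	// authenticated against GCP, just as the gcloud path does.
	httpClient, err := google.DefaultClient(ctx, compute.ComputeScope)
	if err != nil {
		return err
	}
	svc, err := compute.New(httpClient)
	if err != nil {
		return err
	}
	_, err = svc.InstanceGroupManagers.Resize(project, zone, group, size).Do()
	return err
}
```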

func nodeInstanceGroupSize() (int, error) {
	// TODO: make this hit the compute API directly instead of shelling out to gcloud.
	output, err := exec.Command("gcloud", "preview", "managed-instance-groups", "--project="+testContext.CloudConfig.ProjectID,
		"--zone="+testContext.CloudConfig.Zone, "describe", testContext.CloudConfig.NodeInstanceGroup).CombinedOutput()
	if err != nil {
		return -1, err
	}
	pattern := "currentSize: "
	i := strings.Index(string(output), pattern)
	if i == -1 {
		return -1, fmt.Errorf("could not find '%s' in the output '%s'", pattern, output)
	}
	truncated := output[i+len(pattern):]
	j := strings.Index(string(truncated), "\n")
	if j == -1 {
		return -1, fmt.Errorf("could not find new line in the truncated output '%s'", truncated)
	}

	currentSize, err := strconv.Atoi(string(truncated[:j]))
	if err != nil {
		return -1, err
	}
	return currentSize, nil
}
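nodeInstanceGroupSize recovers the group size by scanning gcloud's human-readable `describe` output for the `currentSize: ` marker. That scan is easy to isolate and unit-test; a self-contained sketch (function name and sample output are illustrative, not from the commit):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCurrentSize mirrors the scan above: find "currentSize: " and
// Atoi everything up to the following newline.
func parseCurrentSize(output string) (int, error) {
	const pattern = "currentSize: "
	i := strings.Index(output, pattern)
	if i == -1 {
		return -1, fmt.Errorf("could not find %q in output %q", pattern, output)
	}
	rest := output[i+len(pattern):]
	j := strings.Index(rest, "\n")
	if j == -1 {
		return -1, fmt.Errorf("could not find newline after %q", pattern)
	}
	return strconv.Atoi(rest[:j])
}

func main() {
	// Shaped like `gcloud ... describe` output; the exact fields are illustrative.
	out := "baseInstanceName: e2e-minion\ncurrentSize: 3\n"
	size, err := parseCurrentSize(out)
	fmt.Println(size, err) // 3 <nil>
}
```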

func waitForNodeInstanceGroupSize(size int) error {
	for start := time.Now(); time.Since(start) < 4*time.Minute; time.Sleep(5 * time.Second) {
		currentSize, err := nodeInstanceGroupSize()
		if err != nil {
			Logf("Failed to get node instance group size: %v", err)
			continue
		}
		if currentSize != size {
			Logf("Waiting for node instance group size %d, current size %d", size, currentSize)
			continue
		}
		Logf("Node instance group has reached the desired size %d", size)
		return nil
	}
	return fmt.Errorf("timeout waiting for node instance group size to be %d", size)
}
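This function and waitForClusterSize below share one loop shape: check, log, sleep, retry until a deadline. A hedged sketch of that shape factored into a reusable helper (not in the commit; the suite's pkg/util/wait.Poll serves a similar purpose):

```go
package main

import (
	"fmt"
	"time"
)

// pollUntil re-runs condition every interval until it reports done or the
// timeout elapses — the same for/time.Since/time.Sleep idiom as above.
func pollUntil(interval, timeout time.Duration, condition func() (bool, error)) error {
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(interval) {
		done, err := condition()
		if err != nil {
			// Match the wait functions above: log and retry on error.
			fmt.Printf("condition failed (will retry): %v\n", err)
			continue
		}
		if done {
			return nil
		}
	}
	return fmt.Errorf("timed out after %v", timeout)
}

func main() {
	calls := 0
	err := pollUntil(10*time.Millisecond, time.Second, func() (bool, error) {
		calls++
		return calls >= 3, nil // succeeds on the third poll
	})
	fmt.Println(calls, err) // 3 <nil>
}
```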

func waitForClusterSize(c *client.Client, size int) error {
	for start := time.Now(); time.Since(start) < 2*time.Minute; time.Sleep(20 * time.Second) {
		nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
		if err != nil {
			Logf("Failed to list nodes: %v", err)
			continue
		}
		if len(nodes.Items) == size {
			Logf("Cluster has reached the desired size %d", size)
			return nil
		}
		Logf("Waiting for cluster size %d, current size %d", size, len(nodes.Items))
	}
	return fmt.Errorf("timeout waiting for cluster size to be %d", size)
}

func newServiceWithNameSelector(name string) *api.Service {
	return &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name: "test-service",
		},
		Spec: api.ServiceSpec{
			Selector: map[string]string{
				"name": name,
			},
			Ports: []api.ServicePort{{
				Port:       9376,
				TargetPort: util.NewIntOrStringFromInt(9376),
			}},
		},
	}
}

func createServiceWithNameSelector(c *client.Client, ns, name string) error {
	_, err := c.Services(ns).Create(newServiceWithNameSelector(name))
	return err
}

func newReplicationControllerWithNameSelector(name string, replicas int, image string) *api.ReplicationController {
	return &api.ReplicationController{
		ObjectMeta: api.ObjectMeta{
			Name: name,
		},
		Spec: api.ReplicationControllerSpec{
			Replicas: replicas,
			Selector: map[string]string{
				"name": name,
			},
			Template: &api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: map[string]string{"name": name},
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Name:  name,
							Image: image,
							Ports: []api.ContainerPort{{ContainerPort: 9376}},
						},
					},
				},
			},
		},
	}
}

func createServeHostnameReplicationController(c *client.Client, ns, name string, replicas int) (*api.ReplicationController, error) {
	By(fmt.Sprintf("creating replication controller %s", name))
	return c.ReplicationControllers(ns).Create(newReplicationControllerWithNameSelector(name, replicas, "gcr.io/google_containers/serve_hostname:1.1"))
}

func resizeReplicationController(c *client.Client, ns, name string, replicas int) error {
	rc, err := c.ReplicationControllers(ns).Get(name)
	if err != nil {
		return err
	}
	rc.Spec.Replicas = replicas
	_, err = c.ReplicationControllers(rc.Namespace).Update(rc)
	return err
}

func waitForPodsCreated(c *client.Client, ns, name string, replicas int) (*api.PodList, error) {
	// List the pods, making sure we observe all the replicas.
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		pods, err := c.Pods(ns).List(label, fields.Everything())
		if err != nil {
			return nil, err
		}

		Logf("Controller %s: Found %d pods out of %d", name, len(pods.Items), replicas)
		if len(pods.Items) == replicas {
			return pods, nil
		}
	}
	return nil, fmt.Errorf("controller %s: Gave up waiting for %d pods to come up", name, replicas)
}

func waitForPodsRunning(c *client.Client, pods *api.PodList) []error {
	// Wait for the pods to enter the running state. Waiting loops until the pods
	// are running so non-running pods cause a timeout for this test.
	By("ensuring each pod is running")
	e := []error{}
	for _, pod := range pods.Items {
		// TODO: make waiting parallel.
		err := waitForPodRunningInNamespace(c, pod.Name, pod.Namespace)
		if err != nil {
			e = append(e, err)
		}
	}
	return e
}

func verifyPodsResponding(c *client.Client, ns, name string, pods *api.PodList) error {
	By("trying to dial each unique pod")
	retryTimeout := 2 * time.Minute
	retryInterval := 5 * time.Second
	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
	return wait.Poll(retryInterval, retryTimeout, podResponseChecker{c, ns, label, name, pods}.checkAllResponses)
}

var _ = Describe("ResizeNodes", func() {
	supportedProviders := []string{"gce"}
	var testName string
	var c *client.Client
	var ns string

	BeforeEach(func() {
		var err error
		c, err = loadClient()
		expectNoError(err)
		testingNs, err := createTestingNS("resize-nodes", c)
		ns = testingNs.Name
		Expect(err).NotTo(HaveOccurred())
	})

	AfterEach(func() {
		if !providerIs(supportedProviders...) {
			return
		}
		By(fmt.Sprintf("destroying namespace for this suite %s", ns))
		if err := c.Namespaces().Delete(ns); err != nil {
			Failf("Couldn't delete namespace '%s', %v", ns, err)
		}
		By("restoring the original node instance group size")
		if err := resizeNodeInstanceGroup(testContext.CloudConfig.NumNodes); err != nil {
			Failf("Couldn't restore the original node instance group size: %v", err)
		}
		if err := waitForNodeInstanceGroupSize(testContext.CloudConfig.NumNodes); err != nil {
			Failf("Couldn't restore the original node instance group size: %v", err)
		}
		if err := waitForClusterSize(c, testContext.CloudConfig.NumNodes); err != nil {
			Failf("Couldn't restore the original cluster size: %v", err)
		}
	})

	testName = "should be able to delete nodes."
	It(testName, func() {
		Logf("starting test %s", testName)

		if !providerIs(supportedProviders...) {
			By(fmt.Sprintf("Skipping %s test, which is only supported for providers %v (not %s)",
				testName, supportedProviders, testContext.Provider))
			return
		}

		if testContext.CloudConfig.NumNodes < 2 {
			By(fmt.Sprintf("skipping %s test, which requires at least 2 nodes (not %d)",
				testName, testContext.CloudConfig.NumNodes))
			return
		}

		// Create a replication controller for a service that serves its hostname.
		// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
		name := "my-hostname-delete-node-" + string(util.NewUUID())
		replicas := testContext.CloudConfig.NumNodes
		createServeHostnameReplicationController(c, ns, name, replicas)
		pods, err := waitForPodsCreated(c, ns, name, replicas)
		Expect(err).NotTo(HaveOccurred())
		e := waitForPodsRunning(c, pods)
		if len(e) > 0 {
			Failf("Failed to wait for pods running: %v", e)
		}
		err = verifyPodsResponding(c, ns, name, pods)
		Expect(err).NotTo(HaveOccurred())

		By(fmt.Sprintf("decreasing cluster size to %d", replicas-1))
		err = resizeNodeInstanceGroup(replicas - 1)
		Expect(err).NotTo(HaveOccurred())
		err = waitForNodeInstanceGroupSize(replicas - 1)
		Expect(err).NotTo(HaveOccurred())
		err = waitForClusterSize(c, replicas-1)
		Expect(err).NotTo(HaveOccurred())

		By("verifying whether the pods from the removed node are recreated")
		pods, err = waitForPodsCreated(c, ns, name, replicas)
		Expect(err).NotTo(HaveOccurred())
		e = waitForPodsRunning(c, pods)
		if len(e) > 0 {
			Failf("Failed to wait for pods running: %v", e)
		}
		err = verifyPodsResponding(c, ns, name, pods)
		Expect(err).NotTo(HaveOccurred())
	})

	testName = "should be able to add nodes."
	It(testName, func() {
		Logf("starting test %s", testName)

		if !providerIs(supportedProviders...) {
			By(fmt.Sprintf("Skipping %s test, which is only supported for providers %v (not %s)",
				testName, supportedProviders, testContext.Provider))
			return
		}

		// Create a replication controller for a service that serves its hostname.
		// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
		name := "my-hostname-add-node-" + string(util.NewUUID())
		createServiceWithNameSelector(c, ns, name)
		replicas := testContext.CloudConfig.NumNodes
		createServeHostnameReplicationController(c, ns, name, replicas)
		pods, err := waitForPodsCreated(c, ns, name, replicas)
		Expect(err).NotTo(HaveOccurred())
		e := waitForPodsRunning(c, pods)
		if len(e) > 0 {
			Failf("Failed to wait for pods running: %v", e)
		}
		err = verifyPodsResponding(c, ns, name, pods)
		Expect(err).NotTo(HaveOccurred())

		By(fmt.Sprintf("increasing cluster size to %d", replicas+1))
		err = resizeNodeInstanceGroup(replicas + 1)
		Expect(err).NotTo(HaveOccurred())
		err = waitForNodeInstanceGroupSize(replicas + 1)
		Expect(err).NotTo(HaveOccurred())
		err = waitForClusterSize(c, replicas+1)
		Expect(err).NotTo(HaveOccurred())

		By(fmt.Sprintf("increasing size of the replication controller to %d and verifying all pods are running", replicas+1))
		resizeReplicationController(c, ns, name, replicas+1)
		pods, err = waitForPodsCreated(c, ns, name, replicas+1)
		Expect(err).NotTo(HaveOccurred())
		e = waitForPodsRunning(c, pods)
		if len(e) > 0 {
			Failf("Failed to wait for pods running: %v", e)
		}
		err = verifyPodsResponding(c, ns, name, pods)
		Expect(err).NotTo(HaveOccurred())
	})
})
@@ -60,9 +60,11 @@ const (
)

type CloudConfig struct {
-	ProjectID  string
-	Zone       string
-	MasterName string
+	ProjectID         string
+	Zone              string
+	MasterName        string
+	NodeInstanceGroup string
+	NumNodes          int

	Provider cloudprovider.Interface
}
@@ -264,6 +266,50 @@ func waitForPodSuccess(c *client.Client, podName string, contName string) error
	return waitForPodSuccessInNamespace(c, podName, contName, api.NamespaceDefault)
}

// podResponseChecker checks pod responses by issuing GETs to them and verifying that each answers with its pod name.
type podResponseChecker struct {
	c              *client.Client
	ns             string
	label          labels.Selector
	controllerName string
	pods           *api.PodList
}

// checkAllResponses issues GETs to all pods in the context and verifies they reply with their pod name.
func (r podResponseChecker) checkAllResponses() (done bool, err error) {
	successes := 0
	currentPods, err := r.c.Pods(r.ns).List(r.label, fields.Everything())
	Expect(err).NotTo(HaveOccurred())
	for i, pod := range r.pods.Items {
		// Check that the replica list remains unchanged, otherwise we have problems.
		if !isElementOf(pod.UID, currentPods) {
			return false, fmt.Errorf("pod with UID %s is no longer a member of the replica set. Must have been restarted for some reason. Current replica set: %v", pod.UID, currentPods)
		}
		body, err := r.c.Get().
			Prefix("proxy").
			Namespace(r.ns).
			Resource("pods").
			Name(string(pod.Name)).
			Do().
			Raw()
		if err != nil {
			Logf("Controller %s: Failed to GET from replica %d (%s): %v:", r.controllerName, i+1, pod.Name, err)
			continue
		}
		// The body should be the pod name.
		if string(body) != pod.Name {
			Logf("Controller %s: Replica %d expected response %s but got %s", r.controllerName, i+1, pod.Name, string(body))
			continue
		}
		successes++
		Logf("Controller %s: Got expected result from replica %d: %s, %d of %d required successes so far", r.controllerName, i+1, string(body), successes, len(r.pods.Items))
	}
	if successes < len(r.pods.Items) {
		return false, nil
	}
	return true, nil
}
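The chained request here (Get().Prefix("proxy").Namespace(r.ns).Resource("pods").Name(...)) asks the apiserver to proxy the GET to the pod, so the checker needs no direct route to pod IPs. A rough stand-alone sketch of the same probe through a local `kubectl proxy`; the URL layout below is a modern-API assumption for illustration, not the path this 2015 client builds:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// probePodViaProxy GETs a pod through an apiserver proxy endpoint and reports
// whether the body equals the pod name — the serve_hostname contract the
// checker relies on. Assumes `kubectl proxy` listening on 127.0.0.1:8001.
func probePodViaProxy(ns, pod string) (bool, error) {
	url := fmt.Sprintf("http://127.0.0.1:8001/api/v1/namespaces/%s/pods/%s/proxy/", ns, pod)
	resp, err := http.Get(url)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	return string(body) == pod, nil
}

func main() {
	ok, err := probePodViaProxy("default", "my-hostname-abc12")
	fmt.Println(ok, err)
}
```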

func loadConfig() (*client.Config, error) {
	switch {
	case testContext.KubeConfig != "":