From f8e1404f871cff0f7c0f0c88cc4d11d008252e1c Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Wed, 9 Mar 2016 15:01:55 -0800 Subject: [PATCH] e2e: seperate wait for termination notice and graceful termination --- test/e2e/kubelet_perf.go | 2 +- test/e2e/kubelet_stats.go | 72 +++------------------------------------ test/e2e/pods.go | 22 ++++++++++++ test/e2e/util.go | 65 +++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 69 deletions(-) diff --git a/test/e2e/kubelet_perf.go b/test/e2e/kubelet_perf.go index 67d3dccbe88..c3de8392d22 100644 --- a/test/e2e/kubelet_perf.go +++ b/test/e2e/kubelet_perf.go @@ -46,7 +46,7 @@ type resourceTest struct { func logPodsOnNodes(c *client.Client, nodeNames []string) { for _, n := range nodeNames { - podList, err := GetKubeletPods(c, n) + podList, err := GetKubeletRunningPods(c, n) if err != nil { Logf("Unable to retrieve kubelet pods for node %v", n) continue diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go index f34415e9545..a23b0a00063 100644 --- a/test/e2e/kubelet_stats.go +++ b/test/e2e/kubelet_stats.go @@ -32,7 +32,6 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/prometheus/common/model" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/restclient" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/server/stats" @@ -42,11 +41,6 @@ import ( "k8s.io/kubernetes/pkg/util/wait" ) -const ( - // timeout for proxy requests. - proxyTimeout = 2 * time.Minute -) - // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint. // TODO: Get some more structure around the metrics and this type type KubeletMetric struct { @@ -342,46 +336,9 @@ type usageDataPerContainer struct { memWorkSetData []uint64 } -// Performs a get on a node proxy endpoint given the nodename and rest client. -func nodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) { - // proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. - // This will leak a goroutine if proxy hangs. #22165 - subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c) - if err != nil { - return restclient.Result{}, err - } - var result restclient.Result - finished := make(chan struct{}) - go func() { - if subResourceProxyAvailable { - result = c.Get(). - Resource("nodes"). - SubResource("proxy"). - Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)). - Suffix(endpoint). - Do() - - } else { - result = c.Get(). - Prefix("proxy"). - Resource("nodes"). - Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)). - Suffix(endpoint). - Do() - } - finished <- struct{}{} - }() - select { - case <-finished: - return result, nil - case <-time.After(proxyTimeout): - return restclient.Result{}, nil - } -} - // Retrieve metrics from the kubelet server of the given node. func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) { - client, err := nodeProxyRequest(c, node, "metrics") + client, err := NodeProxyRequest(c, node, "metrics") if err != nil { return "", err } @@ -408,7 +365,7 @@ func getKubeletMetricsThroughNode(nodeName string) (string, error) { } func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) { - client, err := nodeProxyRequest(c, nodeName, "debug/pprof/heap") + client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap") if err != nil { return "", err } @@ -423,31 +380,10 @@ func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) { return strings.Join(lines[len(lines)-numLines:], "\n"), nil } -// GetKubeletPods retrieves the list of running pods on the kubelet. The pods -// includes necessary information (e.g., UID, name, namespace for -// pods/containers), but do not contain the full spec. -func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) { - result := &api.PodList{} - client, err := nodeProxyRequest(c, node, "runningpods") - if err != nil { - return &api.PodList{}, err - } - if err = client.Into(result); err != nil { - return &api.PodList{}, err - } - return result, nil -} - func PrintAllKubeletPods(c *client.Client, nodeName string) { - result, err := nodeProxyRequest(c, nodeName, "pods") + podList, err := GetKubeletPods(c, nodeName) if err != nil { - Logf("Unable to retrieve kubelet pods for node %v", nodeName) - return - } - podList := &api.PodList{} - err = result.Into(podList) - if err != nil { - Logf("Unable to cast result to pods for node %v", nodeName) + Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err) return } for _, p := range podList.Items { diff --git a/test/e2e/pods.go b/test/e2e/pods.go index 928df86f2eb..aac3413f0be 100644 --- a/test/e2e/pods.go +++ b/test/e2e/pods.go @@ -351,6 +351,28 @@ var _ = Describe("Pods", func() { Failf("Failed to delete pod: %v", err) } + By("verifying the kubelet observed the termination notice") + pod, err = podClient.Get(pod.Name) + Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) { + podList, err := GetKubeletPods(framework.Client, pod.Spec.NodeName) + if err != nil { + Logf("Unable to retrieve kubelet pods for node %v: %v", pod.Spec.NodeName, err) + return false, nil + } + for _, kubeletPod := range podList.Items { + if pod.Name != kubeletPod.Name { + continue + } + if kubeletPod.ObjectMeta.DeletionTimestamp == nil { + Logf("deletion has not yet been observed") + return false, nil + } + return true, nil + } + Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed") + return true, nil + })).NotTo(HaveOccurred(), "kubelet never observed the termination notice") + By("verifying pod deletion was observed") deleted := false timeout := false diff --git a/test/e2e/util.go b/test/e2e/util.go index a4344aff325..6ff198d9f96 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -56,6 +56,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/runtime" sshutil "k8s.io/kubernetes/pkg/ssh" "k8s.io/kubernetes/pkg/types" @@ -3497,6 +3498,70 @@ func GetReadyNodes(f *Framework) (nodes *api.NodeList, err error) { return nodes, nil } +// timeout for proxy requests. +const proxyTimeout = 2 * time.Minute + +// NodeProxyRequest performs a get on a node proxy endpoint given the nodename and rest client. +func NodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) { + // proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. + // This will leak a goroutine if proxy hangs. #22165 + subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c) + if err != nil { + return restclient.Result{}, err + } + var result restclient.Result + finished := make(chan struct{}) + go func() { + if subResourceProxyAvailable { + result = c.Get(). + Resource("nodes"). + SubResource("proxy"). + Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)). + Suffix(endpoint). + Do() + + } else { + result = c.Get(). + Prefix("proxy"). + Resource("nodes"). + Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)). + Suffix(endpoint). + Do() + } + finished <- struct{}{} + }() + select { + case <-finished: + return result, nil + case <-time.After(proxyTimeout): + return restclient.Result{}, nil + } +} + +// GetKubeletPods retrieves the list of pods on the kubelet +func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) { + return getKubeletPods(c, node, "pods") +} + +// GetKubeletRunningPods retrieves the list of running pods on the kubelet. The pods +// includes necessary information (e.g., UID, name, namespace for +// pods/containers), but do not contain the full spec. +func GetKubeletRunningPods(c *client.Client, node string) (*api.PodList, error) { + return getKubeletPods(c, node, "runningpods") +} + +func getKubeletPods(c *client.Client, node, resource string) (*api.PodList, error) { + result := &api.PodList{} + client, err := NodeProxyRequest(c, node, resource) + if err != nil { + return &api.PodList{}, err + } + if err = client.Into(result); err != nil { + return &api.PodList{}, err + } + return result, nil +} + // LaunchWebserverPod launches a pod serving http on port 8080 to act // as the target for networking connectivity checks. The ip address // of the created pod will be returned if the pod is launched