diff --git a/test/e2e/common/pods.go b/test/e2e/common/pods.go index b420e1e69a3..d7cf8d6c348 100644 --- a/test/e2e/common/pods.go +++ b/test/e2e/common/pods.go @@ -29,11 +29,14 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/test/e2e/framework" @@ -192,8 +195,30 @@ var _ = framework.KubeDescribe("Pods", func() { LabelSelector: selector.String(), ResourceVersion: pods.ListMeta.ResourceVersion, } - w, err := podClient.Watch(options) - Expect(err).NotTo(HaveOccurred(), "failed to set up watch") + + listCompleted := make(chan bool, 1) + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.LabelSelector = selector.String() + podList, err := podClient.List(options) + if err == nil { + select { + case listCompleted <- true: + framework.Logf("observed the pod list") + return podList, err + default: + framework.Logf("channel blocked") + } + } + return podList, err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = selector.String() + return podClient.Watch(options) + }, + } + _, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{}) + defer w.Stop() By("submitting the pod to kubernetes") podClient.Create(pod) @@ -207,12 +232,17 @@ var _ = framework.KubeDescribe("Pods", func() { By("verifying pod creation was observed") select { - case event, _ := <-w.ResultChan(): - if event.Type != watch.Added { - framework.Failf("Failed to observe pod creation: %v", event) + case <-listCompleted: + select { + case event, _ := <-w.ResultChan(): + if event.Type != watch.Added { + framework.Failf("Failed to observe pod creation: %v", event) + } + case <-time.After(framework.PodStartTimeout): + framework.Failf("Timeout while waiting for pod creation") } - case <-time.After(framework.PodStartTimeout): - framework.Failf("Timeout while waiting for pod creation") + case <-time.After(10 * time.Second): + framework.Failf("Timeout while waiting to observe pod list") } // We need to wait for the pod to be running, otherwise the deletion @@ -221,7 +251,6 @@ var _ = framework.KubeDescribe("Pods", func() { // save the running pod pod, err = podClient.Get(pod.Name, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "failed to GET scheduled pod") - framework.Logf("running pod: %#v", pod) By("deleting the pod gracefully") err = podClient.Delete(pod.Name, metav1.NewDeleteOptions(30)) diff --git a/test/e2e/node/BUILD b/test/e2e/node/BUILD index 6053a44869a..0b03a9115cb 100644 --- a/test/e2e/node/BUILD +++ b/test/e2e/node/BUILD @@ -37,7 +37,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//test/e2e/common:go_default_library", "//test/e2e/framework:go_default_library", diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index b8ba1f8cad1..d5e41404f83 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -18,6 +18,7 @@ package node import ( "crypto/tls" + "encoding/json" "fmt" "net/http" "regexp" @@ -30,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" @@ -74,7 +74,7 @@ var _ = SIGDescribe("Pods Extended", func() { }, } - By("setting up watch") + By("setting up selector") selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": value})) options := metav1.ListOptions{LabelSelector: selector.String()} pods, err := podClient.List(options) @@ -84,8 +84,6 @@ var _ = SIGDescribe("Pods Extended", func() { LabelSelector: selector.String(), ResourceVersion: pods.ListMeta.ResourceVersion, } - w, err := podClient.Watch(options) - Expect(err).NotTo(HaveOccurred(), "failed to set up watch") By("submitting the pod to kubernetes") podClient.Create(pod) @@ -97,16 +95,6 @@ var _ = SIGDescribe("Pods Extended", func() { Expect(err).NotTo(HaveOccurred(), "failed to query for pod") Expect(len(pods.Items)).To(Equal(1)) - By("verifying pod creation was observed") - select { - case event, _ := <-w.ResultChan(): - if event.Type != watch.Added { - framework.Failf("Failed to observe pod creation: %v", event) - } - case <-time.After(framework.PodStartTimeout): - framework.Failf("Timeout while waiting for pod creation") - } - // We need to wait for the pod to be running, otherwise the deletion // may be carried out immediately rather than gracefully. framework.ExpectNoError(f.WaitForPodRunning(pod.Name)) @@ -143,10 +131,15 @@ var _ = SIGDescribe("Pods Extended", func() { By("deleting the pod gracefully") rsp, err := client.Do(req) Expect(err).NotTo(HaveOccurred(), "failed to use http client to send delete") + Expect(rsp.StatusCode).Should(Equal(http.StatusOK), "failed to delete gracefully by client request") + var lastPod v1.Pod + err = json.NewDecoder(rsp.Body).Decode(&lastPod) + Expect(err).NotTo(HaveOccurred(), "failed to decode graceful termination proxy response") defer rsp.Body.Close() By("verifying the kubelet observed the termination notice") + Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) { podList, err := framework.GetKubeletPods(f.ClientSet, pod.Spec.NodeName) if err != nil { @@ -161,32 +154,12 @@ var _ = SIGDescribe("Pods Extended", func() { framework.Logf("deletion has not yet been observed") return false, nil } - return true, nil + return false, nil } framework.Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed") return true, nil })).NotTo(HaveOccurred(), "kubelet never observed the termination notice") - By("verifying pod deletion was observed") - deleted := false - timeout := false - var lastPod *v1.Pod - timer := time.After(2 * time.Minute) - for !deleted && !timeout { - select { - case event, _ := <-w.ResultChan(): - if event.Type == watch.Deleted { - lastPod = event.Object.(*v1.Pod) - deleted = true - } - case <-timer: - timeout = true - } - } - if !deleted { - framework.Failf("Failed to observe pod deletion") - } - Expect(lastPod.DeletionTimestamp).ToNot(BeNil()) Expect(lastPod.Spec.TerminationGracePeriodSeconds).ToNot(BeZero())