diff --git a/pkg/client/fake_pods.go b/pkg/client/fake_pods.go index a5e5e97924d..d39491211af 100644 --- a/pkg/client/fake_pods.go +++ b/pkg/client/fake_pods.go @@ -19,6 +19,7 @@ package client import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // FakePods implements PodsInterface. Meant to be embedded into a struct to get a default @@ -53,6 +54,11 @@ func (c *FakePods) Update(pod *api.Pod) (*api.Pod, error) { return &api.Pod{}, nil } +func (c *FakePods) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-pods", Value: resourceVersion}) + return c.Fake.Watch, c.Fake.Err +} + func (c *FakePods) Bind(bind *api.Binding) error { c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name}) return nil diff --git a/pkg/client/pods.go b/pkg/client/pods.go index 30366081550..14a7a359e06 100644 --- a/pkg/client/pods.go +++ b/pkg/client/pods.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // PodsNamespacer has methods to work with Pod resources in a namespace @@ -36,7 +37,7 @@ type PodInterface interface { Delete(name string) error Create(pod *api.Pod) (*api.Pod, error) Update(pod *api.Pod) (*api.Pod, error) - + Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) Bind(binding *api.Binding) error } @@ -95,6 +96,18 @@ func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) { return } +// Watch returns a watch.Interface that watches the requested pods. +func (c *pods) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + return c.r.Get(). + Prefix("watch"). + Namespace(c.ns). + Resource("pods"). + Param("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + // Bind applies the provided binding to the named pod in the current namespace (binding.Namespace is ignored). func (c *pods) Bind(binding *api.Binding) error { return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error() diff --git a/test/e2e/pods.go b/test/e2e/pods.go index 3dca34a6abb..97f3f48025f 100644 --- a/test/e2e/pods.go +++ b/test/e2e/pods.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -122,27 +123,70 @@ var _ = Describe("Pods", func() { }, } + By("setting up watch") + pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value}))) + if err != nil { + Fail(fmt.Sprintf("Failed to query for pods: %v", err)) + } + Expect(len(pods.Items)).To(Equal(0)) + w, err := podClient.Watch( + labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), labels.Everything(), pods.ListMeta.ResourceVersion) + if err != nil { + Fail(fmt.Sprintf("Failed to set up watch: %v", err)) + } + By("submitting the pod to kubernetes") // We call defer here in case there is a problem with // the test so we can ensure that we clean up after // ourselves defer podClient.Delete(pod.Name) - _, err := podClient.Create(pod) + _, err = podClient.Create(pod) if err != nil { Fail(fmt.Sprintf("Failed to create pod: %v", err)) } By("verifying the pod is in kubernetes") - pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value}))) + pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value}))) if err != nil { Fail(fmt.Sprintf("Failed to query for pods: %v", err)) } Expect(len(pods.Items)).To(Equal(1)) + By("veryfying pod creation was observed") + select { + case event, _ := <-w.ResultChan(): + if event.Type != watch.Added { + Fail(fmt.Sprintf("Failed to observe pod creation: %v", event)) + } + case <-time.After(podStartTimeout): + Fail("Timeout while waiting for pod creation") + } + By("deleting the pod") podClient.Delete(pod.Name) pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value}))) + if err != nil { + Fail(fmt.Sprintf("Failed to delete pod: %v", err)) + } Expect(len(pods.Items)).To(Equal(0)) + + By("veryfying pod deletion was observed") + deleted := false + timeout := false + timer := time.After(podStartTimeout) + for !deleted && !timeout { + select { + case event, _ := <-w.ResultChan(): + if event.Type == watch.Deleted { + deleted = true + } + case <-timer: + timeout = true + } + } + if !deleted { + Fail("Failed to observe pod deletion") + } }) It("should be updated", func() {