diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index cbbf6662b21..b009a6f0478 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -19,6 +19,7 @@ go_library( "initializers.go", "namespace.go", "table_conversion.go", + "watch.go", "webhook.go", ], importpath = "k8s.io/kubernetes/test/e2e/apimachinery", @@ -47,6 +48,7 @@ go_library( "//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/test/integration/testserver:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", diff --git a/test/e2e/apimachinery/watch.go b/test/e2e/apimachinery/watch.go new file mode 100644 index 00000000000..6a46c6621b2 --- /dev/null +++ b/test/e2e/apimachinery/watch.go @@ -0,0 +1,209 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "time" + + "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +const ( + watchPodLabelKey = "watch-this-pod" + watchPodLabelValueA = "AAA" + watchPodLabelValueB = "BBB" +) + +var _ = SIGDescribe("Watchers", func() { + f := framework.NewDefaultFramework("watch") + + It("should observe add, update, and delete events on pods", func() { + c := f.ClientSet + ns := f.Namespace.Name + + By("creating multiple similar watches on pods") + watchA, err := watchPodsWithLabels(f, watchPodLabelValueA) + Expect(err).NotTo(HaveOccurred()) + + watchB, err := watchPodsWithLabels(f, watchPodLabelValueB) + Expect(err).NotTo(HaveOccurred()) + + watchAB, err := watchPodsWithLabels(f, watchPodLabelValueA, watchPodLabelValueB) + Expect(err).NotTo(HaveOccurred()) + + By("creating, modifying, and deleting pods") + testPodA := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "e2e-watch-test-pod-a", + Labels: map[string]string{ + watchPodLabelKey: watchPodLabelValueA, + }, + }, + Spec: v1.PodSpec{ + ActiveDeadlineSeconds: int64ptr(20), + Containers: []v1.Container{ + { + Name: "example", + Image: framework.GetPauseImageName(c), + }, + }, + }, + } + testPodB := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "e2e-watch-test-pod-b", + Labels: map[string]string{ + watchPodLabelKey: watchPodLabelValueB, + }, + }, + Spec: v1.PodSpec{ + ActiveDeadlineSeconds: int64ptr(20), + Containers: []v1.Container{ + { + Name: "example", + Image: framework.GetPauseImageName(c), + }, + }, + }, + } + testPodA, err = c.CoreV1().Pods(ns).Create(testPodA) + Expect(err).NotTo(HaveOccurred()) + expectEvent(watchA, watch.Added, testPodA) + expectEvent(watchAB, watch.Added, testPodA) + expectNoEvent(watchB, watch.Added, testPodA) + + testPodA, err = updatePod(f, testPodA.GetName(), func(p *v1.Pod) { + p.Spec.ActiveDeadlineSeconds = int64ptr(10) + }) + Expect(err).NotTo(HaveOccurred()) + expectEvent(watchA, watch.Modified, testPodA) + expectEvent(watchAB, watch.Modified, testPodA) + expectNoEvent(watchB, watch.Modified, testPodA) + + testPodA, err = updatePod(f, testPodA.GetName(), func(p *v1.Pod) { + p.Spec.ActiveDeadlineSeconds = int64ptr(5) + }) + Expect(err).NotTo(HaveOccurred()) + expectEvent(watchA, watch.Modified, testPodA) + expectEvent(watchAB, watch.Modified, testPodA) + expectNoEvent(watchB, watch.Modified, testPodA) + + err = c.CoreV1().Pods(ns).Delete(testPodA.GetName(), nil) + Expect(err).NotTo(HaveOccurred()) + expectEvent(watchA, watch.Deleted, nil) + expectEvent(watchAB, watch.Deleted, nil) + expectNoEvent(watchB, watch.Deleted, nil) + + testPodB, err = c.CoreV1().Pods(ns).Create(testPodB) + Expect(err).NotTo(HaveOccurred()) + expectEvent(watchB, watch.Added, testPodB) + expectEvent(watchAB, watch.Added, testPodB) + expectNoEvent(watchA, watch.Added, testPodB) + + err = c.CoreV1().Pods(ns).Delete(testPodB.GetName(), nil) + Expect(err).NotTo(HaveOccurred()) + expectEvent(watchB, watch.Deleted, nil) + expectEvent(watchAB, watch.Deleted, nil) + expectNoEvent(watchA, watch.Deleted, nil) + }) +}) + +func watchPodsWithLabels(f *framework.Framework, labels ...string) (watch.Interface, error) { + c := f.ClientSet + ns := f.Namespace.Name + opts := metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: watchPodLabelKey, + Operator: metav1.LabelSelectorOpIn, + Values: labels, + }, + }, + }), + } + return c.CoreV1().Pods(ns).Watch(opts) +} + +func int64ptr(i int) *int64 { + i64 := int64(i) + return &i64 +} + +type updatePodFunc func(p *v1.Pod) + +func updatePod(f *framework.Framework, name string, update updatePodFunc) (*v1.Pod, error) { + c := f.ClientSet + ns := f.Namespace.Name + var p *v1.Pod + pollErr := wait.PollImmediate(2*time.Second, 1*time.Minute, func() (bool, error) { + var err error + if p, err = c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}); err != nil { + return false, err + } + update(p) + if p, err = c.CoreV1().Pods(ns).Update(p); err == nil { + return true, nil + } + // Only retry update on conflict + if !errors.IsConflict(err) { + return false, err + } + return false, nil + }) + return p, pollErr +} + +func expectEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) { + if event, ok := waitForEvent(w, eventType, object); !ok { + framework.Failf("Timed out waiting for expected event: %v", event) + } +} + +func expectNoEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) { + if event, ok := waitForEvent(w, eventType, object); ok { + framework.Failf("Unexpected event occurred: %v", event) + } +} + +func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object) (watch.Event, bool) { + stopTimer := time.NewTimer(10 * time.Second) + for { + select { + case actual := <-w.ResultChan(): + if expectType == actual.Type && (expectObject == nil || apiequality.Semantic.DeepEqual(expectObject, actual.Object)) { + return actual, true + } + case <-stopTimer.C: + expected := watch.Event{ + Type: expectType, + Object: expectObject, + } + return expected, false + } + } +}