diff --git a/test/integration/client/dynamic_client_test.go b/test/integration/client/dynamic_client_test.go index 787e1fb7ba4..451490200cc 100644 --- a/test/integration/client/dynamic_client_test.go +++ b/test/integration/client/dynamic_client_test.go @@ -17,14 +17,19 @@ limitations under the License. package client import ( + "fmt" "reflect" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -123,6 +128,98 @@ func TestDynamicClient(t *testing.T) { } } +func TestDynamicClientWatch(t *testing.T) { + _, s, closeFn := framework.RunAMaster(nil) + defer closeFn() + + ns := framework.CreateTestingNamespace("dynamic-watch", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + gv := &schema.GroupVersion{Group: "", Version: "v1"} + config := &restclient.Config{ + Host: s.URL, + ContentConfig: restclient.ContentConfig{GroupVersion: gv}, + } + + client := clientset.NewForConfigOrDie(config) + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + t.Fatalf("unexpected error creating dynamic client: %v", err) + } + + resource := v1.SchemeGroupVersion.WithResource("events") + + mkEvent := func(i int) *v1.Event { + name := fmt.Sprintf("event-%v", i) + return &v1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ns.Name, + Name: name, + }, + InvolvedObject: v1.ObjectReference{ + Namespace: ns.Name, + Name: name, + }, + Reason: fmt.Sprintf("event %v", i), + } + } + + rv1 := "" + for i := 0; i < 10; i++ { + event := mkEvent(i) + got, err := client.CoreV1().Events(ns.Name).Create(event) + if err != nil { + t.Fatalf("Failed creating event %#q: %v", event, err) + } + if rv1 == "" { + rv1 = got.ResourceVersion + if rv1 == "" { + t.Fatal("did not get a resource version.") + } + } + t.Logf("Created event %#v", got.ObjectMeta) + } + + w, err := dynamicClient.Resource(resource).Namespace(ns.Name).Watch(metav1.ListOptions{ + ResourceVersion: rv1, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", "event-9").String(), + }) + + if err != nil { + t.Fatalf("Failed watch: %v", err) + } + defer w.Stop() + + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("watch took longer than %s", wait.ForeverTestTimeout.String()) + case got, ok := <-w.ResultChan(): + if !ok { + t.Fatal("Watch channel closed unexpectedly.") + } + + // We expect to see an ADD of event-9 and only event-9. (This + // catches a bug where all the events would have been sent down + // the channel.) + if e, a := watch.Added, got.Type; e != a { + t.Errorf("Wanted %v, got %v", e, a) + } + + unstructured, ok := got.Object.(*unstructured.Unstructured) + if !ok { + t.Fatalf("Unexpected watch event containing object %#q", got.Object) + } + event, err := unstructuredToEvent(unstructured) + if err != nil { + t.Fatalf("unexpected error converting Unstructured to v1.Event: %v", err) + } + if e, a := "event-9", event.Name; e != a { + t.Errorf("Wanted %v, got %v", e, a) + } + } +} + func unstructuredToPod(obj *unstructured.Unstructured) (*v1.Pod, error) { json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) if err != nil { @@ -134,3 +231,13 @@ func unstructuredToPod(obj *unstructured.Unstructured) (*v1.Pod, error) { pod.APIVersion = "" return pod, err } + +func unstructuredToEvent(obj *unstructured.Unstructured) (*v1.Event, error) { + json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj) + if err != nil { + return nil, err + } + event := new(v1.Event) + err = runtime.DecodeInto(testapi.Default.Codec(), json, event) + return event, err +}