add watch integration test for dynamic client

This commit is contained in:
xuzhonghu 2018-07-26 16:12:31 +08:00
parent 85b8a23f19
commit b4a73d50c0

View File

@ -17,14 +17,19 @@ limitations under the License.
package client
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -123,6 +128,98 @@ func TestDynamicClient(t *testing.T) {
}
}
func TestDynamicClientWatch(t *testing.T) {
_, s, closeFn := framework.RunAMaster(nil)
defer closeFn()
ns := framework.CreateTestingNamespace("dynamic-watch", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
gv := &schema.GroupVersion{Group: "", Version: "v1"}
config := &restclient.Config{
Host: s.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: gv},
}
client := clientset.NewForConfigOrDie(config)
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatalf("unexpected error creating dynamic client: %v", err)
}
resource := v1.SchemeGroupVersion.WithResource("events")
mkEvent := func(i int) *v1.Event {
name := fmt.Sprintf("event-%v", i)
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns.Name,
Name: name,
},
InvolvedObject: v1.ObjectReference{
Namespace: ns.Name,
Name: name,
},
Reason: fmt.Sprintf("event %v", i),
}
}
rv1 := ""
for i := 0; i < 10; i++ {
event := mkEvent(i)
got, err := client.CoreV1().Events(ns.Name).Create(event)
if err != nil {
t.Fatalf("Failed creating event %#q: %v", event, err)
}
if rv1 == "" {
rv1 = got.ResourceVersion
if rv1 == "" {
t.Fatal("did not get a resource version.")
}
}
t.Logf("Created event %#v", got.ObjectMeta)
}
w, err := dynamicClient.Resource(resource).Namespace(ns.Name).Watch(metav1.ListOptions{
ResourceVersion: rv1,
Watch: true,
FieldSelector: fields.OneTermEqualSelector("metadata.name", "event-9").String(),
})
if err != nil {
t.Fatalf("Failed watch: %v", err)
}
defer w.Stop()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("watch took longer than %s", wait.ForeverTestTimeout.String())
case got, ok := <-w.ResultChan():
if !ok {
t.Fatal("Watch channel closed unexpectedly.")
}
// We expect to see an ADD of event-9 and only event-9. (This
// catches a bug where all the events would have been sent down
// the channel.)
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
unstructured, ok := got.Object.(*unstructured.Unstructured)
if !ok {
t.Fatalf("Unexpected watch event containing object %#q", got.Object)
}
event, err := unstructuredToEvent(unstructured)
if err != nil {
t.Fatalf("unexpected error converting Unstructured to v1.Event: %v", err)
}
if e, a := "event-9", event.Name; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
}
}
func unstructuredToPod(obj *unstructured.Unstructured) (*v1.Pod, error) {
json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
@ -134,3 +231,13 @@ func unstructuredToPod(obj *unstructured.Unstructured) (*v1.Pod, error) {
pod.APIVersion = ""
return pod, err
}
func unstructuredToEvent(obj *unstructured.Unstructured) (*v1.Event, error) {
json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
event := new(v1.Event)
err = runtime.DecodeInto(testapi.Default.Codec(), json, event)
return event, err
}