From af8ed43b018ae37b7ac79a162c28fb7921049224 Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Thu, 21 Dec 2017 16:50:16 +0800 Subject: [PATCH] fix(fakeclient): write event to watch channel on add/update/delete fix races with watch call add test for non-namespace resource watch add matching for all-namespace-watch fix delete namespace watch & restrict test fix multiple invocation on same resource & namespace add descriptive doc for tracker.watchers Kubernetes-commit: f57cc0b22d282bc8fe68faf91529e7175bc3918a --- testing/BUILD | 19 ++++ testing/fixture.go | 60 ++++++++++++- testing/fixture_test.go | 192 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 268 insertions(+), 3 deletions(-) create mode 100644 testing/fixture_test.go diff --git a/testing/BUILD b/testing/BUILD index b26e6628..666a449b 100644 --- a/testing/BUILD +++ b/testing/BUILD @@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -28,6 +29,24 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = [ + "fixture_test.go", + ], + embed = [":go_default_library"], + importpath = "k8s.io/client-go/testing", + deps = [ + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/testing/fixture.go b/testing/fixture.go index 08379fb0..ba8ee508 100644 --- a/testing/fixture.go +++ b/testing/fixture.go @@ -29,6 +29,11 @@ import ( restclient "k8s.io/client-go/rest" ) +// FakeWatchBufferSize is the max num of watch event can be buffered in the +// watch channel. Note that when watch event overflows or exceed this buffer +// size, manipulations via fake client may be blocked. +const FakeWatchBufferSize = 128 + // ObjectTracker keeps track of objects. It is intended to be used to // fake calls to a server by returning objects based on their kind, // namespace and name. @@ -54,6 +59,10 @@ type ObjectTracker interface { // didn't exist in the tracker prior to deletion, Delete returns // no error. Delete(gvr schema.GroupVersionResource, ns, name string) error + + // Watch watches objects from the tracker. Watch returns a channel + // which will push added / modified / deleted object. + Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) } // ObjectScheme abstracts the implementation of common operations on objects. @@ -132,6 +141,13 @@ type tracker struct { decoder runtime.Decoder lock sync.RWMutex objects map[schema.GroupVersionResource][]runtime.Object + // The value type of watchers is a map of which the key is either a namespace or + // all/non namespace aka "" and its value is list of fake watchers. Each of + // fake watcher holds a buffered channel of size "FakeWatchBufferSize" which + // is default to 128. Manipulations on resources will broadcast the notification + // events into the watchers' channel and note that too many unhandled event may + // potentially block the tracker. + watchers map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher } var _ ObjectTracker = &tracker{} @@ -140,9 +156,10 @@ var _ ObjectTracker = &tracker{} // of objects for the fake clientset. Mostly useful for unit tests. func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker { return &tracker{ - scheme: scheme, - decoder: decoder, - objects: make(map[schema.GroupVersionResource][]runtime.Object), + scheme: scheme, + decoder: decoder, + objects: make(map[schema.GroupVersionResource][]runtime.Object), + watchers: make(map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher), } } @@ -185,6 +202,19 @@ func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionK return list.DeepCopyObject(), nil } +func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { + t.lock.Lock() + defer t.lock.Unlock() + + fakewatcher := watch.NewFakeWithChanSize(FakeWatchBufferSize, true) + + if _, exists := t.watchers[gvr]; !exists { + t.watchers[gvr] = make(map[string][]*watch.FakeWatcher) + } + t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher) + return fakewatcher, nil +} + func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) { errNotFound := errors.NewNotFound(gvr.GroupResource(), name) @@ -263,6 +293,19 @@ func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns return t.add(gvr, obj, ns, true) } +func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.FakeWatcher { + watches := []*watch.FakeWatcher{} + if t.watchers[gvr] != nil { + if w := t.watchers[gvr][ns]; w != nil { + watches = append(watches, w...) + } + if w := t.watchers[gvr][""]; w != nil { + watches = append(watches, w...) + } + } + return watches +} + func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error { t.lock.Lock() defer t.lock.Unlock() @@ -296,6 +339,9 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st } if oldMeta.GetNamespace() == newMeta.GetNamespace() && oldMeta.GetName() == newMeta.GetName() { if replaceExisting { + for _, w := range t.getWatches(gvr, ns) { + w.Modify(obj) + } t.objects[gvr][i] = obj return nil } @@ -310,6 +356,10 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st t.objects[gvr] = append(t.objects[gvr], obj) + for _, w := range t.getWatches(gvr, ns) { + w.Add(obj) + } + return nil } @@ -342,7 +392,11 @@ func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error return err } if objMeta.GetNamespace() == ns && objMeta.GetName() == name { + obj := t.objects[gvr][i] t.objects[gvr] = append(t.objects[gvr][:i], t.objects[gvr][i+1:]...) + for _, w := range t.getWatches(gvr, ns) { + w.Delete(obj) + } found = true break } diff --git a/testing/fixture_test.go b/testing/fixture_test.go new file mode 100644 index 00000000..967e0aef --- /dev/null +++ b/testing/fixture_test.go @@ -0,0 +1,192 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "math/rand" + "strconv" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + serializer "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/watch" +) + +func getArbitraryResource(s schema.GroupVersionResource, name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": s.Resource, + "apiVersion": s.Version, + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + "generateName": "test_generateName", + "uid": "test_uid", + "resourceVersion": "test_resourceVersion", + "selfLink": "test_selfLink", + }, + "data": strconv.Itoa(rand.Int()), + }, + } +} + +func TestWatchCallNonNamespace(t *testing.T) { + testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"} + testObj := getArbitraryResource(testResource, "test_name", "test_namespace") + accessor, err := meta.Accessor(testObj) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + ns := accessor.GetNamespace() + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + o := NewObjectTracker(scheme, codecs.UniversalDecoder()) + watch, err := o.Watch(testResource, ns) + go func() { + err := o.Create(testResource, testObj, ns) + if err != nil { + t.Errorf("test resource creation failed: %v", err) + } + }() + out := <-watch.ResultChan() + assert.Equal(t, testObj, out.Object, "watched object mismatch") +} + +func TestWatchCallAllNamespace(t *testing.T) { + testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"} + testObj := getArbitraryResource(testResource, "test_name", "test_namespace") + accessor, err := meta.Accessor(testObj) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + ns := accessor.GetNamespace() + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + o := NewObjectTracker(scheme, codecs.UniversalDecoder()) + w, err := o.Watch(testResource, "test_namespace") + wAll, err := o.Watch(testResource, "") + go func() { + err := o.Create(testResource, testObj, ns) + assert.NoError(t, err, "test resource creation failed") + }() + out := <-w.ResultChan() + outAll := <-wAll.ResultChan() + assert.Equal(t, watch.Added, out.Type, "watch event mismatch") + assert.Equal(t, watch.Added, outAll.Type, "watch event mismatch") + assert.Equal(t, testObj, out.Object, "watched created object mismatch") + assert.Equal(t, testObj, outAll.Object, "watched created object mismatch") + go func() { + err := o.Update(testResource, testObj, ns) + assert.NoError(t, err, "test resource updating failed") + }() + out = <-w.ResultChan() + outAll = <-wAll.ResultChan() + assert.Equal(t, watch.Modified, out.Type, "watch event mismatch") + assert.Equal(t, watch.Modified, outAll.Type, "watch event mismatch") + assert.Equal(t, testObj, out.Object, "watched updated object mismatch") + assert.Equal(t, testObj, outAll.Object, "watched updated object mismatch") + go func() { + err := o.Delete(testResource, "test_namespace", "test_name") + assert.NoError(t, err, "test resource deletion failed") + }() + out = <-w.ResultChan() + outAll = <-wAll.ResultChan() + assert.Equal(t, watch.Deleted, out.Type, "watch event mismatch") + assert.Equal(t, watch.Deleted, outAll.Type, "watch event mismatch") + assert.Equal(t, testObj, out.Object, "watched deleted object mismatch") + assert.Equal(t, testObj, outAll.Object, "watched deleted object mismatch") +} + +func TestWatchCallMultipleInvocation(t *testing.T) { + cases := []struct { + name string + op watch.EventType + }{ + { + "foo", + watch.Added, + }, + { + "bar", + watch.Added, + }, + { + "bar", + watch.Modified, + }, + { + "foo", + watch.Deleted, + }, + { + "bar", + watch.Deleted, + }, + } + + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"} + + o := NewObjectTracker(scheme, codecs.UniversalDecoder()) + watchNamespaces := []string{ + "", + "", + "test_namespace", + "test_namespace", + } + var wg sync.WaitGroup + wg.Add(len(watchNamespaces)) + for idx, watchNamespace := range watchNamespaces { + i := idx + w, err := o.Watch(testResource, watchNamespace) + go func() { + assert.NoError(t, err, "watch invocation failed") + for _, c := range cases { + fmt.Printf("%#v %#v\n", c, i) + event := <-w.ResultChan() + accessor, err := meta.Accessor(event.Object) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + assert.Equal(t, c.op, event.Type, "watch event mismatched") + assert.Equal(t, c.name, accessor.GetName(), "watched object mismatch") + } + wg.Done() + }() + } + for _, c := range cases { + switch c.op { + case watch.Added: + obj := getArbitraryResource(testResource, c.name, "test_namespace") + o.Create(testResource, obj, "test_namespace") + case watch.Modified: + obj := getArbitraryResource(testResource, c.name, "test_namespace") + o.Update(testResource, obj, "test_namespace") + case watch.Deleted: + o.Delete(testResource, "test_namespace", c.name) + } + } + wg.Wait() +}