diff --git a/staging/src/k8s.io/apiserver/pkg/storage/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/BUILD index 0e2f07eaf9e..ec50e3eb8a7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/BUILD @@ -28,6 +28,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index 82b477ab210..ac5e0160822 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -850,18 +850,28 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { return } - object, err := c.copier.Copy(event.Object) - if err != nil { - glog.Errorf("unexpected copy error: %v", err) - return - } var watchEvent watch.Event switch { case curObjPasses && !oldObjPasses: + object, err := c.copier.Copy(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err)) + return + } watchEvent = watch.Event{Type: watch.Added, Object: object} case curObjPasses && oldObjPasses: + object, err := c.copier.Copy(event.Object) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err)) + return + } watchEvent = watch.Event{Type: watch.Modified, Object: object} case !curObjPasses && oldObjPasses: + object, err := c.copier.Copy(event.PrevObject) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err)) + return + } watchEvent = watch.Event{Type: watch.Deleted, Object: object} } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go index e6680c8c6a6..9a80b5f0da1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go @@ -17,13 +17,17 @@ limitations under the License. package storage import ( + "reflect" "sync" "testing" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/pkg/api/v1" ) @@ -55,3 +59,120 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err) } } + +func TestCacheWatcherHandlesFiltering(t *testing.T) { + filter := func(_ string, _ labels.Set, field fields.Set) bool { + return field["spec.nodeName"] == "host" + } + forget := func(bool) {} + + testCases := []struct { + events []*watchCacheEvent + expected []watch.Event + }{ + // properly handle starting with the filter, then being deleted, then re-added + { + events: []*watchCacheEvent{ + { + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + }, + }, + expected: []watch.Event{ + {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, + }, + }, + // properly handle ignoring changes prior to the filter, then getting added, then deleted + { + events: []*watchCacheEvent{ + { + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + }, + { + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + }, + }, + expected: []watch.Event{ + {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, + {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, + }, + }, + } + +TestCase: + for i, testCase := range testCases { + // set the size of the buffer of w.result to 0, so that the writes to + // w.result is blocked. + for j := range testCase.events { + testCase.events[j].ResourceVersion = uint64(j) + 1 + } + w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget) + ch := w.ResultChan() + for j, event := range testCase.expected { + e := <-ch + if !reflect.DeepEqual(event, e) { + t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e)) + break TestCase + } + } + select { + case obj, ok := <-ch: + t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok) + break TestCase + default: + } + w.Stop() + } +}