diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index c70359a6056..c38a696aa18 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -190,10 +190,10 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object, // ListPredicate returns a list of all the items matching m. func (e *Store) ListPredicate(ctx api.Context, m generic.Matcher, options *api.ListOptions) (runtime.Object, error) { list := e.NewListFunc() - filterFunc := e.filterAndDecorateFunction(m) + filter := e.createFilter(m) if name, ok := m.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { - err := e.Storage.GetToList(ctx, key, filterFunc, list) + err := e.Storage.GetToList(ctx, key, filter, list) return list, storeerr.InterpretListError(err, e.QualifiedResource) } // if we cannot extract a key based on the current context, the optimization is skipped @@ -202,7 +202,7 @@ func (e *Store) ListPredicate(ctx api.Context, m generic.Matcher, options *api.L if options == nil { options = &api.ListOptions{ResourceVersion: "0"} } - err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filterFunc, list) + err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filter, list) return list, storeerr.InterpretListError(err, e.QualifiedResource) } @@ -798,23 +798,23 @@ func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interfac // WatchPredicate starts a watch for the items that m matches. func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { - filterFunc := e.filterAndDecorateFunction(m) + filter := e.createFilter(m) if name, ok := m.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { if err != nil { return nil, err } - return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) + return e.Storage.Watch(ctx, key, resourceVersion, filter) } // if we cannot extract a key based on the current context, the optimization is skipped } - return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc) + return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter) } -func (e *Store) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool { - return func(obj runtime.Object) bool { +func (e *Store) createFilter(m generic.Matcher) storage.Filter { + filterFunc := func(obj runtime.Object) bool { matches, err := m.Matches(obj) if err != nil { glog.Errorf("unable to match watch: %v", err) @@ -828,6 +828,7 @@ func (e *Store) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object } return matches } + return storage.NewSimpleFilter(filterFunc) } // calculateTTL is a helper for retrieving the updated TTL for an object or returning an error diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index e258e5f1395..a0f4685eceb 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -200,7 +200,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre } // Implements storage.Interface. -func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { +func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) { watchRV, err := ParseWatchResourceVersion(resourceVersion) if err != nil { return nil, err @@ -232,7 +232,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, } // Implements storage.Interface. -func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { +func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) { return c.Watch(ctx, key, resourceVersion, filter) } @@ -242,12 +242,12 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign } // Implements storage.Interface. -func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error { +func (c *Cacher) GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error { return c.storage.GetToList(ctx, key, filter, listObj) } // Implements storage.Interface. -func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error { +func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error { if resourceVersion == "" { // If resourceVersion is not specified, serve it from underlying // storage (for backward compatibility). @@ -285,7 +285,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f if !ok { return fmt.Errorf("non runtime.Object returned from storage: %v", obj) } - if filterFunc(object) { + if filterFunc.Filter(object) { listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) } } @@ -350,8 +350,8 @@ func forgetWatcher(c *Cacher, index int) func(bool) { } } -func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc { - return func(obj runtime.Object) bool { +func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter Filter) Filter { + filterFunc := func(obj runtime.Object) bool { objKey, err := keyFunc(obj) if err != nil { glog.Errorf("invalid object for filter: %v", obj) @@ -360,8 +360,9 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi if !strings.HasPrefix(objKey, key) { return false } - return filter(obj) + return filter.Filter(obj) } + return NewSimpleFilter(filterFunc) } // Returns resource version to which the underlying cache is synced. @@ -450,12 +451,12 @@ type cacheWatcher struct { sync.Mutex input chan watchCacheEvent result chan watch.Event - filter FilterFunc + filter Filter stopped bool forget func(bool) } -func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter Filter, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, 10), result: make(chan watch.Event, 10), @@ -527,10 +528,10 @@ func (c *cacheWatcher) add(event watchCacheEvent) { } func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { - curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) + curObjPasses := event.Type != watch.Deleted && c.filter.Filter(event.Object) oldObjPasses := false if event.PrevObject != nil { - oldObjPasses = c.filter(event.PrevObject) + oldObjPasses = c.filter.Filter(event.PrevObject) } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index b723a5ac240..3a461d38368 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -185,7 +185,7 @@ type injectListError struct { storage.Interface } -func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { +func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error { if self.errors > 0 { self.errors-- return fmt.Errorf("injected error") @@ -332,7 +332,7 @@ func TestFiltering(t *testing.T) { // Set up Watch for object "podFoo" with label filter set. selector := labels.SelectorFromSet(labels.Set{"filter": "foo"}) - filter := func(obj runtime.Object) bool { + filterFunc := func(obj runtime.Object) bool { metadata, err := meta.Accessor(obj) if err != nil { t.Errorf("Unexpected error: %v", err) @@ -340,6 +340,7 @@ func TestFiltering(t *testing.T) { } return selector.Matches(labels.Set(metadata.GetLabels())) } + filter := storage.NewSimpleFilter(filterFunc) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter) if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 00cecf7b958..62d01aa4090 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -223,7 +223,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, } // Implements storage.Interface. -func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { +func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) { if ctx == nil { glog.Errorf("Context is nil") } @@ -238,7 +238,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri } // Implements storage.Interface. -func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { +func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) { if ctx == nil { glog.Errorf("Context is nil") } @@ -318,7 +318,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run } // Implements storage.Interface. -func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { +func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error { if ctx == nil { glog.Errorf("Context is nil") } @@ -358,7 +358,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F } // decodeNodeList walks the tree of each node in the list and decodes into the specified object -func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error { +func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, slicePtr interface{}) error { trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr)) defer trace.LogIfLong(400 * time.Millisecond) v, err := conversion.EnforcePtr(slicePtr) @@ -387,7 +387,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun } // being unable to set the version does not prevent the object from being extracted _ = h.versioner.UpdateObject(obj, node.ModifiedIndex) - if filter(obj) { + if filter.Filter(obj) { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } if node.ModifiedIndex != 0 { @@ -400,7 +400,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun } // Implements storage.Interface. -func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { +func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error { if ctx == nil { glog.Errorf("Context is nil") } @@ -569,7 +569,7 @@ func (h *etcdHelper) prefixEtcdKey(key string) string { // their Node.ModifiedIndex, which is unique across all types. // All implementations must be thread-safe. type etcdCache interface { - getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) + getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) addToCache(index uint64, obj runtime.Object) } @@ -577,14 +577,14 @@ func getTypeName(obj interface{}) string { return reflect.TypeOf(obj).String() } -func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) { +func (h *etcdHelper) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) { startTime := time.Now() defer func() { metrics.ObserveGetCache(startTime) }() obj, found := h.cache.Get(index) if found { - if !filter(obj.(runtime.Object)) { + if !filter.Filter(obj.(runtime.Object)) { return nil, true } // We should not return the object itself to avoid polluting the cache if someone diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index c3d7f4cda25..0b6444845b7 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -154,10 +154,11 @@ func TestListFiltered(t *testing.T) { } createPodList(t, helper, &list) - filter := func(obj runtime.Object) bool { + filterFunc := func(obj runtime.Object) bool { pod := obj.(*api.Pod) return pod.Name == "bar" } + filter := storage.NewSimpleFilter(filterFunc) var got api.PodList err := helper.List(context.TODO(), key, "", filter, &got) diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index e290e8ac420..e6cbd396967 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -88,7 +88,7 @@ type etcdWatcher struct { list bool // If we're doing a recursive watch, should be true. quorum bool // If we enable quorum, shoule be true include includeFunc - filter storage.FilterFunc + filter storage.Filter etcdIncoming chan *etcd.Response etcdError chan error @@ -116,7 +116,7 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. // The versioner must be able to handle the objects that transform creates. func newEtcdWatcher( - list bool, quorum bool, include includeFunc, filter storage.FilterFunc, + list bool, quorum bool, include includeFunc, filter storage.Filter, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ @@ -364,7 +364,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { // the resourceVersion to resume will never be able to get past a bad value. return } - if !w.filter(obj) { + if !w.filter.Filter(obj) { return } action := watch.Added @@ -393,7 +393,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { // the resourceVersion to resume will never be able to get past a bad value. return } - curObjPasses := w.filter(curObj) + curObjPasses := w.filter.Filter(curObj) oldObjPasses := false var oldObj runtime.Object if res.PrevNode != nil && res.PrevNode.Value != "" { @@ -402,7 +402,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil { utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err)) } - oldObjPasses = w.filter(oldObj) + oldObjPasses = w.filter.Filter(oldObj) } } // Some changes to an object may cause it to start or stop matching a filter. @@ -451,7 +451,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { // the resourceVersion to resume will never be able to get past a bad value. return } - if !w.filter(obj) { + if !w.filter.Filter(obj) { return } w.emit(watch.Event{ diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 80c95975e73..5c9ab590d50 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -39,7 +39,7 @@ var versioner = APIObjectVersioner{} // Implements etcdCache interface as empty methods (i.e. does not cache any objects) type fakeEtcdCache struct{} -func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) { +func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) { return nil, false } @@ -48,17 +48,22 @@ func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) { var _ etcdCache = &fakeEtcdCache{} +// firstLetterIsB implements storage.Filter interface. +type firstLetterIsB struct { +} + +func (f *firstLetterIsB) Filter(obj runtime.Object) bool { + return obj.(*api.Pod).Name[0] == 'b' +} + func TestWatchInterpretations(t *testing.T) { codec := testapi.Default.Codec() // Declare some pods to make the test cases compact. podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} podBaz := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz"}} - firstLetterIsB := func(obj runtime.Object) bool { - return obj.(*api.Pod).Name[0] == 'b' - } - // All of these test cases will be run with the firstLetterIsB FilterFunc. + // All of these test cases will be run with the firstLetterIsB Filter. table := map[string]struct { actions []string // Run this test item for every action here. prevNodeValue string @@ -131,7 +136,7 @@ func TestWatchInterpretations(t *testing.T) { for name, item := range table { for _, action := range item.actions { - w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(true, false, nil, &firstLetterIsB{}, codec, versioner, nil, &fakeEtcdCache{}) emitCalled := false w.emit = func(event watch.Event) { emitCalled = true @@ -222,9 +227,10 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { codec := testapi.Default.Codec() - filter := func(obj runtime.Object) bool { + filterFunc := func(obj runtime.Object) bool { return obj.(*api.Pod).Name != "bar" } + filter := storage.NewSimpleFilter(filterFunc) w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) eventChan := make(chan watch.Event, 1) diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 689b8ac0d77..88984756a6f 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -274,7 +274,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob } // GetToList implements storage.Interface.GetToList. -func (s *store) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { +func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error { listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err @@ -300,7 +300,7 @@ func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter } // List implements storage.Interface.List. -func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { +func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.Filter, listObj runtime.Object) error { listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err @@ -332,16 +332,16 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, filter st } // Watch implements storage.Interface.Watch. -func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { +func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) { return s.watch(ctx, key, resourceVersion, filter, false) } // WatchList implements storage.Interface.WatchList. -func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { +func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) { return s.watch(ctx, key, resourceVersion, filter, true) } -func (s *store) watch(ctx context.Context, key string, rv string, filter storage.FilterFunc, recursive bool) (watch.Interface, error) { +func (s *store) watch(ctx context.Context, key string, rv string, filter storage.Filter, recursive bool) (watch.Interface, error) { rev, err := storage.ParseWatchResourceVersion(rv) if err != nil { return nil, err @@ -435,7 +435,7 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP // decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. // On success, ListPtr would be set to the list of objects. -func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { +func decodeList(elems []*elemForDecode, filter storage.Filter, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { v, err := conversion.EnforcePtr(ListPtr) if err != nil || v.Kind() != reflect.Slice { panic("need ptr to slice") @@ -447,7 +447,7 @@ func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr inter } // being unable to set the version does not prevent the object from being extracted versioner.UpdateObject(obj, elem.rev) - if filter(obj) { + if filter.Filter(obj) { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } } diff --git a/pkg/storage/etcd3/store_test.go b/pkg/storage/etcd3/store_test.go index fe5aabd5e7f..36c0d02b2e3 100644 --- a/pkg/storage/etcd3/store_test.go +++ b/pkg/storage/etcd3/store_test.go @@ -226,15 +226,15 @@ func TestGetToList(t *testing.T) { tests := []struct { key string - filter storage.FilterFunc + filter func(runtime.Object) bool expectedOut []*api.Pod }{{ // test GetToList on existing key key: key, - filter: storage.Everything, + filter: storage.EverythingFunc, expectedOut: []*api.Pod{storedObj}, }, { // test GetToList on non-existing key key: "/non-existing", - filter: storage.Everything, + filter: storage.EverythingFunc, expectedOut: nil, }, { // test GetToList with filter to reject the pod key: "/non-existing", @@ -250,7 +250,8 @@ func TestGetToList(t *testing.T) { for i, tt := range tests { out := &api.PodList{} - err := store.GetToList(ctx, tt.key, tt.filter, out) + filter := storage.NewSimpleFilter(tt.filter) + err := store.GetToList(ctx, tt.key, filter, out) if err != nil { t.Fatalf("GetToList failed: %v", err) } @@ -487,15 +488,15 @@ func TestList(t *testing.T) { tests := []struct { prefix string - filter storage.FilterFunc + filter func(runtime.Object) bool expectedOut []*api.Pod }{{ // test List on existing key prefix: "/one-level/", - filter: storage.Everything, + filter: storage.EverythingFunc, expectedOut: []*api.Pod{preset[0].storedObj}, }, { // test List on non-existing key prefix: "/non-existing/", - filter: storage.Everything, + filter: storage.EverythingFunc, expectedOut: nil, }, { // test List with filter prefix: "/one-level/", @@ -509,13 +510,14 @@ func TestList(t *testing.T) { expectedOut: nil, }, { // test List with multiple levels of directories and expect flattened result prefix: "/two-level/", - filter: storage.Everything, + filter: storage.EverythingFunc, expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj}, }} for i, tt := range tests { out := &api.PodList{} - err := store.List(ctx, tt.prefix, "0", tt.filter, out) + filter := storage.NewSimpleFilter(tt.filter) + err := store.List(ctx, tt.prefix, "0", filter, out) if err != nil { t.Fatalf("List failed: %v", err) } diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 5b1c5dc90f2..02bc31437d8 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -53,7 +53,7 @@ type watchChan struct { key string initialRev int64 recursive bool - filter storage.FilterFunc + filter storage.Filter ctx context.Context cancel context.CancelFunc incomingEventChan chan *event @@ -76,7 +76,7 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage. // If recursive is false, it watches on given key. // If recursive is true, it watches any children and directories under the key, excluding the root key itself. // filter must be non-nil. Only if filter returns true will the changes be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) (watch.Interface, error) { +func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.Filter) (watch.Interface, error) { if recursive && !strings.HasSuffix(key, "/") { key += "/" } @@ -85,7 +85,7 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.Filter) *watchChan { wc := &watchChan{ watcher: w, key: key, @@ -221,7 +221,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { switch { case e.isDeleted: - if !wc.filter(oldObj) { + if !wc.filter.Filter(oldObj) { return nil } res = &watch.Event{ @@ -229,7 +229,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { Object: oldObj, } case e.isCreated: - if !wc.filter(curObj) { + if !wc.filter.Filter(curObj) { return nil } res = &watch.Event{ @@ -237,8 +237,8 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { Object: curObj, } default: - curObjPasses := wc.filter(curObj) - oldObjPasses := wc.filter(oldObj) + curObjPasses := wc.filter.Filter(curObj) + oldObjPasses := wc.filter.Filter(oldObj) switch { case curObjPasses && oldObjPasses: res = &watch.Event{ diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 44b22c83237..166f32042d3 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -57,12 +57,12 @@ func testWatch(t *testing.T, recursive bool) { tests := []struct { key string - filter storage.FilterFunc + filter func(runtime.Object) bool watchTests []*testWatchStruct }{{ // create a key key: "/somekey-1", watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, - filter: storage.Everything, + filter: storage.EverythingFunc, }, { // create a key but obj gets filtered key: "/somekey-2", watchTests: []*testWatchStruct{{podFoo, false, ""}}, @@ -77,7 +77,7 @@ func testWatch(t *testing.T, recursive bool) { }, { // update key: "/somekey-4", watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, - filter: storage.Everything, + filter: storage.EverythingFunc, }, { // delete because of being filtered key: "/somekey-5", watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, @@ -87,7 +87,8 @@ func testWatch(t *testing.T, recursive bool) { }, }} for i, tt := range tests { - w, err := store.watch(ctx, tt.key, "0", tt.filter, recursive) + filter := storage.NewSimpleFilter(tt.filter) + w, err := store.watch(ctx, tt.key, "0", filter, recursive) if err != nil { t.Fatalf("Watch failed: %v", err) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 8673a101fbc..f8da6124f3b 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -51,12 +51,21 @@ type ResponseMeta struct { ResourceVersion uint64 } -// FilterFunc is a predicate which takes an API object and returns true -// if and only if the object should remain in the set. -type FilterFunc func(obj runtime.Object) bool +// Filter is interface that is used to pass filtering mechanism. +type Filter interface { + // Filter is a predicate which takes an API object and returns true + // if and only if the object should remain in the set. + Filter(obj runtime.Object) bool +} -// Everything is a FilterFunc which accepts all objects. -func Everything(runtime.Object) bool { +// Everything is a Filter which accepts all objects. +var Everything Filter = everything{} + +// everything is implementation of Everything. +type everything struct { +} + +func (e everything) Filter(_ runtime.Object) bool { return true } @@ -102,14 +111,14 @@ type Interface interface { // resourceVersion may be used to specify what version to begin watching, // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). - Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) + Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API // objects and any item passing 'filter' are sent down to returned watch.Interface. // resourceVersion may be used to specify what version to begin watching, // which should be the current resourceVersion, and no longer rv+1 // (e.g. reconnecting without missing any updates). - WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) + WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) // Get unmarshals json found at key into objPtr. On a not found error, will either // return a zero object of the requested type, or an error, depending on ignoreNotFound. @@ -118,13 +127,13 @@ type Interface interface { // GetToList unmarshals json found at key and opaque it into *List api object // (an object that satisfies the runtime.IsList definition). - GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error + GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error // List unmarshalls jsons found at directory defined by key and opaque them // into *List api object (an object that satisfies runtime.IsList definition). // The returned contents may be delayed, but it is guaranteed that they will // be have at least 'resourceVersion'. - List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error + List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict. diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 86ebc0e42ec..c4418c235c7 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -36,6 +36,25 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc { } } +// SimpleFilter implements Filter interface. +type SimpleFilter struct { + filterFunc func(runtime.Object) bool +} + +func (s *SimpleFilter) Filter(obj runtime.Object) bool { + return s.filterFunc(obj) +} + +func NewSimpleFilter(filterFunc func(runtime.Object) bool) Filter { + return &SimpleFilter{ + filterFunc: filterFunc, + } +} + +func EverythingFunc(runtime.Object) bool { + return true +} + // ParseWatchResourceVersion takes a resource version argument and converts it to // the etcd version we should pass to helper.Watch(). Because resourceVersion is // an opaque value, the default watch behavior for non-zero watch is to watch