Change filter to interface in storage.Interface

This commit is contained in:
Wojciech Tyczynski 2016-06-03 14:09:16 +02:00
parent d4df168186
commit 7f7ef0879f
13 changed files with 122 additions and 81 deletions

View File

@ -190,10 +190,10 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object,
// ListPredicate returns a list of all the items matching m. // ListPredicate returns a list of all the items matching m.
func (e *Store) ListPredicate(ctx api.Context, m generic.Matcher, options *api.ListOptions) (runtime.Object, error) { func (e *Store) ListPredicate(ctx api.Context, m generic.Matcher, options *api.ListOptions) (runtime.Object, error) {
list := e.NewListFunc() list := e.NewListFunc()
filterFunc := e.filterAndDecorateFunction(m) filter := e.createFilter(m)
if name, ok := m.MatchesSingle(); ok { if name, ok := m.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil { if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, filterFunc, list) err := e.Storage.GetToList(ctx, key, filter, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource) return list, storeerr.InterpretListError(err, e.QualifiedResource)
} }
// if we cannot extract a key based on the current context, the optimization is skipped // if we cannot extract a key based on the current context, the optimization is skipped
@ -202,7 +202,7 @@ func (e *Store) ListPredicate(ctx api.Context, m generic.Matcher, options *api.L
if options == nil { if options == nil {
options = &api.ListOptions{ResourceVersion: "0"} options = &api.ListOptions{ResourceVersion: "0"}
} }
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filterFunc, list) err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filter, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource) return list, storeerr.InterpretListError(err, e.QualifiedResource)
} }
@ -798,23 +798,23 @@ func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interfac
// WatchPredicate starts a watch for the items that m matches. // WatchPredicate starts a watch for the items that m matches.
func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
filterFunc := e.filterAndDecorateFunction(m) filter := e.createFilter(m)
if name, ok := m.MatchesSingle(); ok { if name, ok := m.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil { if key, err := e.KeyFunc(ctx, name); err == nil {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) return e.Storage.Watch(ctx, key, resourceVersion, filter)
} }
// if we cannot extract a key based on the current context, the optimization is skipped // if we cannot extract a key based on the current context, the optimization is skipped
} }
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc) return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter)
} }
func (e *Store) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool { func (e *Store) createFilter(m generic.Matcher) storage.Filter {
return func(obj runtime.Object) bool { filterFunc := func(obj runtime.Object) bool {
matches, err := m.Matches(obj) matches, err := m.Matches(obj)
if err != nil { if err != nil {
glog.Errorf("unable to match watch: %v", err) glog.Errorf("unable to match watch: %v", err)
@ -828,6 +828,7 @@ func (e *Store) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object
} }
return matches return matches
} }
return storage.NewSimpleFilter(filterFunc)
} }
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error // calculateTTL is a helper for retrieving the updated TTL for an object or returning an error

View File

@ -200,7 +200,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion) watchRV, err := ParseWatchResourceVersion(resourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
@ -232,7 +232,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, filter) return c.Watch(ctx, key, resourceVersion, filter)
} }
@ -242,12 +242,12 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error { func (c *Cacher) GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error {
return c.storage.GetToList(ctx, key, filter, listObj) return c.storage.GetToList(ctx, key, filter, listObj)
} }
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error { func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error {
if resourceVersion == "" { if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying // If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). // storage (for backward compatibility).
@ -285,7 +285,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
if !ok { if !ok {
return fmt.Errorf("non runtime.Object returned from storage: %v", obj) return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
} }
if filterFunc(object) { if filterFunc.Filter(object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem())) listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
} }
} }
@ -350,8 +350,8 @@ func forgetWatcher(c *Cacher, index int) func(bool) {
} }
} }
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter FilterFunc) FilterFunc { func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter Filter) Filter {
return func(obj runtime.Object) bool { filterFunc := func(obj runtime.Object) bool {
objKey, err := keyFunc(obj) objKey, err := keyFunc(obj)
if err != nil { if err != nil {
glog.Errorf("invalid object for filter: %v", obj) glog.Errorf("invalid object for filter: %v", obj)
@ -360,8 +360,9 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
if !strings.HasPrefix(objKey, key) { if !strings.HasPrefix(objKey, key) {
return false return false
} }
return filter(obj) return filter.Filter(obj)
} }
return NewSimpleFilter(filterFunc)
} }
// Returns resource version to which the underlying cache is synced. // Returns resource version to which the underlying cache is synced.
@ -450,12 +451,12 @@ type cacheWatcher struct {
sync.Mutex sync.Mutex
input chan watchCacheEvent input chan watchCacheEvent
result chan watch.Event result chan watch.Event
filter FilterFunc filter Filter
stopped bool stopped bool
forget func(bool) forget func(bool)
} }
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter Filter, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{ watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10), input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10), result: make(chan watch.Event, 10),
@ -527,10 +528,10 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
} }
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object) curObjPasses := event.Type != watch.Deleted && c.filter.Filter(event.Object)
oldObjPasses := false oldObjPasses := false
if event.PrevObject != nil { if event.PrevObject != nil {
oldObjPasses = c.filter(event.PrevObject) oldObjPasses = c.filter.Filter(event.PrevObject)
} }
if !curObjPasses && !oldObjPasses { if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object. // Watcher is not interested in that object.

View File

@ -185,7 +185,7 @@ type injectListError struct {
storage.Interface storage.Interface
} }
func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
if self.errors > 0 { if self.errors > 0 {
self.errors-- self.errors--
return fmt.Errorf("injected error") return fmt.Errorf("injected error")
@ -332,7 +332,7 @@ func TestFiltering(t *testing.T) {
// Set up Watch for object "podFoo" with label filter set. // Set up Watch for object "podFoo" with label filter set.
selector := labels.SelectorFromSet(labels.Set{"filter": "foo"}) selector := labels.SelectorFromSet(labels.Set{"filter": "foo"})
filter := func(obj runtime.Object) bool { filterFunc := func(obj runtime.Object) bool {
metadata, err := meta.Accessor(obj) metadata, err := meta.Accessor(obj)
if err != nil { if err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
@ -340,6 +340,7 @@ func TestFiltering(t *testing.T) {
} }
return selector.Matches(labels.Set(metadata.GetLabels())) return selector.Matches(labels.Set(metadata.GetLabels()))
} }
filter := storage.NewSimpleFilter(filterFunc)
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)

View File

@ -223,7 +223,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
@ -238,7 +238,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
@ -318,7 +318,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
@ -358,7 +358,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
} }
// decodeNodeList walks the tree of each node in the list and decodes into the specified object // decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error { func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr)) trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(400 * time.Millisecond) defer trace.LogIfLong(400 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr) v, err := conversion.EnforcePtr(slicePtr)
@ -387,7 +387,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex) _ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
if filter(obj) { if filter.Filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
if node.ModifiedIndex != 0 { if node.ModifiedIndex != 0 {
@ -400,7 +400,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
} }
// Implements storage.Interface. // Implements storage.Interface.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
@ -569,7 +569,7 @@ func (h *etcdHelper) prefixEtcdKey(key string) string {
// their Node.ModifiedIndex, which is unique across all types. // their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe. // All implementations must be thread-safe.
type etcdCache interface { type etcdCache interface {
getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object) addToCache(index uint64, obj runtime.Object)
} }
@ -577,14 +577,14 @@ func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String() return reflect.TypeOf(obj).String()
} }
func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) { func (h *etcdHelper) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
metrics.ObserveGetCache(startTime) metrics.ObserveGetCache(startTime)
}() }()
obj, found := h.cache.Get(index) obj, found := h.cache.Get(index)
if found { if found {
if !filter(obj.(runtime.Object)) { if !filter.Filter(obj.(runtime.Object)) {
return nil, true return nil, true
} }
// We should not return the object itself to avoid polluting the cache if someone // We should not return the object itself to avoid polluting the cache if someone

View File

@ -154,10 +154,11 @@ func TestListFiltered(t *testing.T) {
} }
createPodList(t, helper, &list) createPodList(t, helper, &list)
filter := func(obj runtime.Object) bool { filterFunc := func(obj runtime.Object) bool {
pod := obj.(*api.Pod) pod := obj.(*api.Pod)
return pod.Name == "bar" return pod.Name == "bar"
} }
filter := storage.NewSimpleFilter(filterFunc)
var got api.PodList var got api.PodList
err := helper.List(context.TODO(), key, "", filter, &got) err := helper.List(context.TODO(), key, "", filter, &got)

View File

@ -88,7 +88,7 @@ type etcdWatcher struct {
list bool // If we're doing a recursive watch, should be true. list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true quorum bool // If we enable quorum, shoule be true
include includeFunc include includeFunc
filter storage.FilterFunc filter storage.Filter
etcdIncoming chan *etcd.Response etcdIncoming chan *etcd.Response
etcdError chan error etcdError chan error
@ -116,7 +116,7 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates. // The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher( func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.FilterFunc, list bool, quorum bool, include includeFunc, filter storage.Filter,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
cache etcdCache) *etcdWatcher { cache etcdCache) *etcdWatcher {
w := &etcdWatcher{ w := &etcdWatcher{
@ -364,7 +364,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
return return
} }
if !w.filter(obj) { if !w.filter.Filter(obj) {
return return
} }
action := watch.Added action := watch.Added
@ -393,7 +393,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
return return
} }
curObjPasses := w.filter(curObj) curObjPasses := w.filter.Filter(curObj)
oldObjPasses := false oldObjPasses := false
var oldObj runtime.Object var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" { if res.PrevNode != nil && res.PrevNode.Value != "" {
@ -402,7 +402,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil { if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err)) utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
} }
oldObjPasses = w.filter(oldObj) oldObjPasses = w.filter.Filter(oldObj)
} }
} }
// Some changes to an object may cause it to start or stop matching a filter. // Some changes to an object may cause it to start or stop matching a filter.
@ -451,7 +451,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value. // the resourceVersion to resume will never be able to get past a bad value.
return return
} }
if !w.filter(obj) { if !w.filter.Filter(obj) {
return return
} }
w.emit(watch.Event{ w.emit(watch.Event{

View File

@ -39,7 +39,7 @@ var versioner = APIObjectVersioner{}
// Implements etcdCache interface as empty methods (i.e. does not cache any objects) // Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{} type fakeEtcdCache struct{}
func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) { func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
return nil, false return nil, false
} }
@ -48,17 +48,22 @@ func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
var _ etcdCache = &fakeEtcdCache{} var _ etcdCache = &fakeEtcdCache{}
// firstLetterIsB implements storage.Filter interface.
type firstLetterIsB struct {
}
func (f *firstLetterIsB) Filter(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
}
func TestWatchInterpretations(t *testing.T) { func TestWatchInterpretations(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
// Declare some pods to make the test cases compact. // Declare some pods to make the test cases compact.
podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
podBaz := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz"}} podBaz := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz"}}
firstLetterIsB := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
}
// All of these test cases will be run with the firstLetterIsB FilterFunc. // All of these test cases will be run with the firstLetterIsB Filter.
table := map[string]struct { table := map[string]struct {
actions []string // Run this test item for every action here. actions []string // Run this test item for every action here.
prevNodeValue string prevNodeValue string
@ -131,7 +136,7 @@ func TestWatchInterpretations(t *testing.T) {
for name, item := range table { for name, item := range table {
for _, action := range item.actions { for _, action := range item.actions {
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(true, false, nil, &firstLetterIsB{}, codec, versioner, nil, &fakeEtcdCache{})
emitCalled := false emitCalled := false
w.emit = func(event watch.Event) { w.emit = func(event watch.Event) {
emitCalled = true emitCalled = true
@ -222,9 +227,10 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
codec := testapi.Default.Codec() codec := testapi.Default.Codec()
filter := func(obj runtime.Object) bool { filterFunc := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name != "bar" return obj.(*api.Pod).Name != "bar"
} }
filter := storage.NewSimpleFilter(filterFunc)
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
eventChan := make(chan watch.Event, 1) eventChan := make(chan watch.Event, 1)

View File

@ -274,7 +274,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
} }
// GetToList implements storage.Interface.GetToList. // GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error { func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj) listPtr, err := meta.GetItemsPtr(listObj)
if err != nil { if err != nil {
return err return err
@ -300,7 +300,7 @@ func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter
} }
// List implements storage.Interface.List. // List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj) listPtr, err := meta.GetItemsPtr(listObj)
if err != nil { if err != nil {
return err return err
@ -332,16 +332,16 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, filter st
} }
// Watch implements storage.Interface.Watch. // Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, filter, false) return s.watch(ctx, key, resourceVersion, filter, false)
} }
// WatchList implements storage.Interface.WatchList. // WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, filter, true) return s.watch(ctx, key, resourceVersion, filter, true)
} }
func (s *store) watch(ctx context.Context, key string, rv string, filter storage.FilterFunc, recursive bool) (watch.Interface, error) { func (s *store) watch(ctx context.Context, key string, rv string, filter storage.Filter, recursive bool) (watch.Interface, error) {
rev, err := storage.ParseWatchResourceVersion(rv) rev, err := storage.ParseWatchResourceVersion(rv)
if err != nil { if err != nil {
return nil, err return nil, err
@ -435,7 +435,7 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev. // decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
// On success, ListPtr would be set to the list of objects. // On success, ListPtr would be set to the list of objects.
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error { func decodeList(elems []*elemForDecode, filter storage.Filter, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
v, err := conversion.EnforcePtr(ListPtr) v, err := conversion.EnforcePtr(ListPtr)
if err != nil || v.Kind() != reflect.Slice { if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice") panic("need ptr to slice")
@ -447,7 +447,7 @@ func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr inter
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
versioner.UpdateObject(obj, elem.rev) versioner.UpdateObject(obj, elem.rev)
if filter(obj) { if filter.Filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
} }

View File

@ -226,15 +226,15 @@ func TestGetToList(t *testing.T) {
tests := []struct { tests := []struct {
key string key string
filter storage.FilterFunc filter func(runtime.Object) bool
expectedOut []*api.Pod expectedOut []*api.Pod
}{{ // test GetToList on existing key }{{ // test GetToList on existing key
key: key, key: key,
filter: storage.Everything, filter: storage.EverythingFunc,
expectedOut: []*api.Pod{storedObj}, expectedOut: []*api.Pod{storedObj},
}, { // test GetToList on non-existing key }, { // test GetToList on non-existing key
key: "/non-existing", key: "/non-existing",
filter: storage.Everything, filter: storage.EverythingFunc,
expectedOut: nil, expectedOut: nil,
}, { // test GetToList with filter to reject the pod }, { // test GetToList with filter to reject the pod
key: "/non-existing", key: "/non-existing",
@ -250,7 +250,8 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
out := &api.PodList{} out := &api.PodList{}
err := store.GetToList(ctx, tt.key, tt.filter, out) filter := storage.NewSimpleFilter(tt.filter)
err := store.GetToList(ctx, tt.key, filter, out)
if err != nil { if err != nil {
t.Fatalf("GetToList failed: %v", err) t.Fatalf("GetToList failed: %v", err)
} }
@ -487,15 +488,15 @@ func TestList(t *testing.T) {
tests := []struct { tests := []struct {
prefix string prefix string
filter storage.FilterFunc filter func(runtime.Object) bool
expectedOut []*api.Pod expectedOut []*api.Pod
}{{ // test List on existing key }{{ // test List on existing key
prefix: "/one-level/", prefix: "/one-level/",
filter: storage.Everything, filter: storage.EverythingFunc,
expectedOut: []*api.Pod{preset[0].storedObj}, expectedOut: []*api.Pod{preset[0].storedObj},
}, { // test List on non-existing key }, { // test List on non-existing key
prefix: "/non-existing/", prefix: "/non-existing/",
filter: storage.Everything, filter: storage.EverythingFunc,
expectedOut: nil, expectedOut: nil,
}, { // test List with filter }, { // test List with filter
prefix: "/one-level/", prefix: "/one-level/",
@ -509,13 +510,14 @@ func TestList(t *testing.T) {
expectedOut: nil, expectedOut: nil,
}, { // test List with multiple levels of directories and expect flattened result }, { // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/", prefix: "/two-level/",
filter: storage.Everything, filter: storage.EverythingFunc,
expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj}, expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj},
}} }}
for i, tt := range tests { for i, tt := range tests {
out := &api.PodList{} out := &api.PodList{}
err := store.List(ctx, tt.prefix, "0", tt.filter, out) filter := storage.NewSimpleFilter(tt.filter)
err := store.List(ctx, tt.prefix, "0", filter, out)
if err != nil { if err != nil {
t.Fatalf("List failed: %v", err) t.Fatalf("List failed: %v", err)
} }

View File

@ -53,7 +53,7 @@ type watchChan struct {
key string key string
initialRev int64 initialRev int64
recursive bool recursive bool
filter storage.FilterFunc filter storage.Filter
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
incomingEventChan chan *event incomingEventChan chan *event
@ -76,7 +76,7 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
// If recursive is false, it watches on given key. // If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself. // If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// filter must be non-nil. Only if filter returns true will the changes be returned. // filter must be non-nil. Only if filter returns true will the changes be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) (watch.Interface, error) { func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.Filter) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") { if recursive && !strings.HasSuffix(key, "/") {
key += "/" key += "/"
} }
@ -85,7 +85,7 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
return wc, nil return wc, nil
} }
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan { func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.Filter) *watchChan {
wc := &watchChan{ wc := &watchChan{
watcher: w, watcher: w,
key: key, key: key,
@ -221,7 +221,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
switch { switch {
case e.isDeleted: case e.isDeleted:
if !wc.filter(oldObj) { if !wc.filter.Filter(oldObj) {
return nil return nil
} }
res = &watch.Event{ res = &watch.Event{
@ -229,7 +229,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
Object: oldObj, Object: oldObj,
} }
case e.isCreated: case e.isCreated:
if !wc.filter(curObj) { if !wc.filter.Filter(curObj) {
return nil return nil
} }
res = &watch.Event{ res = &watch.Event{
@ -237,8 +237,8 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
Object: curObj, Object: curObj,
} }
default: default:
curObjPasses := wc.filter(curObj) curObjPasses := wc.filter.Filter(curObj)
oldObjPasses := wc.filter(oldObj) oldObjPasses := wc.filter.Filter(oldObj)
switch { switch {
case curObjPasses && oldObjPasses: case curObjPasses && oldObjPasses:
res = &watch.Event{ res = &watch.Event{

View File

@ -57,12 +57,12 @@ func testWatch(t *testing.T, recursive bool) {
tests := []struct { tests := []struct {
key string key string
filter storage.FilterFunc filter func(runtime.Object) bool
watchTests []*testWatchStruct watchTests []*testWatchStruct
}{{ // create a key }{{ // create a key
key: "/somekey-1", key: "/somekey-1",
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, watchTests: []*testWatchStruct{{podFoo, true, watch.Added}},
filter: storage.Everything, filter: storage.EverythingFunc,
}, { // create a key but obj gets filtered }, { // create a key but obj gets filtered
key: "/somekey-2", key: "/somekey-2",
watchTests: []*testWatchStruct{{podFoo, false, ""}}, watchTests: []*testWatchStruct{{podFoo, false, ""}},
@ -77,7 +77,7 @@ func testWatch(t *testing.T, recursive bool) {
}, { // update }, { // update
key: "/somekey-4", key: "/somekey-4",
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}},
filter: storage.Everything, filter: storage.EverythingFunc,
}, { // delete because of being filtered }, { // delete because of being filtered
key: "/somekey-5", key: "/somekey-5",
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}},
@ -87,7 +87,8 @@ func testWatch(t *testing.T, recursive bool) {
}, },
}} }}
for i, tt := range tests { for i, tt := range tests {
w, err := store.watch(ctx, tt.key, "0", tt.filter, recursive) filter := storage.NewSimpleFilter(tt.filter)
w, err := store.watch(ctx, tt.key, "0", filter, recursive)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }

View File

@ -51,12 +51,21 @@ type ResponseMeta struct {
ResourceVersion uint64 ResourceVersion uint64
} }
// FilterFunc is a predicate which takes an API object and returns true // Filter is interface that is used to pass filtering mechanism.
// if and only if the object should remain in the set. type Filter interface {
type FilterFunc func(obj runtime.Object) bool // Filter is a predicate which takes an API object and returns true
// if and only if the object should remain in the set.
Filter(obj runtime.Object) bool
}
// Everything is a FilterFunc which accepts all objects. // Everything is a Filter which accepts all objects.
func Everything(runtime.Object) bool { var Everything Filter = everything{}
// everything is implementation of Everything.
type everything struct {
}
func (e everything) Filter(_ runtime.Object) bool {
return true return true
} }
@ -102,14 +111,14 @@ type Interface interface {
// resourceVersion may be used to specify what version to begin watching, // resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1 // which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates). // (e.g. reconnecting without missing any updates).
Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API // WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface. // objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching, // resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1 // which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates). // (e.g. reconnecting without missing any updates).
WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either // Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound. // return a zero object of the requested type, or an error, depending on ignoreNotFound.
@ -118,13 +127,13 @@ type Interface interface {
// GetToList unmarshals json found at key and opaque it into *List api object // GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition). // (an object that satisfies the runtime.IsList definition).
GetToList(ctx context.Context, key string, filter FilterFunc, listObj runtime.Object) error GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them // List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition). // into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will // The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'. // be have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict. // retrying the update until success if there is index conflict.

View File

@ -36,6 +36,25 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
} }
} }
// SimpleFilter implements Filter interface.
type SimpleFilter struct {
filterFunc func(runtime.Object) bool
}
func (s *SimpleFilter) Filter(obj runtime.Object) bool {
return s.filterFunc(obj)
}
func NewSimpleFilter(filterFunc func(runtime.Object) bool) Filter {
return &SimpleFilter{
filterFunc: filterFunc,
}
}
func EverythingFunc(runtime.Object) bool {
return true
}
// ParseWatchResourceVersion takes a resource version argument and converts it to // ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is // the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch // an opaque value, the default watch behavior for non-zero watch is to watch