diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index e3c787d0842..27ead5a1d78 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -682,12 +682,16 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b } func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool { - f := SimpleFilter(p) filterFunc := func(objKey string, obj runtime.Object) bool { if !hasPathPrefix(objKey, key) { return false } - return f(obj) + matches, err := p.Matches(obj) + if err != nil { + glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err) + return false + } + return matches } return filterFunc } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go index 322ccefabd1..6f37f99d2dc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go @@ -240,7 +240,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri return nil, err } key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h) + w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -255,7 +255,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion return nil, err } key = path.Join(h.pathPrefix, key) - w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h) + w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h) go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV) return w, nil } @@ -359,7 +359,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion nodes := make([]*etcd.Node, 0) nodes = append(nodes, response.Node) - if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil { + if err := h.decodeNodeList(nodes, pred, listPtr); err != nil { return err } trace.Step("Object decoded") @@ -370,7 +370,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion } // decodeNodeList walks the tree of each node in the list and decodes into the specified object -func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error { +func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error { trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr)) defer trace.LogIfLong(400 * time.Millisecond) v, err := conversion.EnforcePtr(slicePtr) @@ -383,13 +383,13 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun // IMPORTANT: do not log each key as a discrete step in the trace log // as it produces an immense amount of log spam when there is a large // amount of content in the list. - if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil { + if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil { return err } continue } - if obj, found := h.getFromCache(node.ModifiedIndex, filter); found { - // obj != nil iff it matches the filter function. + if obj, found := h.getFromCache(node.ModifiedIndex, pred); found { + // obj != nil iff it matches the pred function. if obj != nil { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } @@ -407,7 +407,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun } // being unable to set the version does not prevent the object from being extracted _ = h.versioner.UpdateObject(obj, node.ModifiedIndex) - if filter(obj) { + if matched, err := pred.Matches(obj); err == nil && matched { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } if node.ModifiedIndex != 0 { @@ -439,7 +439,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin if err != nil { return err } - if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil { + if err := h.decodeNodeList(nodes, pred, listPtr); err != nil { return err } trace.Step("Node list decoded") @@ -590,7 +590,7 @@ func (h *etcdHelper) GuaranteedUpdate( // their Node.ModifiedIndex, which is unique across all types. // All implementations must be thread-safe. type etcdCache interface { - getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) + getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) addToCache(index uint64, obj runtime.Object) } @@ -598,14 +598,14 @@ func getTypeName(obj interface{}) string { return reflect.TypeOf(obj).String() } -func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) { +func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) { startTime := time.Now() defer func() { metrics.ObserveGetCache(startTime) }() obj, found := h.cache.Get(index) if found { - if !filter(obj.(runtime.Object)) { + if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched { return nil, true } // We should not return the object itself to avoid polluting the cache if someone diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go index 1cd368bd86e..d86cd50ebab 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go @@ -78,7 +78,7 @@ type etcdWatcher struct { list bool // If we're doing a recursive watch, should be true. quorum bool // If we enable quorum, shoule be true include includeFunc - filter storage.FilterFunc + pred storage.SelectionPredicate etcdIncoming chan *etcd.Response etcdError chan error @@ -105,11 +105,9 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. // The versioner must be able to handle the objects that transform creates. -func newEtcdWatcher( - list bool, quorum bool, include includeFunc, filter storage.FilterFunc, +func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, - valueTransformer ValueTransformer, - cache etcdCache) *etcdWatcher { + valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -119,7 +117,7 @@ func newEtcdWatcher( list: list, quorum: quorum, include: include, - filter: filter, + pred: pred, // Buffer this channel, so that the etcd client is not forced // to context switch with every object it gets, and so that a // long time spent decoding an object won't block the *next* @@ -315,7 +313,7 @@ func (w *etcdWatcher) translate() { // decodeObject extracts an object from the provided etcd node or returns an error. func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) { - if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found { + if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found { return obj, nil } @@ -365,7 +363,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) { // the resourceVersion to resume will never be able to get past a bad value. return } - if !w.filter(obj) { + if matched, err := w.pred.Matches(obj); err != nil || !matched { return } action := watch.Added @@ -391,7 +389,10 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { // the resourceVersion to resume will never be able to get past a bad value. return } - curObjPasses := w.filter(curObj) + curObjPasses := false + if matched, err := w.pred.Matches(curObj); err == nil && matched { + curObjPasses = true + } oldObjPasses := false var oldObj runtime.Object if res.PrevNode != nil && res.PrevNode.Value != "" { @@ -400,10 +401,12 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil { utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err)) } - oldObjPasses = w.filter(oldObj) + if matched, err := w.pred.Matches(oldObj); err == nil && matched { + oldObjPasses = true + } } } - // Some changes to an object may cause it to start or stop matching a filter. + // Some changes to an object may cause it to start or stop matching a pred. // We need to report those as adds/deletes. So we have to check both the previous // and current value of the object. switch { @@ -423,7 +426,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) { Object: oldObj, }) } - // Do nothing if neither new nor old object passed the filter. + // Do nothing if neither new nor old object passed the pred. } func (w *etcdWatcher) sendDelete(res *etcd.Response) { @@ -449,7 +452,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { // the resourceVersion to resume will never be able to get past a bad value. return } - if !w.filter(obj) { + if matched, err := w.pred.Matches(obj); err != nil || !matched { return } w.emit(watch.Event{ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go index aae3813a042..4c4d038dce5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher_test.go @@ -23,6 +23,8 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -41,7 +43,7 @@ var versioner = APIObjectVersioner{} // Implements etcdCache interface as empty methods (i.e. does not cache any objects) type fakeEtcdCache struct{} -func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) { +func (f *fakeEtcdCache) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) { return nil, false } @@ -58,7 +60,7 @@ func TestWatchInterpretations(t *testing.T) { podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}} - // All of these test cases will be run with the firstLetterIsB Filter. + // All of these test cases will be run with the firstLetterIsB SelectionPredicate. table := map[string]struct { actions []string // Run this test item for every action here. prevNodeValue string @@ -128,8 +130,21 @@ func TestWatchInterpretations(t *testing.T) { expectEmit: false, }, } - firstLetterIsB := func(obj runtime.Object) bool { - return obj.(*example.Pod).Name[0] == 'b' + + // Should use fieldSelector here. + // But for the sake of tests (simplifying the codes), use labelSelector to support set-based requirements + selector, err := labels.Parse("metadata.name in (bar, baz)") + if err != nil { + t.Fatal(err) + } + firstLetterIsB := storage.SelectionPredicate{ + Label: selector, + Field: fields.Everything(), + IncludeUninitialized: true, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return labels.Set{"metadata.name": pod.Name}, nil, pod.Initializers != nil, nil + }, } for name, item := range table { for _, action := range item.actions { @@ -173,7 +188,7 @@ func TestWatchInterpretations(t *testing.T) { func TestWatchInterpretation_ResponseNotSet(t *testing.T) { _, codecs := testScheme(t) codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -189,7 +204,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -205,7 +220,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -228,10 +243,17 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { _, codecs := testScheme(t) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - filter := func(obj runtime.Object) bool { - return obj.(*example.Pod).Name != "bar" + selector, _ := fields.ParseSelector("metadata.name!=bar") + pred := storage.SelectionPredicate{ + Label: labels.Everything(), + Field: selector, + IncludeUninitialized: true, + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil + }, } - w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) + w := newEtcdWatcher(false, false, nil, pred, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{}) eventChan := make(chan watch.Event, 1) w.emit = func(e watch.Event) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 2350a5526b8..ff654adec57 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -403,7 +403,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin if err != nil || v.Kind() != reflect.Slice { panic("need ptr to slice") } - if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil { + if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil { return err } // update version with cluster level revision @@ -492,8 +492,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor } keyPrefix := key - filter := storage.SimpleFilter(pred) - // set the appropriate clientv3 options to filter the returned data set var paging bool options := make([]clientv3.OpOption, 0, 4) @@ -587,7 +585,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor continue } - if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil { + if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil { return err } } @@ -774,14 +772,14 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP } // appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice. -func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error { +func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error { obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object)) if err != nil { return err } // being unable to set the version does not prevent the object from being extracted versioner.UpdateObject(obj, rev) - if filter(obj) { + if matched, err := pred.Matches(obj); err == nil && matched { v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } return nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 366e161cfa0..38aae2f1fa6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -72,7 +72,7 @@ type watchChan struct { key string initialRev int64 recursive bool - internalFilter storage.FilterFunc + internalPred storage.SelectionPredicate ctx context.Context cancel context.CancelFunc incomingEventChan chan *event @@ -111,14 +111,14 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re key: key, initialRev: rev, recursive: recursive, - internalFilter: storage.SimpleFilter(pred), + internalPred: pred, incomingEventChan: make(chan *event, incomingBufSize), resultChan: make(chan watch.Event, outgoingBufSize), errChan: make(chan error, 1), } if pred.Empty() { // The filter doesn't filter out any object. - wc.internalFilter = nil + wc.internalPred = storage.Everything } wc.ctx, wc.cancel = context.WithCancel(ctx) return wc @@ -250,14 +250,15 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) { } func (wc *watchChan) filter(obj runtime.Object) bool { - if wc.internalFilter == nil { + if wc.internalPred.Empty() { return true } - return wc.internalFilter(obj) + matched, err := wc.internalPred.Matches(obj) + return err == nil && matched } func (wc *watchChan) acceptAll() bool { - return wc.internalFilter == nil + return wc.internalPred.Empty() } // transform transforms an event into a result for user if not filtered. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 0d81f05c3d9..987d84f5780 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -70,10 +70,6 @@ type MatchValue struct { // to that function. type TriggerPublisherFunc func(obj runtime.Object) []MatchValue -// FilterFunc takes an API object and returns true if the object satisfies some requirements. -// TODO: We will remove this type and use SelectionPredicate everywhere. -type FilterFunc func(obj runtime.Object) bool - // Everything accepts all objects. var Everything = SelectionPredicate{ Label: labels.Everything(), diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index 83e423a97ab..9c8c3d5994c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -96,7 +96,7 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { } matched := s.Label.Matches(labels) if matched && s.Field != nil { - matched = (matched && s.Field.Matches(fields)) + matched = matched && s.Field.Matches(fields) } return matched, nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util.go b/staging/src/k8s.io/apiserver/pkg/storage/util.go index 3e0b7211b95..ebe54ba2cc7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util.go @@ -22,8 +22,6 @@ import ( "strings" "sync/atomic" - "github.com/golang/glog" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/validation/path" "k8s.io/apimachinery/pkg/runtime" @@ -40,19 +38,6 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc { } } -// SimpleFilter converts a selection predicate into a FilterFunc. -// It ignores any error from Matches(). -func SimpleFilter(p SelectionPredicate) FilterFunc { - return func(obj runtime.Object) bool { - matches, err := p.Matches(obj) - if err != nil { - glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err) - return false - } - return matches - } -} - func EverythingFunc(runtime.Object) bool { return true }