From b675b2230c89185c5e97d64ee56eb25e9f3ca22f Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 10 Oct 2016 09:40:19 +0200 Subject: [PATCH] Avoid unnecessary decoding in etcd3 client --- pkg/storage/etcd3/store.go | 2 +- pkg/storage/etcd3/watcher.go | 58 ++++++++++++++++++++----------- pkg/storage/etcd3/watcher_test.go | 4 +-- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index 052bd3b2a08..9eb5fda3c6f 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -328,7 +328,7 @@ func (s *store) watch(ctx context.Context, key string, rv string, pred storage.S return nil, err } key = keyWithPrefix(s.pathPrefix, key) - return s.watcher.Watch(ctx, key, int64(rev), recursive, storage.SimpleFilter(pred)) + return s.watcher.Watch(ctx, key, int64(rev), recursive, pred) } func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index 3c41b29e240..7ccb61cb011 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -51,7 +51,7 @@ type watchChan struct { key string initialRev int64 recursive bool - filter storage.FilterFunc + internalFilter storage.FilterFunc ctx context.Context cancel context.CancelFunc incomingEventChan chan *event @@ -73,27 +73,31 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage. // If rev is non-zero, it will watch events happened after given revision. // If recursive is false, it watches on given key. // If recursive is true, it watches any children and directories under the key, excluding the root key itself. -// filter must be non-nil. Only if filter returns true will the changes be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) (watch.Interface, error) { +// pred must be non-nil. Only if pred matches the change, it will be returned. +func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) { if recursive && !strings.HasSuffix(key, "/") { key += "/" } - wc := w.createWatchChan(ctx, key, rev, recursive, filter) + wc := w.createWatchChan(ctx, key, rev, recursive, pred) go wc.run() return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan { wc := &watchChan{ watcher: w, key: key, initialRev: rev, recursive: recursive, - filter: filter, + internalFilter: storage.SimpleFilter(pred), incomingEventChan: make(chan *event, incomingBufSize), resultChan: make(chan watch.Event, outgoingBufSize), errChan: make(chan error, 1), } + if pred.Label.Empty() && pred.Field.Empty() { + // The filter doesn't filter out any object. + wc.internalFilter = nil + } wc.ctx, wc.cancel = context.WithCancel(ctx) return wc } @@ -231,16 +235,20 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) { } } +func (wc *watchChan) filter(obj runtime.Object) bool { + if wc.internalFilter == nil { + return true + } + return wc.internalFilter(obj) +} + +func (wc *watchChan) acceptAll() bool { + return wc.internalFilter == nil +} + // transform transforms an event into a result for user if not filtered. -// TODO (Optimization): -// - Save remote round-trip. -// Currently, DELETE and PUT event don't contain the previous value. -// We need to do another Get() in order to get previous object and have logic upon it. -// We could potentially do some optimizations: -// - For PUT, we can save current and previous objects into the value. -// - For DELETE, See https://github.com/coreos/etcd/issues/4620 func (wc *watchChan) transform(e *event) (res *watch.Event) { - curObj, oldObj, err := prepareObjs(wc.ctx, e, wc.watcher.client, wc.watcher.codec, wc.watcher.versioner) + curObj, oldObj, err := wc.prepareObjs(e) if err != nil { glog.Errorf("failed to prepare current and previous objects: %v", err) wc.sendError(err) @@ -265,6 +273,13 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { Object: curObj, } default: + if wc.acceptAll() { + res = &watch.Event{ + Type: watch.Modified, + Object: curObj, + } + return res + } curObjPasses := wc.filter(curObj) oldObjPasses := wc.filter(oldObj) switch { @@ -332,19 +347,22 @@ func (wc *watchChan) sendEvent(e *event) { } } -func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) (curObj runtime.Object, oldObj runtime.Object, err error) { +func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { if !e.isDeleted { - curObj, err = decodeObj(codec, versioner, e.value, e.rev) + curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev) if err != nil { return nil, nil, err } } - if len(e.prevValue) > 0 { + // We need to decode prevValue, only if this is deletion event or + // the underlying filter doesn't accept all objects (otherwise we + // know that the filter for previous object will return true and + // we need the object only to compute whether it was filtered out + // before). + if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { // Note that this sends the *old* object with the etcd revision for the time at // which it gets deleted. - // We assume old object is returned only in Deleted event. Users (e.g. cacher) need - // to have larger than previous rev to tell the ordering. - oldObj, err = decodeObj(codec, versioner, e.prevValue, e.rev) + oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev) if err != nil { return nil, nil, err } diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 0ce078b2881..b0bdf9631d0 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -197,7 +197,7 @@ func TestWatchContextCancel(t *testing.T) { cancel() // When we watch with a canceled context, we should detect that it's context canceled. // We won't take it as error and also close the watcher. - w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.SimpleFilter(storage.Everything)) + w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) if err != nil { t.Fatal(err) } @@ -216,7 +216,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, cluster := testSetup(t) defer cluster.Terminate(t) ctx, cancel := context.WithCancel(origCtx) - w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.SimpleFilter(storage.Everything)) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything) // make resutlChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error)