From f01bc0a62f950883a2de0fefc2d8830d44e916a4 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Mon, 22 Aug 2016 11:45:03 -0700 Subject: [PATCH] decouple Decorator from filter: - Error from Decorator will be returned instead of assuming false filtering - For List, Decorator is called on return list object - For Watch, we implement a new watcher to pipe through decorator --- .../generic/registry/decorated_watcher.go | 98 +++++++++++++++++++ .../registry/decorated_watcher_test.go | 74 ++++++++++++++ pkg/registry/generic/registry/store.go | 45 ++++++--- 3 files changed, 203 insertions(+), 14 deletions(-) create mode 100644 pkg/registry/generic/registry/decorated_watcher.go create mode 100644 pkg/registry/generic/registry/decorated_watcher_test.go diff --git a/pkg/registry/generic/registry/decorated_watcher.go b/pkg/registry/generic/registry/decorated_watcher.go new file mode 100644 index 00000000000..2bf3049af5a --- /dev/null +++ b/pkg/registry/generic/registry/decorated_watcher.go @@ -0,0 +1,98 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "net/http" + + "golang.org/x/net/context" + + "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/watch" +) + +type decoratedWatcher struct { + w watch.Interface + decorator rest.ObjectFunc + cancel context.CancelFunc + resultCh chan watch.Event +} + +func newDecoratedWatcher(w watch.Interface, decorator rest.ObjectFunc) *decoratedWatcher { + ctx, cancel := context.WithCancel(context.Background()) + d := &decoratedWatcher{ + w: w, + decorator: decorator, + cancel: cancel, + resultCh: make(chan watch.Event), + } + go d.run(ctx) + return d +} + +func (d *decoratedWatcher) run(ctx context.Context) { + var recv, send watch.Event + for { + select { + case recv = <-d.w.ResultChan(): + switch recv.Type { + case watch.Added, watch.Modified, watch.Deleted: + err := d.decorator(recv.Object) + if err != nil { + send = makeStatusErrorEvent(err) + break + } + send = recv + case watch.Error: + send = recv + } + select { + case d.resultCh <- send: + if send.Type == watch.Error { + d.cancel() + } + case <-ctx.Done(): + } + case <-ctx.Done(): + d.w.Stop() + close(d.resultCh) + return + } + } +} + +func (d *decoratedWatcher) Stop() { + d.cancel() +} + +func (d *decoratedWatcher) ResultChan() <-chan watch.Event { + return d.resultCh +} + +func makeStatusErrorEvent(err error) watch.Event { + status := &unversioned.Status{ + Status: unversioned.StatusFailure, + Message: err.Error(), + Code: http.StatusInternalServerError, + Reason: unversioned.StatusReasonInternalError, + } + return watch.Event{ + Type: watch.Error, + Object: status, + } +} diff --git a/pkg/registry/generic/registry/decorated_watcher_test.go b/pkg/registry/generic/registry/decorated_watcher_test.go new file mode 100644 index 00000000000..9d3a2674939 --- /dev/null +++ b/pkg/registry/generic/registry/decorated_watcher_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "fmt" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" +) + +func TestDecoratedWatcher(t *testing.T) { + w := watch.NewFake() + decorator := func(obj runtime.Object) error { + pod := obj.(*api.Pod) + pod.Annotations = map[string]string{"decorated": "true"} + return nil + } + dw := newDecoratedWatcher(w, decorator) + defer dw.Stop() + + go w.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + select { + case e := <-dw.ResultChan(): + pod, ok := e.Object.(*api.Pod) + if !ok { + t.Errorf("Should received object of type *api.Pod, get type (%T)", e.Object) + return + } + if pod.Annotations["decorated"] != "true" { + t.Errorf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"]) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timeout after %v", wait.ForeverTestTimeout) + } +} + +func TestDecoratedWatcherError(t *testing.T) { + w := watch.NewFake() + expErr := fmt.Errorf("expected error") + decorator := func(obj runtime.Object) error { + return expErr + } + dw := newDecoratedWatcher(w, decorator) + defer dw.Stop() + + go w.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + select { + case e := <-dw.ResultChan(): + if e.Type != watch.Error { + t.Errorf("event type want=%v, get=%v", watch.Error, e.Type) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timeout after %v", wait.ForeverTestTimeout) + } +} diff --git a/pkg/registry/generic/registry/store.go b/pkg/registry/generic/registry/store.go index 9cf3872d8c3..f575bbd79f6 100644 --- a/pkg/registry/generic/registry/store.go +++ b/pkg/registry/generic/registry/store.go @@ -94,11 +94,11 @@ type Store struct { // DeleteCollection call. DeleteCollectionWorkers int - // Called on all objects returned from the underlying store, after - // the exit hooks are invoked. Decorators are intended for integrations - // that are above storage and should only be used for specific cases where - // storage of the value is not appropriate, since they cannot - // be watched. + // Decorator is called as exit hook on object returned from the underlying storage. + // The returned object could be individual object (e.g. Pod) or the list type (e.g. PodList). + // Decorator is intended for integrations that are above storage and + // should only be used for specific cases where storage of the value is + // not appropriate, since they cannot be watched. Decorator rest.ObjectFunc // Allows extended behavior during creation, required CreateStrategy rest.RESTCreateStrategy @@ -184,7 +184,16 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object, if options != nil && options.FieldSelector != nil { field = options.FieldSelector } - return e.ListPredicate(ctx, e.PredicateFunc(label, field), options) + out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options) + if err != nil { + return nil, err + } + if e.Decorator != nil { + if err := e.Decorator(out); err != nil { + return nil, err + } + } + return out, nil } // ListPredicate returns a list of all the items matching m. @@ -825,12 +834,26 @@ func (e *Store) WatchPredicate(ctx api.Context, m *generic.SelectionPredicate, r if err != nil { return nil, err } - return e.Storage.Watch(ctx, key, resourceVersion, filter) + w, err := e.Storage.Watch(ctx, key, resourceVersion, filter) + if err != nil { + return nil, err + } + if e.Decorator != nil { + return newDecoratedWatcher(w, e.Decorator), nil + } + return w, nil } // if we cannot extract a key based on the current context, the optimization is skipped } - return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter) + w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter) + if err != nil { + return nil, err + } + if e.Decorator != nil { + return newDecoratedWatcher(w, e.Decorator), nil + } + return w, nil } func (e *Store) createFilter(m *generic.SelectionPredicate) storage.Filter { @@ -840,12 +863,6 @@ func (e *Store) createFilter(m *generic.SelectionPredicate) storage.Filter { glog.Errorf("unable to match watch: %v", err) return false } - if matches && e.Decorator != nil { - if err := e.Decorator(obj); err != nil { - glog.Errorf("unable to decorate watch: %v", err) - return false - } - } return matches } return storage.NewSimpleFilter(filterFunc, m.MatcherIndex)