decouple Decorator from filter:

- Error from Decorator will be returned instead of assuming false filtering
- For List, Decorator is called on return list object
- For Watch, we implement a new watcher to pipe through decorator
This commit is contained in:
Hongchao Deng 2016-08-22 11:45:03 -07:00
parent 6f90c00108
commit f01bc0a62f
3 changed files with 203 additions and 14 deletions

View File

@ -0,0 +1,98 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"net/http"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/watch"
)
type decoratedWatcher struct {
w watch.Interface
decorator rest.ObjectFunc
cancel context.CancelFunc
resultCh chan watch.Event
}
func newDecoratedWatcher(w watch.Interface, decorator rest.ObjectFunc) *decoratedWatcher {
ctx, cancel := context.WithCancel(context.Background())
d := &decoratedWatcher{
w: w,
decorator: decorator,
cancel: cancel,
resultCh: make(chan watch.Event),
}
go d.run(ctx)
return d
}
func (d *decoratedWatcher) run(ctx context.Context) {
var recv, send watch.Event
for {
select {
case recv = <-d.w.ResultChan():
switch recv.Type {
case watch.Added, watch.Modified, watch.Deleted:
err := d.decorator(recv.Object)
if err != nil {
send = makeStatusErrorEvent(err)
break
}
send = recv
case watch.Error:
send = recv
}
select {
case d.resultCh <- send:
if send.Type == watch.Error {
d.cancel()
}
case <-ctx.Done():
}
case <-ctx.Done():
d.w.Stop()
close(d.resultCh)
return
}
}
}
func (d *decoratedWatcher) Stop() {
d.cancel()
}
func (d *decoratedWatcher) ResultChan() <-chan watch.Event {
return d.resultCh
}
func makeStatusErrorEvent(err error) watch.Event {
status := &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: unversioned.StatusReasonInternalError,
}
return watch.Event{
Type: watch.Error,
Object: status,
}
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
func TestDecoratedWatcher(t *testing.T) {
w := watch.NewFake()
decorator := func(obj runtime.Object) error {
pod := obj.(*api.Pod)
pod.Annotations = map[string]string{"decorated": "true"}
return nil
}
dw := newDecoratedWatcher(w, decorator)
defer dw.Stop()
go w.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
select {
case e := <-dw.ResultChan():
pod, ok := e.Object.(*api.Pod)
if !ok {
t.Errorf("Should received object of type *api.Pod, get type (%T)", e.Object)
return
}
if pod.Annotations["decorated"] != "true" {
t.Errorf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"])
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
}
}
func TestDecoratedWatcherError(t *testing.T) {
w := watch.NewFake()
expErr := fmt.Errorf("expected error")
decorator := func(obj runtime.Object) error {
return expErr
}
dw := newDecoratedWatcher(w, decorator)
defer dw.Stop()
go w.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
select {
case e := <-dw.ResultChan():
if e.Type != watch.Error {
t.Errorf("event type want=%v, get=%v", watch.Error, e.Type)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
}
}

View File

@ -94,11 +94,11 @@ type Store struct {
// DeleteCollection call.
DeleteCollectionWorkers int
// Called on all objects returned from the underlying store, after
// the exit hooks are invoked. Decorators are intended for integrations
// that are above storage and should only be used for specific cases where
// storage of the value is not appropriate, since they cannot
// be watched.
// Decorator is called as exit hook on object returned from the underlying storage.
// The returned object could be individual object (e.g. Pod) or the list type (e.g. PodList).
// Decorator is intended for integrations that are above storage and
// should only be used for specific cases where storage of the value is
// not appropriate, since they cannot be watched.
Decorator rest.ObjectFunc
// Allows extended behavior during creation, required
CreateStrategy rest.RESTCreateStrategy
@ -184,7 +184,16 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object,
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
return e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
if err != nil {
return nil, err
}
if e.Decorator != nil {
if err := e.Decorator(out); err != nil {
return nil, err
}
}
return out, nil
}
// ListPredicate returns a list of all the items matching m.
@ -825,12 +834,26 @@ func (e *Store) WatchPredicate(ctx api.Context, m *generic.SelectionPredicate, r
if err != nil {
return nil, err
}
return e.Storage.Watch(ctx, key, resourceVersion, filter)
w, err := e.Storage.Watch(ctx, key, resourceVersion, filter)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter)
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
func (e *Store) createFilter(m *generic.SelectionPredicate) storage.Filter {
@ -840,12 +863,6 @@ func (e *Store) createFilter(m *generic.SelectionPredicate) storage.Filter {
glog.Errorf("unable to match watch: %v", err)
return false
}
if matches && e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
glog.Errorf("unable to decorate watch: %v", err)
return false
}
}
return matches
}
return storage.NewSimpleFilter(filterFunc, m.MatcherIndex)