Merge pull request #31189 from hongchaodeng/r1

Automatic merge from submit-queue

api storage: Decouple Decorator from Filter

Continue #28249

What?
This PR decouples Decorator from Filter, i.e. remove Decorator in createFilter().
- For List, Decorator is called on returned list object.
- For Watch, we implement a new watcher to pipe through decorator. Error will be returned as a watch event.

Why?
- We want to change filter to SelectionPredicate struct. But Decorator is designed to be coupled with filtering.
- Per the discussion in #28249, decorator shouldn't be coupled to filter and error from Decorator should be returned instead of assuming false filtering.
This commit is contained in:
Kubernetes Submit Queue 2016-09-08 04:25:05 -07:00 committed by GitHub
commit c27326a26c
3 changed files with 203 additions and 14 deletions

View File

@ -0,0 +1,98 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"net/http"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/watch"
)
type decoratedWatcher struct {
w watch.Interface
decorator rest.ObjectFunc
cancel context.CancelFunc
resultCh chan watch.Event
}
func newDecoratedWatcher(w watch.Interface, decorator rest.ObjectFunc) *decoratedWatcher {
ctx, cancel := context.WithCancel(context.Background())
d := &decoratedWatcher{
w: w,
decorator: decorator,
cancel: cancel,
resultCh: make(chan watch.Event),
}
go d.run(ctx)
return d
}
func (d *decoratedWatcher) run(ctx context.Context) {
var recv, send watch.Event
for {
select {
case recv = <-d.w.ResultChan():
switch recv.Type {
case watch.Added, watch.Modified, watch.Deleted:
err := d.decorator(recv.Object)
if err != nil {
send = makeStatusErrorEvent(err)
break
}
send = recv
case watch.Error:
send = recv
}
select {
case d.resultCh <- send:
if send.Type == watch.Error {
d.cancel()
}
case <-ctx.Done():
}
case <-ctx.Done():
d.w.Stop()
close(d.resultCh)
return
}
}
}
func (d *decoratedWatcher) Stop() {
d.cancel()
}
func (d *decoratedWatcher) ResultChan() <-chan watch.Event {
return d.resultCh
}
func makeStatusErrorEvent(err error) watch.Event {
status := &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: unversioned.StatusReasonInternalError,
}
return watch.Event{
Type: watch.Error,
Object: status,
}
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
func TestDecoratedWatcher(t *testing.T) {
w := watch.NewFake()
decorator := func(obj runtime.Object) error {
pod := obj.(*api.Pod)
pod.Annotations = map[string]string{"decorated": "true"}
return nil
}
dw := newDecoratedWatcher(w, decorator)
defer dw.Stop()
go w.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
select {
case e := <-dw.ResultChan():
pod, ok := e.Object.(*api.Pod)
if !ok {
t.Errorf("Should received object of type *api.Pod, get type (%T)", e.Object)
return
}
if pod.Annotations["decorated"] != "true" {
t.Errorf("pod.Annotations[\"decorated\"], want=%s, get=%s", "true", pod.Labels["decorated"])
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
}
}
func TestDecoratedWatcherError(t *testing.T) {
w := watch.NewFake()
expErr := fmt.Errorf("expected error")
decorator := func(obj runtime.Object) error {
return expErr
}
dw := newDecoratedWatcher(w, decorator)
defer dw.Stop()
go w.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
select {
case e := <-dw.ResultChan():
if e.Type != watch.Error {
t.Errorf("event type want=%v, get=%v", watch.Error, e.Type)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timeout after %v", wait.ForeverTestTimeout)
}
}

View File

@ -94,11 +94,11 @@ type Store struct {
// DeleteCollection call.
DeleteCollectionWorkers int
// Called on all objects returned from the underlying store, after
// the exit hooks are invoked. Decorators are intended for integrations
// that are above storage and should only be used for specific cases where
// storage of the value is not appropriate, since they cannot
// be watched.
// Decorator is called as exit hook on object returned from the underlying storage.
// The returned object could be individual object (e.g. Pod) or the list type (e.g. PodList).
// Decorator is intended for integrations that are above storage and
// should only be used for specific cases where storage of the value is
// not appropriate, since they cannot be watched.
Decorator rest.ObjectFunc
// Allows extended behavior during creation, required
CreateStrategy rest.RESTCreateStrategy
@ -184,7 +184,16 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object,
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
return e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
if err != nil {
return nil, err
}
if e.Decorator != nil {
if err := e.Decorator(out); err != nil {
return nil, err
}
}
return out, nil
}
// ListPredicate returns a list of all the items matching m.
@ -839,12 +848,26 @@ func (e *Store) WatchPredicate(ctx api.Context, m *generic.SelectionPredicate, r
if err != nil {
return nil, err
}
return e.Storage.Watch(ctx, key, resourceVersion, filter)
w, err := e.Storage.Watch(ctx, key, resourceVersion, filter)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter)
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filter)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
func (e *Store) createFilter(m *generic.SelectionPredicate) storage.Filter {
@ -854,12 +877,6 @@ func (e *Store) createFilter(m *generic.SelectionPredicate) storage.Filter {
glog.Errorf("unable to match watch: %v", err)
return false
}
if matches && e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
glog.Errorf("unable to decorate watch: %v", err)
return false
}
}
return matches
}
return storage.NewSimpleFilter(filterFunc, m.MatcherIndex)