Switch List/Watch to ListPredicate/WatchPredicate

This commit is contained in:
Clayton Coleman 2015-03-03 21:14:15 -05:00
parent 576bbb565e
commit a52b0f2619
10 changed files with 51 additions and 26 deletions

View File

@ -131,13 +131,13 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, e
} }
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) return rs.registry.ListPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs})
} }
// Watch returns Events events via a watch.Interface. // Watch returns Events events via a watch.Interface.
// It implements apiserver.ResourceWatcher. // It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) return rs.registry.WatchPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
} }
// New returns a new api.Event // New returns a new api.Event

View File

@ -23,6 +23,7 @@ import (
kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -70,6 +71,9 @@ type Etcd struct {
// is an operation against an existing object. // is an operation against an existing object.
TTLFunc func(obj runtime.Object, update bool) (uint64, error) TTLFunc func(obj runtime.Object, update bool) (uint64, error)
// Returns a matcher corresponding to the provided labels and fields.
PredicateFunc func(label, field labels.Selector) generic.Matcher
// Called on all objects returned from the underlying store, after // Called on all objects returned from the underlying store, after
// the exit hooks are invoked. Decorators are intended for integrations // the exit hooks are invoked. Decorators are intended for integrations
// that are above etcd and should only be used for specific cases where // that are above etcd and should only be used for specific cases where
@ -119,10 +123,23 @@ func NamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, erro
return key, nil return key, nil
} }
// List returns a list of all the items matching m. // New implements RESTStorage
// TODO: rename this to ListPredicate, take the default predicate function on the constructor, and func (e *Etcd) New() runtime.Object {
// introduce a List method that uses the default predicate function return e.NewFunc()
func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { }
// NewList implements RESTLister
func (e *Etcd) NewList() runtime.Object {
return e.NewListFunc()
}
// List returns a list of items matching labels and field
func (e *Etcd) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return e.ListPredicate(ctx, e.PredicateFunc(label, field))
}
// ListPredicate returns a list of all the items matching m.
func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
list := e.NewListFunc() list := e.NewListFunc()
err := e.Helper.ExtractToList(e.KeyRootFunc(ctx), list) err := e.Helper.ExtractToList(e.KeyRootFunc(ctx), list)
if err != nil { if err != nil {
@ -339,11 +356,15 @@ func (e *Etcd) Delete(ctx api.Context, name string) (runtime.Object, error) {
return &api.Status{Status: api.StatusSuccess}, nil return &api.Status{Status: api.StatusSuccess}, nil
} }
// Watch starts a watch for the items that m matches. // WatchPredicate starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list. // TODO: Detect if m references a single object instead of a list.
// TODO: rename this to WatchPredicate, take the default predicate function on the constructor, and func (e *Etcd) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
// introduce a Watch method that uses the default predicate function return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { }
// WatchPredicate starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName) version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -172,7 +172,7 @@ func TestEtcdList(t *testing.T) {
for name, item := range table { for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[registry.KeyRootFunc(api.NewContext())] = item.in fakeClient.Data[registry.KeyRootFunc(api.NewContext())] = item.in
list, err := registry.List(api.NewContext(), item.m) list, err := registry.ListPredicate(api.NewContext(), item.m)
if e, a := item.succeed, err == nil; e != a { if e, a := item.succeed, err == nil; e != a {
t.Errorf("%v: expected %v, got %v", name, e, a) t.Errorf("%v: expected %v, got %v", name, e, a)
continue continue
@ -660,7 +660,7 @@ func TestEtcdWatch(t *testing.T) {
} }
fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient, registry := NewTestGenericEtcdRegistry(t)
wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, "1") wi, err := registry.WatchPredicate(api.NewContext(), EverythingMatcher{}, "1")
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -75,12 +75,12 @@ type DecoratorFunc func(obj runtime.Object) error
// layer. // layer.
// DEPRECATED: replace with direct implementation of RESTStorage // DEPRECATED: replace with direct implementation of RESTStorage
type Registry interface { type Registry interface {
List(api.Context, Matcher) (runtime.Object, error) ListPredicate(api.Context, Matcher) (runtime.Object, error)
CreateWithName(ctx api.Context, id string, obj runtime.Object) error CreateWithName(ctx api.Context, id string, obj runtime.Object) error
UpdateWithName(ctx api.Context, id string, obj runtime.Object) error UpdateWithName(ctx api.Context, id string, obj runtime.Object) error
Get(ctx api.Context, id string) (runtime.Object, error) Get(ctx api.Context, id string) (runtime.Object, error)
Delete(ctx api.Context, id string) (runtime.Object, error) Delete(ctx api.Context, id string) (runtime.Object, error)
Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error) WatchPredicate(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error)
} }
// FilterList filters any list object that conforms to the api conventions, // FilterList filters any list object that conforms to the api conventions,

View File

@ -135,11 +135,11 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, e
} }
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) return rs.registry.ListPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs})
} }
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) return rs.registry.WatchPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
} }
// New returns a new api.LimitRange // New returns a new api.LimitRange

View File

@ -109,11 +109,11 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, e
} }
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) return rs.registry.ListPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs})
} }
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) return rs.registry.WatchPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
} }
// New returns a new api.Namespace // New returns a new api.Namespace

View File

@ -24,6 +24,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint" "github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -53,6 +54,9 @@ func NewREST(h tools.EtcdHelper, factory pod.BoundPodFactory) (*REST, *BindingRE
ObjectNameFunc: func(obj runtime.Object) (string, error) { ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Pod).Name, nil return obj.(*api.Pod).Name, nil
}, },
PredicateFunc: func(label, field labels.Selector) generic.Matcher {
return pod.MatchPod(label, field)
},
EndpointName: "pods", EndpointName: "pods",
Helper: h, Helper: h,
@ -92,12 +96,12 @@ func (r *REST) NewList() runtime.Object {
// List obtains a list of pods with labels that match selector. // List obtains a list of pods with labels that match selector.
func (r *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (r *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return r.store.List(ctx, pod.MatchPod(label, field)) return r.store.List(ctx, label, field)
} }
// Watch begins watching for new, changed, or deleted pods. // Watch begins watching for new, changed, or deleted pods.
func (r *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (r *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return r.store.Watch(ctx, pod.MatchPod(label, field), resourceVersion) return r.store.Watch(ctx, label, field, resourceVersion)
} }
// Get gets a specific pod specified by its ID. // Get gets a specific pod specified by its ID.

View File

@ -42,7 +42,7 @@ func NewGeneric(list runtime.Object) *GenericRegistry {
} }
} }
func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { func (r *GenericRegistry) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
if r.Err != nil { if r.Err != nil {
@ -51,7 +51,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje
return generic.FilterList(r.ObjectList, m, nil) return generic.FilterList(r.ObjectList, m, nil)
} }
func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { func (r *GenericRegistry) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :( // TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.Broadcaster.Watch(), nil return r.Broadcaster.Watch(), nil
} }

View File

@ -138,11 +138,11 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, e
} }
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) return rs.registry.ListPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs})
} }
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) return rs.registry.WatchPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
} }
// New returns a new api.ResourceQuota // New returns a new api.ResourceQuota

View File

@ -145,11 +145,11 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, e
} }
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) return rs.registry.ListPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs})
} }
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) return rs.registry.WatchPredicate(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
} }
// New returns a new api.Secret // New returns a new api.Secret