pass SelectionPredicate instead of Filter to storage layer

This commit is contained in:
Hongchao Deng
2016-08-22 20:41:21 -07:00
parent 234be5a1d0
commit 6f3ac807fd
56 changed files with 376 additions and 409 deletions

View File

@@ -202,7 +202,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
}
// Implements storage.Interface.
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
@@ -211,13 +211,13 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
// Implements storage.Interface.
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
@@ -226,7 +226,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
return w, nil
}
@@ -297,7 +297,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
}
// Implements storage.Interface.
func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
func (h *etcdHelper) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
@@ -326,7 +326,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
return err
}
trace.Step("Object decoded")
@@ -337,7 +337,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, slicePtr interface{}) error {
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(400 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
@@ -366,7 +366,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, s
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
if filter.Filter(obj) {
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
if node.ModifiedIndex != 0 {
@@ -379,7 +379,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, s
}
// Implements storage.Interface.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
@@ -398,7 +398,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
if err != nil {
return err
}
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
return err
}
trace.Step("Node list decoded")
@@ -548,7 +548,7 @@ func (h *etcdHelper) prefixEtcdKey(key string) string {
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool)
getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
@@ -556,14 +556,14 @@ func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *etcdHelper) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
if !filter.Filter(obj.(runtime.Object)) {
if !filter(obj.(runtime.Object)) {
return nil, true
}
// We should not return the object itself to avoid polluting the cache if someone

View File

@@ -30,6 +30,8 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/storage"
@@ -154,14 +156,17 @@ func TestListFiltered(t *testing.T) {
}
createPodList(t, helper, &list)
filterFunc := func(obj runtime.Object) bool {
pod := obj.(*api.Pod)
return pod.Name == "bar"
// List only "bar" pod
p := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod)
return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil
},
}
filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
var got api.PodList
err := helper.List(context.TODO(), key, "", filter, &got)
err := helper.List(context.TODO(), key, "", p, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}

View File

@@ -77,7 +77,7 @@ type etcdWatcher struct {
list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true
include includeFunc
filter storage.Filter
filter storage.FilterFunc
etcdIncoming chan *etcd.Response
etcdError chan error
@@ -105,7 +105,7 @@ const watchWaitDuration = 100 * time.Millisecond
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.Filter,
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
@@ -310,7 +310,7 @@ func (w *etcdWatcher) translate() {
}
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
return obj, nil
}
@@ -355,7 +355,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter.Filter(obj) {
if !w.filter(obj) {
return
}
action := watch.Added
@@ -384,7 +384,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value.
return
}
curObjPasses := w.filter.Filter(curObj)
curObjPasses := w.filter(curObj)
oldObjPasses := false
var oldObj runtime.Object
if res.PrevNode != nil && res.PrevNode.Value != "" {
@@ -393,7 +393,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
}
oldObjPasses = w.filter.Filter(oldObj)
oldObjPasses = w.filter(oldObj)
}
}
// Some changes to an object may cause it to start or stop matching a filter.
@@ -442,7 +442,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
// the resourceVersion to resume will never be able to get past a bad value.
return
}
if !w.filter.Filter(obj) {
if !w.filter(obj) {
return
}
w.emit(watch.Event{

View File

@@ -37,7 +37,7 @@ var versioner = APIObjectVersioner{}
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{}
func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
return nil, false
}
@@ -46,18 +46,6 @@ func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
var _ etcdCache = &fakeEtcdCache{}
// firstLetterIsB implements storage.Filter interface.
type firstLetterIsB struct {
}
func (f *firstLetterIsB) Filter(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
}
func (f *firstLetterIsB) Trigger() []storage.MatchValue {
return nil
}
func TestWatchInterpretations(t *testing.T) {
codec := testapi.Default.Codec()
// Declare some pods to make the test cases compact.
@@ -135,10 +123,12 @@ func TestWatchInterpretations(t *testing.T) {
expectEmit: false,
},
}
firstLetterIsB := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
}
for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, false, nil, &firstLetterIsB{}, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
@@ -177,7 +167,7 @@ func TestWatchInterpretations(t *testing.T) {
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codec := testScheme(t)
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -192,7 +182,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
_, codec := testScheme(t)
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -207,7 +197,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
_, codec := testScheme(t)
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
@@ -229,10 +219,9 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
codec := testapi.Default.Codec()
filterFunc := func(obj runtime.Object) bool {
filter := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name != "bar"
}
filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
eventChan := make(chan watch.Event, 1)