mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 13:02:14 +00:00
remove FilterFunc and use SelectionPredicate everywhere
This commit is contained in:
parent
b18d86d5cc
commit
3ae7bdd211
@ -682,12 +682,16 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
|||||||
}
|
}
|
||||||
|
|
||||||
func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
|
func filterFunction(key string, p SelectionPredicate) func(string, runtime.Object) bool {
|
||||||
f := SimpleFilter(p)
|
|
||||||
filterFunc := func(objKey string, obj runtime.Object) bool {
|
filterFunc := func(objKey string, obj runtime.Object) bool {
|
||||||
if !hasPathPrefix(objKey, key) {
|
if !hasPathPrefix(objKey, key) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return f(obj)
|
matches, err := p.Matches(obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return matches
|
||||||
}
|
}
|
||||||
return filterFunc
|
return filterFunc
|
||||||
}
|
}
|
||||||
|
@ -240,7 +240,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
key = path.Join(h.pathPrefix, key)
|
key = path.Join(h.pathPrefix, key)
|
||||||
w := newEtcdWatcher(false, h.quorum, nil, storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
|
w := newEtcdWatcher(false, h.quorum, nil, pred, h.codec, h.versioner, nil, h.transformer, h)
|
||||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -255,7 +255,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
key = path.Join(h.pathPrefix, key)
|
key = path.Join(h.pathPrefix, key)
|
||||||
w := newEtcdWatcher(true, h.quorum, exceptKey(key), storage.SimpleFilter(pred), h.codec, h.versioner, nil, h.transformer, h)
|
w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred, h.codec, h.versioner, nil, h.transformer, h)
|
||||||
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
@ -359,7 +359,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
|
|||||||
nodes := make([]*etcd.Node, 0)
|
nodes := make([]*etcd.Node, 0)
|
||||||
nodes = append(nodes, response.Node)
|
nodes = append(nodes, response.Node)
|
||||||
|
|
||||||
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
|
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Object decoded")
|
trace.Step("Object decoded")
|
||||||
@ -370,7 +370,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
|
|||||||
}
|
}
|
||||||
|
|
||||||
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
||||||
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
|
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, pred storage.SelectionPredicate, slicePtr interface{}) error {
|
||||||
trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
|
trace := utiltrace.New("decodeNodeList " + getTypeName(slicePtr))
|
||||||
defer trace.LogIfLong(400 * time.Millisecond)
|
defer trace.LogIfLong(400 * time.Millisecond)
|
||||||
v, err := conversion.EnforcePtr(slicePtr)
|
v, err := conversion.EnforcePtr(slicePtr)
|
||||||
@ -383,13 +383,13 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
|
|||||||
// IMPORTANT: do not log each key as a discrete step in the trace log
|
// IMPORTANT: do not log each key as a discrete step in the trace log
|
||||||
// as it produces an immense amount of log spam when there is a large
|
// as it produces an immense amount of log spam when there is a large
|
||||||
// amount of content in the list.
|
// amount of content in the list.
|
||||||
if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
|
if err := h.decodeNodeList(node.Nodes, pred, slicePtr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if obj, found := h.getFromCache(node.ModifiedIndex, filter); found {
|
if obj, found := h.getFromCache(node.ModifiedIndex, pred); found {
|
||||||
// obj != nil iff it matches the filter function.
|
// obj != nil iff it matches the pred function.
|
||||||
if obj != nil {
|
if obj != nil {
|
||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
}
|
}
|
||||||
@ -407,7 +407,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
|
|||||||
}
|
}
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
|
_ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
|
||||||
if filter(obj) {
|
if matched, err := pred.Matches(obj); err == nil && matched {
|
||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
}
|
}
|
||||||
if node.ModifiedIndex != 0 {
|
if node.ModifiedIndex != 0 {
|
||||||
@ -439,7 +439,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := h.decodeNodeList(nodes, storage.SimpleFilter(pred), listPtr); err != nil {
|
if err := h.decodeNodeList(nodes, pred, listPtr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Node list decoded")
|
trace.Step("Node list decoded")
|
||||||
@ -590,7 +590,7 @@ func (h *etcdHelper) GuaranteedUpdate(
|
|||||||
// their Node.ModifiedIndex, which is unique across all types.
|
// their Node.ModifiedIndex, which is unique across all types.
|
||||||
// All implementations must be thread-safe.
|
// All implementations must be thread-safe.
|
||||||
type etcdCache interface {
|
type etcdCache interface {
|
||||||
getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool)
|
getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool)
|
||||||
addToCache(index uint64, obj runtime.Object)
|
addToCache(index uint64, obj runtime.Object)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -598,14 +598,14 @@ func getTypeName(obj interface{}) string {
|
|||||||
return reflect.TypeOf(obj).String()
|
return reflect.TypeOf(obj).String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
|
func (h *etcdHelper) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
metrics.ObserveGetCache(startTime)
|
metrics.ObserveGetCache(startTime)
|
||||||
}()
|
}()
|
||||||
obj, found := h.cache.Get(index)
|
obj, found := h.cache.Get(index)
|
||||||
if found {
|
if found {
|
||||||
if !filter(obj.(runtime.Object)) {
|
if matched, err := pred.Matches(obj.(runtime.Object)); err != nil || !matched {
|
||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
// We should not return the object itself to avoid polluting the cache if someone
|
// We should not return the object itself to avoid polluting the cache if someone
|
||||||
|
@ -78,7 +78,7 @@ type etcdWatcher struct {
|
|||||||
list bool // If we're doing a recursive watch, should be true.
|
list bool // If we're doing a recursive watch, should be true.
|
||||||
quorum bool // If we enable quorum, shoule be true
|
quorum bool // If we enable quorum, shoule be true
|
||||||
include includeFunc
|
include includeFunc
|
||||||
filter storage.FilterFunc
|
pred storage.SelectionPredicate
|
||||||
|
|
||||||
etcdIncoming chan *etcd.Response
|
etcdIncoming chan *etcd.Response
|
||||||
etcdError chan error
|
etcdError chan error
|
||||||
@ -105,11 +105,9 @@ const watchWaitDuration = 100 * time.Millisecond
|
|||||||
|
|
||||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
|
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
|
||||||
// The versioner must be able to handle the objects that transform creates.
|
// The versioner must be able to handle the objects that transform creates.
|
||||||
func newEtcdWatcher(
|
func newEtcdWatcher(list bool, quorum bool, include includeFunc, pred storage.SelectionPredicate,
|
||||||
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
|
|
||||||
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
|
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
|
||||||
valueTransformer ValueTransformer,
|
valueTransformer ValueTransformer, cache etcdCache) *etcdWatcher {
|
||||||
cache etcdCache) *etcdWatcher {
|
|
||||||
w := &etcdWatcher{
|
w := &etcdWatcher{
|
||||||
encoding: encoding,
|
encoding: encoding,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
@ -119,7 +117,7 @@ func newEtcdWatcher(
|
|||||||
list: list,
|
list: list,
|
||||||
quorum: quorum,
|
quorum: quorum,
|
||||||
include: include,
|
include: include,
|
||||||
filter: filter,
|
pred: pred,
|
||||||
// Buffer this channel, so that the etcd client is not forced
|
// Buffer this channel, so that the etcd client is not forced
|
||||||
// to context switch with every object it gets, and so that a
|
// to context switch with every object it gets, and so that a
|
||||||
// long time spent decoding an object won't block the *next*
|
// long time spent decoding an object won't block the *next*
|
||||||
@ -315,7 +313,7 @@ func (w *etcdWatcher) translate() {
|
|||||||
|
|
||||||
// decodeObject extracts an object from the provided etcd node or returns an error.
|
// decodeObject extracts an object from the provided etcd node or returns an error.
|
||||||
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||||
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.SimpleFilter(storage.Everything)); found {
|
if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -365,7 +363,7 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
|||||||
// the resourceVersion to resume will never be able to get past a bad value.
|
// the resourceVersion to resume will never be able to get past a bad value.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !w.filter(obj) {
|
if matched, err := w.pred.Matches(obj); err != nil || !matched {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
action := watch.Added
|
action := watch.Added
|
||||||
@ -391,7 +389,10 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||||||
// the resourceVersion to resume will never be able to get past a bad value.
|
// the resourceVersion to resume will never be able to get past a bad value.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
curObjPasses := w.filter(curObj)
|
curObjPasses := false
|
||||||
|
if matched, err := w.pred.Matches(curObj); err == nil && matched {
|
||||||
|
curObjPasses = true
|
||||||
|
}
|
||||||
oldObjPasses := false
|
oldObjPasses := false
|
||||||
var oldObj runtime.Object
|
var oldObj runtime.Object
|
||||||
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
if res.PrevNode != nil && res.PrevNode.Value != "" {
|
||||||
@ -400,10 +401,12 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||||||
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
|
if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
|
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
|
||||||
}
|
}
|
||||||
oldObjPasses = w.filter(oldObj)
|
if matched, err := w.pred.Matches(oldObj); err == nil && matched {
|
||||||
|
oldObjPasses = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Some changes to an object may cause it to start or stop matching a filter.
|
}
|
||||||
|
// Some changes to an object may cause it to start or stop matching a pred.
|
||||||
// We need to report those as adds/deletes. So we have to check both the previous
|
// We need to report those as adds/deletes. So we have to check both the previous
|
||||||
// and current value of the object.
|
// and current value of the object.
|
||||||
switch {
|
switch {
|
||||||
@ -423,7 +426,7 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
|||||||
Object: oldObj,
|
Object: oldObj,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// Do nothing if neither new nor old object passed the filter.
|
// Do nothing if neither new nor old object passed the pred.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||||
@ -449,7 +452,7 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
|||||||
// the resourceVersion to resume will never be able to get past a bad value.
|
// the resourceVersion to resume will never be able to get past a bad value.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !w.filter(obj) {
|
if matched, err := w.pred.Matches(obj); err != nil || !matched {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.emit(watch.Event{
|
w.emit(watch.Event{
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
apitesting "k8s.io/apimachinery/pkg/api/testing"
|
apitesting "k8s.io/apimachinery/pkg/api/testing"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
@ -41,7 +43,7 @@ var versioner = APIObjectVersioner{}
|
|||||||
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
|
// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
|
||||||
type fakeEtcdCache struct{}
|
type fakeEtcdCache struct{}
|
||||||
|
|
||||||
func (f *fakeEtcdCache) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
|
func (f *fakeEtcdCache) getFromCache(index uint64, pred storage.SelectionPredicate) (runtime.Object, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,7 +60,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
||||||
podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
|
podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
|
||||||
|
|
||||||
// All of these test cases will be run with the firstLetterIsB Filter.
|
// All of these test cases will be run with the firstLetterIsB SelectionPredicate.
|
||||||
table := map[string]struct {
|
table := map[string]struct {
|
||||||
actions []string // Run this test item for every action here.
|
actions []string // Run this test item for every action here.
|
||||||
prevNodeValue string
|
prevNodeValue string
|
||||||
@ -128,8 +130,21 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
expectEmit: false,
|
expectEmit: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
firstLetterIsB := func(obj runtime.Object) bool {
|
|
||||||
return obj.(*example.Pod).Name[0] == 'b'
|
// Should use fieldSelector here.
|
||||||
|
// But for the sake of tests (simplifying the codes), use labelSelector to support set-based requirements
|
||||||
|
selector, err := labels.Parse("metadata.name in (bar, baz)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
firstLetterIsB := storage.SelectionPredicate{
|
||||||
|
Label: selector,
|
||||||
|
Field: fields.Everything(),
|
||||||
|
IncludeUninitialized: true,
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return labels.Set{"metadata.name": pod.Name}, nil, pod.Initializers != nil, nil
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for name, item := range table {
|
for name, item := range table {
|
||||||
for _, action := range item.actions {
|
for _, action := range item.actions {
|
||||||
@ -173,7 +188,7 @@ func TestWatchInterpretations(t *testing.T) {
|
|||||||
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
|
||||||
_, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -189,7 +204,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
|
|||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -205,7 +220,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
|||||||
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
|
||||||
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
actions := []string{"create", "set", "compareAndSwap", "delete"}
|
||||||
for _, action := range actions {
|
for _, action := range actions {
|
||||||
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
t.Errorf("Unexpected emit: %v", e)
|
t.Errorf("Unexpected emit: %v", e)
|
||||||
}
|
}
|
||||||
@ -228,10 +243,17 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
|
|||||||
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
|
||||||
_, codecs := testScheme(t)
|
_, codecs := testScheme(t)
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
filter := func(obj runtime.Object) bool {
|
selector, _ := fields.ParseSelector("metadata.name!=bar")
|
||||||
return obj.(*example.Pod).Name != "bar"
|
pred := storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: selector,
|
||||||
|
IncludeUninitialized: true,
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
|
},
|
||||||
}
|
}
|
||||||
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
w := newEtcdWatcher(false, false, nil, pred, codec, versioner, nil, prefixTransformer{prefix: "test!"}, &fakeEtcdCache{})
|
||||||
|
|
||||||
eventChan := make(chan watch.Event, 1)
|
eventChan := make(chan watch.Event, 1)
|
||||||
w.emit = func(e watch.Event) {
|
w.emit = func(e watch.Event) {
|
||||||
|
@ -403,7 +403,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
|
|||||||
if err != nil || v.Kind() != reflect.Slice {
|
if err != nil || v.Kind() != reflect.Slice {
|
||||||
panic("need ptr to slice")
|
panic("need ptr to slice")
|
||||||
}
|
}
|
||||||
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), storage.SimpleFilter(pred), s.codec, s.versioner); err != nil {
|
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// update version with cluster level revision
|
// update version with cluster level revision
|
||||||
@ -492,8 +492,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||||||
}
|
}
|
||||||
keyPrefix := key
|
keyPrefix := key
|
||||||
|
|
||||||
filter := storage.SimpleFilter(pred)
|
|
||||||
|
|
||||||
// set the appropriate clientv3 options to filter the returned data set
|
// set the appropriate clientv3 options to filter the returned data set
|
||||||
var paging bool
|
var paging bool
|
||||||
options := make([]clientv3.OpOption, 0, 4)
|
options := make([]clientv3.OpOption, 0, 4)
|
||||||
@ -587,7 +585,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := appendListItem(v, data, uint64(kv.ModRevision), filter, s.codec, s.versioner); err != nil {
|
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -774,14 +772,14 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
|
|||||||
}
|
}
|
||||||
|
|
||||||
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
|
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
|
||||||
func appendListItem(v reflect.Value, data []byte, rev uint64, filter storage.FilterFunc, codec runtime.Codec, versioner storage.Versioner) error {
|
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error {
|
||||||
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// being unable to set the version does not prevent the object from being extracted
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
versioner.UpdateObject(obj, rev)
|
versioner.UpdateObject(obj, rev)
|
||||||
if filter(obj) {
|
if matched, err := pred.Matches(obj); err == nil && matched {
|
||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -72,7 +72,7 @@ type watchChan struct {
|
|||||||
key string
|
key string
|
||||||
initialRev int64
|
initialRev int64
|
||||||
recursive bool
|
recursive bool
|
||||||
internalFilter storage.FilterFunc
|
internalPred storage.SelectionPredicate
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
incomingEventChan chan *event
|
incomingEventChan chan *event
|
||||||
@ -111,14 +111,14 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
|
|||||||
key: key,
|
key: key,
|
||||||
initialRev: rev,
|
initialRev: rev,
|
||||||
recursive: recursive,
|
recursive: recursive,
|
||||||
internalFilter: storage.SimpleFilter(pred),
|
internalPred: pred,
|
||||||
incomingEventChan: make(chan *event, incomingBufSize),
|
incomingEventChan: make(chan *event, incomingBufSize),
|
||||||
resultChan: make(chan watch.Event, outgoingBufSize),
|
resultChan: make(chan watch.Event, outgoingBufSize),
|
||||||
errChan: make(chan error, 1),
|
errChan: make(chan error, 1),
|
||||||
}
|
}
|
||||||
if pred.Empty() {
|
if pred.Empty() {
|
||||||
// The filter doesn't filter out any object.
|
// The filter doesn't filter out any object.
|
||||||
wc.internalFilter = nil
|
wc.internalPred = storage.Everything
|
||||||
}
|
}
|
||||||
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
||||||
return wc
|
return wc
|
||||||
@ -250,14 +250,15 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wc *watchChan) filter(obj runtime.Object) bool {
|
func (wc *watchChan) filter(obj runtime.Object) bool {
|
||||||
if wc.internalFilter == nil {
|
if wc.internalPred.Empty() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return wc.internalFilter(obj)
|
matched, err := wc.internalPred.Matches(obj)
|
||||||
|
return err == nil && matched
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wc *watchChan) acceptAll() bool {
|
func (wc *watchChan) acceptAll() bool {
|
||||||
return wc.internalFilter == nil
|
return wc.internalPred.Empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
// transform transforms an event into a result for user if not filtered.
|
// transform transforms an event into a result for user if not filtered.
|
||||||
|
@ -70,10 +70,6 @@ type MatchValue struct {
|
|||||||
// to that function.
|
// to that function.
|
||||||
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
|
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
|
||||||
|
|
||||||
// FilterFunc takes an API object and returns true if the object satisfies some requirements.
|
|
||||||
// TODO: We will remove this type and use SelectionPredicate everywhere.
|
|
||||||
type FilterFunc func(obj runtime.Object) bool
|
|
||||||
|
|
||||||
// Everything accepts all objects.
|
// Everything accepts all objects.
|
||||||
var Everything = SelectionPredicate{
|
var Everything = SelectionPredicate{
|
||||||
Label: labels.Everything(),
|
Label: labels.Everything(),
|
||||||
|
@ -96,7 +96,7 @@ func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) {
|
|||||||
}
|
}
|
||||||
matched := s.Label.Matches(labels)
|
matched := s.Label.Matches(labels)
|
||||||
if matched && s.Field != nil {
|
if matched && s.Field != nil {
|
||||||
matched = (matched && s.Field.Matches(fields))
|
matched = matched && s.Field.Matches(fields)
|
||||||
}
|
}
|
||||||
return matched, nil
|
return matched, nil
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
"k8s.io/apimachinery/pkg/api/validation/path"
|
"k8s.io/apimachinery/pkg/api/validation/path"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -40,19 +38,6 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SimpleFilter converts a selection predicate into a FilterFunc.
|
|
||||||
// It ignores any error from Matches().
|
|
||||||
func SimpleFilter(p SelectionPredicate) FilterFunc {
|
|
||||||
return func(obj runtime.Object) bool {
|
|
||||||
matches, err := p.Matches(obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("invalid object for matching. Obj: %v. Err: %v", obj, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return matches
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func EverythingFunc(runtime.Object) bool {
|
func EverythingFunc(runtime.Object) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user