mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Merge pull request #13734 from wojtek-t/filter_in_storage
Auto commit by PR queue bot
This commit is contained in:
commit
647288cde1
@ -150,6 +150,7 @@ func (e *Etcd) List(ctx api.Context, label labels.Selector, field fields.Selecto
|
|||||||
func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
|
func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object, error) {
|
||||||
list := e.NewListFunc()
|
list := e.NewListFunc()
|
||||||
trace := util.NewTrace("List " + reflect.TypeOf(list).String())
|
trace := util.NewTrace("List " + reflect.TypeOf(list).String())
|
||||||
|
filterFunc := e.filterAndDecorateFunction(m)
|
||||||
defer trace.LogIfLong(600 * time.Millisecond)
|
defer trace.LogIfLong(600 * time.Millisecond)
|
||||||
if name, ok := m.MatchesSingle(); ok {
|
if name, ok := m.MatchesSingle(); ok {
|
||||||
trace.Step("About to read single object")
|
trace.Step("About to read single object")
|
||||||
@ -157,21 +158,20 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher) (runtime.Object
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = e.Storage.GetToList(key, list)
|
err = e.Storage.GetToList(key, filterFunc, list)
|
||||||
trace.Step("Object extracted")
|
trace.Step("Object extracted")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
trace.Step("About to list directory")
|
trace.Step("About to list directory")
|
||||||
err := e.Storage.List(e.KeyRootFunc(ctx), list)
|
err := e.Storage.List(e.KeyRootFunc(ctx), filterFunc, list)
|
||||||
trace.Step("List extracted")
|
trace.Step("List extracted")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
defer trace.Step("List filtered")
|
return list, nil
|
||||||
return generic.FilterList(list, m, generic.DecoratorFunc(e.Decorator))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create inserts a new item according to the unique key from the object.
|
// Create inserts a new item according to the unique key from the object.
|
||||||
@ -449,8 +449,21 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
filterFunc := e.filterAndDecorateFunction(m)
|
||||||
|
|
||||||
filterFunc := func(obj runtime.Object) bool {
|
if name, ok := m.MatchesSingle(); ok {
|
||||||
|
key, err := e.KeyFunc(ctx, name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return e.Storage.Watch(key, version, filterFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.Storage.WatchList(e.KeyRootFunc(ctx), version, filterFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool {
|
||||||
|
return func(obj runtime.Object) bool {
|
||||||
matches, err := m.Matches(obj)
|
matches, err := m.Matches(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("unable to match watch: %v", err)
|
glog.Errorf("unable to match watch: %v", err)
|
||||||
@ -464,16 +477,6 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
|
|||||||
}
|
}
|
||||||
return matches
|
return matches
|
||||||
}
|
}
|
||||||
|
|
||||||
if name, ok := m.MatchesSingle(); ok {
|
|
||||||
key, err := e.KeyFunc(ctx, name)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return e.Storage.Watch(key, version, filterFunc)
|
|
||||||
}
|
|
||||||
|
|
||||||
return e.Storage.WatchList(e.KeyRootFunc(ctx), version, filterFunc)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error
|
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error
|
||||||
|
@ -118,38 +118,3 @@ var (
|
|||||||
_ = Matcher(&SelectionPredicate{})
|
_ = Matcher(&SelectionPredicate{})
|
||||||
_ = Matcher(matcherFunc(nil))
|
_ = Matcher(matcherFunc(nil))
|
||||||
)
|
)
|
||||||
|
|
||||||
// DecoratorFunc can mutate the provided object prior to being returned.
|
|
||||||
type DecoratorFunc func(obj runtime.Object) error
|
|
||||||
|
|
||||||
// FilterList filters any list object that conforms to the api conventions,
|
|
||||||
// provided that 'm' works with the concrete type of list. d is an optional
|
|
||||||
// decorator for the returned functions. Only matching items are decorated.
|
|
||||||
func FilterList(list runtime.Object, m Matcher, d DecoratorFunc) (filtered runtime.Object, err error) {
|
|
||||||
// TODO: push a matcher down into tools.etcdHelper to avoid all this
|
|
||||||
// nonsense. This is a lot of unnecessary copies.
|
|
||||||
items, err := runtime.ExtractList(list)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var filteredItems []runtime.Object
|
|
||||||
for _, obj := range items {
|
|
||||||
match, err := m.Matches(obj)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if match {
|
|
||||||
if d != nil {
|
|
||||||
if err := d(obj); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
filteredItems = append(filteredItems, obj)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = runtime.SetList(list, filteredItems)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return list, nil
|
|
||||||
}
|
|
||||||
|
@ -18,7 +18,6 @@ package generic
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
@ -118,44 +117,6 @@ func TestSelectionPredicate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFilterList(t *testing.T) {
|
|
||||||
try := &IgnoredList{
|
|
||||||
Items: []Ignored{
|
|
||||||
{"foo"},
|
|
||||||
{"bar"},
|
|
||||||
{"baz"},
|
|
||||||
{"qux"},
|
|
||||||
{"zot"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
expect := &IgnoredList{
|
|
||||||
Items: []Ignored{
|
|
||||||
{"bar"},
|
|
||||||
{"baz"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
m := MatcherFunc(func(obj runtime.Object) (bool, error) {
|
|
||||||
i, ok := obj.(*Ignored)
|
|
||||||
if !ok {
|
|
||||||
return false, errors.New("wrong type")
|
|
||||||
}
|
|
||||||
return i.ID[0] == 'b', nil
|
|
||||||
})
|
|
||||||
if _, matchesSingleObject := m.MatchesSingle(); matchesSingleObject {
|
|
||||||
t.Errorf("matcher unexpectedly matches only a single object.")
|
|
||||||
}
|
|
||||||
|
|
||||||
got, err := FilterList(try, m, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, a := expect, got; !reflect.DeepEqual(e, a) {
|
|
||||||
t.Errorf("Expected %#v, got %#v", e, a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSingleMatch(t *testing.T) {
|
func TestSingleMatch(t *testing.T) {
|
||||||
m := MatchOnKey("pod-name-here", func(obj runtime.Object) (bool, error) { return true, nil })
|
m := MatchOnKey("pod-name-here", func(obj runtime.Object) (bool, error) { return true, nil })
|
||||||
got, ok := m.MatchesSingle()
|
got, ok := m.MatchesSingle()
|
||||||
|
@ -213,13 +213,13 @@ func (c *Cacher) Get(key string, objPtr runtime.Object, ignoreNotFound bool) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) GetToList(key string, listObj runtime.Object) error {
|
func (c *Cacher) GetToList(key string, filter FilterFunc, listObj runtime.Object) error {
|
||||||
return c.storage.GetToList(key, listObj)
|
return c.storage.GetToList(key, filter, listObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) List(key string, listObj runtime.Object) error {
|
func (c *Cacher) List(key string, filter FilterFunc, listObj runtime.Object) error {
|
||||||
return c.storage.List(key, listObj)
|
return c.storage.List(key, filter, listObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListFromMemory implements list operation (the same signature as List method)
|
// ListFromMemory implements list operation (the same signature as List method)
|
||||||
@ -303,7 +303,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
|
|||||||
return func(obj runtime.Object) bool {
|
return func(obj runtime.Object) bool {
|
||||||
objKey, err := keyFunc(obj)
|
objKey, err := keyFunc(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Invalid object for filter: %v", obj)
|
glog.Errorf("invalid object for filter: %v", obj)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if !strings.HasPrefix(objKey, key) {
|
if !strings.HasPrefix(objKey, key) {
|
||||||
@ -343,7 +343,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun
|
|||||||
// Implements cache.ListerWatcher interface.
|
// Implements cache.ListerWatcher interface.
|
||||||
func (lw *cacherListerWatcher) List() (runtime.Object, error) {
|
func (lw *cacherListerWatcher) List() (runtime.Object, error) {
|
||||||
list := lw.newListFunc()
|
list := lw.newListFunc()
|
||||||
if err := lw.storage.List(lw.resourcePrefix, list); err != nil {
|
if err := lw.storage.List(lw.resourcePrefix, Everything, list); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return list, nil
|
return list, nil
|
||||||
|
@ -243,7 +243,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
|
func (h *etcdHelper) GetToList(key string, filter storage.FilterFunc, listObj runtime.Object) error {
|
||||||
trace := util.NewTrace("GetToList " + getTypeName(listObj))
|
trace := util.NewTrace("GetToList " + getTypeName(listObj))
|
||||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -265,7 +265,7 @@ func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
|
|||||||
nodes := make([]*etcd.Node, 0)
|
nodes := make([]*etcd.Node, 0)
|
||||||
nodes = append(nodes, response.Node)
|
nodes = append(nodes, response.Node)
|
||||||
|
|
||||||
if err := h.decodeNodeList(nodes, listPtr); err != nil {
|
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Object decoded")
|
trace.Step("Object decoded")
|
||||||
@ -278,7 +278,7 @@ func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
||||||
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
|
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
|
||||||
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
|
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
|
||||||
defer trace.LogIfLong(500 * time.Millisecond)
|
defer trace.LogIfLong(500 * time.Millisecond)
|
||||||
v, err := conversion.EnforcePtr(slicePtr)
|
v, err := conversion.EnforcePtr(slicePtr)
|
||||||
@ -289,14 +289,16 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
|||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
if node.Dir {
|
if node.Dir {
|
||||||
trace.Step("Decoding dir " + node.Key + " START")
|
trace.Step("Decoding dir " + node.Key + " START")
|
||||||
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
|
if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Decoding dir " + node.Key + " END")
|
trace.Step("Decoding dir " + node.Key + " END")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if obj, found := h.getFromCache(node.ModifiedIndex); found {
|
if obj, found := h.getFromCache(node.ModifiedIndex); found {
|
||||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
if filter(obj) {
|
||||||
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
obj := reflect.New(v.Type().Elem())
|
obj := reflect.New(v.Type().Elem())
|
||||||
if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||||
@ -306,7 +308,9 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
|||||||
// being unable to set the version does not prevent the object from being extracted
|
// being unable to set the version does not prevent the object from being extracted
|
||||||
_ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
|
_ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
|
||||||
}
|
}
|
||||||
v.Set(reflect.Append(v, obj.Elem()))
|
if filter(obj.Interface().(runtime.Object)) {
|
||||||
|
v.Set(reflect.Append(v, obj.Elem()))
|
||||||
|
}
|
||||||
if node.ModifiedIndex != 0 {
|
if node.ModifiedIndex != 0 {
|
||||||
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
|
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
|
||||||
}
|
}
|
||||||
@ -317,7 +321,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) List(key string, listObj runtime.Object) error {
|
func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime.Object) error {
|
||||||
trace := util.NewTrace("List " + getTypeName(listObj))
|
trace := util.NewTrace("List " + getTypeName(listObj))
|
||||||
defer trace.LogIfLong(time.Second)
|
defer trace.LogIfLong(time.Second)
|
||||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||||
@ -333,7 +337,7 @@ func (h *etcdHelper) List(key string, listObj runtime.Object) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := h.decodeNodeList(nodes, listPtr); err != nil {
|
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
trace.Step("Node list decoded")
|
trace.Step("Node list decoded")
|
||||||
|
@ -155,7 +155,69 @@ func TestList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var got api.PodList
|
var got api.PodList
|
||||||
err := helper.List("/some/key", &got)
|
err := helper.List("/some/key", storage.Everything, &got)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error %v", err)
|
||||||
|
}
|
||||||
|
if e, a := expect, got; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %#v, got %#v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListFiltered(t *testing.T) {
|
||||||
|
fakeClient := tools.NewFakeEtcdClient(t)
|
||||||
|
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||||
|
key := etcdtest.AddPrefix("/some/key")
|
||||||
|
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||||
|
R: &etcd.Response{
|
||||||
|
EtcdIndex: 10,
|
||||||
|
Node: &etcd.Node{
|
||||||
|
Dir: true,
|
||||||
|
Nodes: []*etcd.Node{
|
||||||
|
{
|
||||||
|
Key: "/foo",
|
||||||
|
Value: getEncodedPod("foo"),
|
||||||
|
Dir: false,
|
||||||
|
ModifiedIndex: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "/bar",
|
||||||
|
Value: getEncodedPod("bar"),
|
||||||
|
Dir: false,
|
||||||
|
ModifiedIndex: 2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "/baz",
|
||||||
|
Value: getEncodedPod("baz"),
|
||||||
|
Dir: false,
|
||||||
|
ModifiedIndex: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
grace := int64(30)
|
||||||
|
expect := api.PodList{
|
||||||
|
ListMeta: api.ListMeta{ResourceVersion: "10"},
|
||||||
|
Items: []api.Pod{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
RestartPolicy: api.RestartPolicyAlways,
|
||||||
|
DNSPolicy: api.DNSClusterFirst,
|
||||||
|
TerminationGracePeriodSeconds: &grace,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
filter := func(obj runtime.Object) bool {
|
||||||
|
pod := obj.(*api.Pod)
|
||||||
|
return pod.Name == "bar"
|
||||||
|
}
|
||||||
|
|
||||||
|
var got api.PodList
|
||||||
|
err := helper.List("/some/key", filter, &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -243,7 +305,7 @@ func TestListAcrossDirectories(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var got api.PodList
|
var got api.PodList
|
||||||
err := helper.List("/some/key", &got)
|
err := helper.List("/some/key", storage.Everything, &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -318,7 +380,7 @@ func TestListExcludesDirectories(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var got api.PodList
|
var got api.PodList
|
||||||
err := helper.List("/some/key", &got)
|
err := helper.List("/some/key", storage.Everything, &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
|
@ -111,11 +111,11 @@ type Interface interface {
|
|||||||
|
|
||||||
// GetToList unmarshals json found at key and opaque it into *List api object
|
// GetToList unmarshals json found at key and opaque it into *List api object
|
||||||
// (an object that satisfies the runtime.IsList definition).
|
// (an object that satisfies the runtime.IsList definition).
|
||||||
GetToList(key string, listObj runtime.Object) error
|
GetToList(key string, filter FilterFunc, listObj runtime.Object) error
|
||||||
|
|
||||||
// List unmarshalls jsons found at directory defined by key and opaque them
|
// List unmarshalls jsons found at directory defined by key and opaque them
|
||||||
// into *List api object (an object that satisfies runtime.IsList definition).
|
// into *List api object (an object that satisfies runtime.IsList definition).
|
||||||
List(key string, listObj runtime.Object) error
|
List(key string, filter FilterFunc, listObj runtime.Object) error
|
||||||
|
|
||||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
|
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
|
||||||
// retrying the update until success if there is index conflict.
|
// retrying the update until success if there is index conflict.
|
||||||
|
Loading…
Reference in New Issue
Block a user