mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-23 13:47:19 +00:00
Merge pull request #117046 from howardjohn/client/add-indexer-after-start
client-go: allow adding indexes after informer starts Kubernetes-commit: db82260c65df9cedaf93e924a2401b4e55a8a2a0
This commit is contained in:
commit
9434e7539b
4
go.mod
4
go.mod
@ -24,7 +24,7 @@ require (
|
|||||||
golang.org/x/term v0.13.0
|
golang.org/x/term v0.13.0
|
||||||
golang.org/x/time v0.3.0
|
golang.org/x/time v0.3.0
|
||||||
google.golang.org/protobuf v1.31.0
|
google.golang.org/protobuf v1.31.0
|
||||||
k8s.io/api v0.0.0-20231113171418-a95c725cd890
|
k8s.io/api v0.0.0-20231213211702-6a8b8cdcd535
|
||||||
k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4
|
k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4
|
||||||
k8s.io/klog/v2 v2.110.1
|
k8s.io/klog/v2 v2.110.1
|
||||||
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
|
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00
|
||||||
@ -61,6 +61,6 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
replace (
|
replace (
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20231113171418-a95c725cd890
|
k8s.io/api => k8s.io/api v0.0.0-20231213211702-6a8b8cdcd535
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4
|
||||||
)
|
)
|
||||||
|
4
go.sum
4
go.sum
@ -157,8 +157,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
|||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
k8s.io/api v0.0.0-20231113171418-a95c725cd890 h1:SHs5i2ucK0WbfIRTL1VEpojQ/mGUVFESrxZH8/8V3/c=
|
k8s.io/api v0.0.0-20231213211702-6a8b8cdcd535 h1:Io0oxmcx72uFM6xMjshxcQFRd9CclDTKhWd+hBsI5JQ=
|
||||||
k8s.io/api v0.0.0-20231113171418-a95c725cd890/go.mod h1:euD11hUzyWUaDInuGcZw8dn3W3PlBwQ7JFbZZjNho5s=
|
k8s.io/api v0.0.0-20231213211702-6a8b8cdcd535/go.mod h1:euD11hUzyWUaDInuGcZw8dn3W3PlBwQ7JFbZZjNho5s=
|
||||||
k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4 h1:OtyUMcIK9+uygefKf9MzU2XzrWQV8IwrEv6vI3PJ7xw=
|
k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4 h1:OtyUMcIK9+uygefKf9MzU2XzrWQV8IwrEv6vI3PJ7xw=
|
||||||
k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis=
|
k8s.io/apimachinery v0.0.0-20231113171157-fa98d6eaedb4/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis=
|
||||||
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
|
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
|
||||||
|
3
tools/cache/index.go
vendored
3
tools/cache/index.go
vendored
@ -50,8 +50,7 @@ type Indexer interface {
|
|||||||
// GetIndexers return the indexers
|
// GetIndexers return the indexers
|
||||||
GetIndexers() Indexers
|
GetIndexers() Indexers
|
||||||
|
|
||||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
|
||||||
// in the store, the results are undefined.
|
|
||||||
AddIndexers(newIndexers Indexers) error
|
AddIndexers(newIndexers Indexers) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
4
tools/cache/shared_informer.go
vendored
4
tools/cache/shared_informer.go
vendored
@ -540,8 +540,8 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
|
|||||||
s.startedLock.Lock()
|
s.startedLock.Lock()
|
||||||
defer s.startedLock.Unlock()
|
defer s.startedLock.Unlock()
|
||||||
|
|
||||||
if s.started {
|
if s.stopped {
|
||||||
return fmt.Errorf("informer has already started")
|
return fmt.Errorf("indexer was not added because it has stopped already")
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.indexer.AddIndexers(indexers)
|
return s.indexer.AddIndexers(indexers)
|
||||||
|
78
tools/cache/shared_informer_test.go
vendored
78
tools/cache/shared_informer_test.go
vendored
@ -26,6 +26,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -117,6 +120,81 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
|
|||||||
return s.processor.getListener(h) != nil
|
return s.processor.getListener(h) != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIndexer(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
// source simulates an apiserver object endpoint.
|
||||||
|
source := fcache.NewFakeControllerSource()
|
||||||
|
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
|
||||||
|
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
|
||||||
|
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
|
||||||
|
source.Add(pod1)
|
||||||
|
source.Add(pod2)
|
||||||
|
|
||||||
|
// create the shared informer and resync every 1s
|
||||||
|
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||||
|
err := informer.AddIndexers(map[string]IndexFunc{
|
||||||
|
"labels": func(obj interface{}) ([]string, error) {
|
||||||
|
res := []string{}
|
||||||
|
for k := range obj.(*v1.Pod).Labels {
|
||||||
|
res = append(res, k)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
stop := make(chan struct{})
|
||||||
|
defer close(stop)
|
||||||
|
|
||||||
|
go informer.Run(stop)
|
||||||
|
WaitForCacheSync(stop, informer.HasSynced)
|
||||||
|
|
||||||
|
cmpOps := cmpopts.SortSlices(func(a, b any) bool {
|
||||||
|
return a.(*v1.Pod).Name < b.(*v1.Pod).Name
|
||||||
|
})
|
||||||
|
|
||||||
|
// We should be able to lookup by index
|
||||||
|
res, err := informer.GetIndexer().ByIndex("labels", "a")
|
||||||
|
assert.NoError(err)
|
||||||
|
if diff := cmp.Diff([]any{pod1}, res); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adding an item later is fine as well
|
||||||
|
source.Add(pod3)
|
||||||
|
// Event is async, need to poll
|
||||||
|
assert.Eventually(func() bool {
|
||||||
|
res, _ := informer.GetIndexer().ByIndex("labels", "a")
|
||||||
|
return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == ""
|
||||||
|
}, time.Second*3, time.Millisecond)
|
||||||
|
|
||||||
|
// Adding an index later is also fine
|
||||||
|
err = informer.AddIndexers(map[string]IndexFunc{
|
||||||
|
"labels-again": func(obj interface{}) ([]string, error) {
|
||||||
|
res := []string{}
|
||||||
|
for k := range obj.(*v1.Pod).Labels {
|
||||||
|
res = append(res, k)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.NoError(err)
|
||||||
|
|
||||||
|
// Should be immediately available
|
||||||
|
res, err = informer.GetIndexer().ByIndex("labels-again", "a")
|
||||||
|
assert.NoError(err)
|
||||||
|
if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != "" {
|
||||||
|
t.Fatal(diff)
|
||||||
|
}
|
||||||
|
if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) {
|
||||||
|
t.Fatalf("got %v", got)
|
||||||
|
}
|
||||||
|
if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) {
|
||||||
|
t.Fatalf("got %v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestListenerResyncPeriods(t *testing.T) {
|
func TestListenerResyncPeriods(t *testing.T) {
|
||||||
// source simulates an apiserver object endpoint.
|
// source simulates an apiserver object endpoint.
|
||||||
source := fcache.NewFakeControllerSource()
|
source := fcache.NewFakeControllerSource()
|
||||||
|
108
tools/cache/thread_safe_store.go
vendored
108
tools/cache/thread_safe_store.go
vendored
@ -52,8 +52,7 @@ type ThreadSafeStore interface {
|
|||||||
ByIndex(indexName, indexedValue string) ([]interface{}, error)
|
ByIndex(indexName, indexedValue string) ([]interface{}, error)
|
||||||
GetIndexers() Indexers
|
GetIndexers() Indexers
|
||||||
|
|
||||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
|
||||||
// in the store, the results are undefined.
|
|
||||||
AddIndexers(newIndexers Indexers) error
|
AddIndexers(newIndexers Indexers) error
|
||||||
// Resync is a no-op and is deprecated
|
// Resync is a no-op and is deprecated
|
||||||
Resync() error
|
Resync() error
|
||||||
@ -135,50 +134,66 @@ func (i *storeIndex) addIndexers(newIndexers Indexers) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateSingleIndex modifies the objects location in the named index:
|
||||||
|
// - for create you must provide only the newObj
|
||||||
|
// - for update you must provide both the oldObj and the newObj
|
||||||
|
// - for delete you must provide only the oldObj
|
||||||
|
// updateSingleIndex must be called from a function that already has a lock on the cache
|
||||||
|
func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
|
||||||
|
var oldIndexValues, indexValues []string
|
||||||
|
indexFunc, ok := i.indexers[name]
|
||||||
|
if !ok {
|
||||||
|
// Should never happen. Caller is responsible for ensuring this exists, and should call with lock
|
||||||
|
// held to avoid any races.
|
||||||
|
panic(fmt.Errorf("indexer %q does not exist", name))
|
||||||
|
}
|
||||||
|
if oldObj != nil {
|
||||||
|
var err error
|
||||||
|
oldIndexValues, err = indexFunc(oldObj)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
oldIndexValues = oldIndexValues[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
if newObj != nil {
|
||||||
|
var err error
|
||||||
|
indexValues, err = indexFunc(newObj)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
indexValues = indexValues[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
index := i.indices[name]
|
||||||
|
if index == nil {
|
||||||
|
index = Index{}
|
||||||
|
i.indices[name] = index
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
|
||||||
|
// We optimize for the most common case where indexFunc returns a single value which has not been changed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, value := range oldIndexValues {
|
||||||
|
i.deleteKeyFromIndex(key, value, index)
|
||||||
|
}
|
||||||
|
for _, value := range indexValues {
|
||||||
|
i.addKeyToIndex(key, value, index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// updateIndices modifies the objects location in the managed indexes:
|
// updateIndices modifies the objects location in the managed indexes:
|
||||||
// - for create you must provide only the newObj
|
// - for create you must provide only the newObj
|
||||||
// - for update you must provide both the oldObj and the newObj
|
// - for update you must provide both the oldObj and the newObj
|
||||||
// - for delete you must provide only the oldObj
|
// - for delete you must provide only the oldObj
|
||||||
// updateIndices must be called from a function that already has a lock on the cache
|
// updateIndices must be called from a function that already has a lock on the cache
|
||||||
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
|
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
|
||||||
var oldIndexValues, indexValues []string
|
for name := range i.indexers {
|
||||||
var err error
|
i.updateSingleIndex(name, oldObj, newObj, key)
|
||||||
for name, indexFunc := range i.indexers {
|
|
||||||
if oldObj != nil {
|
|
||||||
oldIndexValues, err = indexFunc(oldObj)
|
|
||||||
} else {
|
|
||||||
oldIndexValues = oldIndexValues[:0]
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if newObj != nil {
|
|
||||||
indexValues, err = indexFunc(newObj)
|
|
||||||
} else {
|
|
||||||
indexValues = indexValues[:0]
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
index := i.indices[name]
|
|
||||||
if index == nil {
|
|
||||||
index = Index{}
|
|
||||||
i.indices[name] = index
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
|
|
||||||
// We optimize for the most common case where indexFunc returns a single value which has not been changed
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, value := range oldIndexValues {
|
|
||||||
i.deleteKeyFromIndex(key, value, index)
|
|
||||||
}
|
|
||||||
for _, value := range indexValues {
|
|
||||||
i.addKeyToIndex(key, value, index)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -339,11 +354,18 @@ func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
|||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
defer c.lock.Unlock()
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
if len(c.items) > 0 {
|
if err := c.index.addIndexers(newIndexers); err != nil {
|
||||||
return fmt.Errorf("cannot add indexers to running index")
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.index.addIndexers(newIndexers)
|
// If there are already items, index them
|
||||||
|
for key, item := range c.items {
|
||||||
|
for name := range newIndexers {
|
||||||
|
c.index.updateSingleIndex(name, nil, item, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *threadSafeMap) Resync() error {
|
func (c *threadSafeMap) Resync() error {
|
||||||
|
Loading…
Reference in New Issue
Block a user