mirror of
https://github.com/kubernetes/client-go.git
synced 2025-12-25 14:12:27 +00:00
Merge pull request #135580 from serathius/client-go-transformer
Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it Kubernetes-commit: 04e8064bccebd04981ee0094457550c9de4f92e3
This commit is contained in:
9
go.mod
9
go.mod
@@ -24,8 +24,8 @@ require (
|
||||
golang.org/x/time v0.9.0
|
||||
google.golang.org/protobuf v1.36.8
|
||||
gopkg.in/evanphx/json-patch.v4 v4.13.0
|
||||
k8s.io/api v0.0.0
|
||||
k8s.io/apimachinery v0.0.0
|
||||
k8s.io/api v0.0.0-20251204222945-bbcbaa8f8665
|
||||
k8s.io/apimachinery v0.0.0-20251204222403-72d71eac265e
|
||||
k8s.io/klog/v2 v2.130.1
|
||||
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912
|
||||
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
|
||||
@@ -62,8 +62,3 @@ require (
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => ../api
|
||||
k8s.io/apimachinery => ../apimachinery
|
||||
)
|
||||
|
||||
14
go.sum
14
go.sum
@@ -1,10 +1,7 @@
|
||||
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
|
||||
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
|
||||
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
|
||||
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
|
||||
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
|
||||
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
|
||||
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
@@ -25,7 +22,6 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
|
||||
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
|
||||
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
|
||||
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
|
||||
@@ -41,7 +37,6 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T
|
||||
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
|
||||
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
|
||||
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
@@ -98,7 +93,6 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
|
||||
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
|
||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
|
||||
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
|
||||
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
@@ -117,9 +111,6 @@ golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
|
||||
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
|
||||
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
|
||||
golang.org/x/tools/go/expect v0.1.0-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
|
||||
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
|
||||
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
@@ -132,7 +123,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU=
|
||||
k8s.io/api v0.0.0-20251204222945-bbcbaa8f8665 h1:yCdvBpHA4R+NYVHh6B+ZWOmN0FhnqP1uGX9oNLhDWLw=
|
||||
k8s.io/api v0.0.0-20251204222945-bbcbaa8f8665/go.mod h1:etlr1bA5uFLXrSHE4hq3fjX6JCVC4aD10YGu8kktjJM=
|
||||
k8s.io/apimachinery v0.0.0-20251204222403-72d71eac265e h1:j68TlPomsB5ecepACqUXTMgiob1Hmx4N+VTd+jzORI8=
|
||||
k8s.io/apimachinery v0.0.0-20251204222403-72d71eac265e/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns=
|
||||
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
|
||||
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
|
||||
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE=
|
||||
|
||||
31
tools/cache/controller.go
vendored
31
tools/cache/controller.go
vendored
@@ -708,20 +708,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
|
||||
var fifo Queue
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
KnownObjects: clientState,
|
||||
Transformer: options.Transform,
|
||||
})
|
||||
} else {
|
||||
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: options.Transform,
|
||||
})
|
||||
}
|
||||
fifo := newQueueFIFO(clientState, options.Transform)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
@@ -742,3 +729,19 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
}
|
||||
return New(cfg)
|
||||
}
|
||||
|
||||
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
return NewRealFIFOWithOptions(RealFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
KnownObjects: clientState,
|
||||
Transformer: transform,
|
||||
})
|
||||
} else {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: transform,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
3
tools/cache/delta_fifo.go
vendored
3
tools/cache/delta_fifo.go
vendored
@@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
}
|
||||
|
||||
var (
|
||||
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
|
||||
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
|
||||
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
22
tools/cache/delta_fifo_test.go
vendored
22
tools/cache/delta_fifo_test.go
vendored
@@ -28,7 +28,7 @@ import (
|
||||
// from the most recent Delta.
|
||||
// You should treat the items returned inside the deltas as immutable.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) List() []interface{} {
|
||||
func (f *DeltaFIFO) list() []interface{} {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
return f.listLocked()
|
||||
@@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
|
||||
// ListKeys returns a list of all the keys of the objects currently
|
||||
// in the FIFO.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) ListKeys() []string {
|
||||
func (f *DeltaFIFO) listKeys() []string {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
list := make([]string, 0, len(f.queue))
|
||||
@@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string {
|
||||
// or sets exists=false.
|
||||
// You should treat the items returned inside the deltas as immutable.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
key, err := f.KeyOf(obj)
|
||||
if err != nil {
|
||||
return nil, false, KeyError{obj, err}
|
||||
}
|
||||
return f.GetByKey(key)
|
||||
return f.getByKey(key)
|
||||
}
|
||||
|
||||
// GetByKey returns the complete list of deltas for the requested item,
|
||||
// setting exists=false if that list is empty.
|
||||
// You should treat the items returned inside the deltas as immutable.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
|
||||
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
d, exists := f.items[key]
|
||||
@@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
f.Update(mkFifoObj("foo", 12))
|
||||
f.Delete(mkFifoObj("foo", 15))
|
||||
|
||||
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
|
||||
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
|
||||
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
|
||||
@@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
t.Errorf("Got second value %v", unexpected.val)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
_, exists, _ := f.Get(mkFifoObj("foo", ""))
|
||||
_, exists, _ := f.get(mkFifoObj("foo", ""))
|
||||
if exists {
|
||||
t.Errorf("item did not get removed")
|
||||
}
|
||||
@@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
|
||||
must(f.Replace([]interface{}{}, ""))
|
||||
|
||||
// Should be empty
|
||||
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
|
||||
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
|
||||
@@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
|
||||
t.Errorf("Got second value %v", unexpected.val)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
_, exists, _ := f.Get(mkFifoObj("foo", ""))
|
||||
_, exists, _ := f.get(mkFifoObj("foo", ""))
|
||||
if exists {
|
||||
t.Errorf("item did not get removed")
|
||||
}
|
||||
@@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = f.ListKeys()
|
||||
_ = f.listKeys()
|
||||
}
|
||||
})
|
||||
b.StopTimer()
|
||||
|
||||
8
tools/cache/reflector.go
vendored
8
tools/cache/reflector.go
vendored
@@ -79,7 +79,7 @@ type ReflectorStore interface {
|
||||
// TransformingStore is an optional interface that can be implemented by the provided store.
|
||||
// If implemented on the provided store reflector will use the same transformer in its internal stores.
|
||||
type TransformingStore interface {
|
||||
Store
|
||||
ReflectorStore
|
||||
Transformer() TransformFunc
|
||||
}
|
||||
|
||||
@@ -733,9 +733,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
|
||||
return false
|
||||
}
|
||||
|
||||
var transformer TransformFunc
|
||||
storeOpts := []StoreOption{}
|
||||
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
|
||||
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
|
||||
transformer = tr.Transformer()
|
||||
storeOpts = append(storeOpts, WithTransformer(transformer))
|
||||
}
|
||||
|
||||
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
|
||||
@@ -795,7 +797,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
|
||||
// we utilize the temporaryStore to ensure independence from the current store implementation.
|
||||
// as of today, the store is implemented as a queue and will be drained by the higher-level
|
||||
// component as soon as it finishes replacing the content.
|
||||
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
|
||||
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
|
||||
|
||||
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
|
||||
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
|
||||
|
||||
@@ -33,11 +33,11 @@ import (
|
||||
//
|
||||
// Note that this function will panic when data inconsistency is detected.
|
||||
// This is intentional because we want to catch it in the CI.
|
||||
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
|
||||
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
|
||||
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
|
||||
return
|
||||
}
|
||||
// for informers we pass an empty ListOptions because
|
||||
// listFn might be wrapped for filtering during informer construction.
|
||||
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
|
||||
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn)
|
||||
}
|
||||
|
||||
114
tools/cache/reflector_data_consistency_detector_test.go
vendored
Normal file
114
tools/cache/reflector_data_consistency_detector_test.go
vendored
Normal file
@@ -0,0 +1,114 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
clientfeatures "k8s.io/client-go/features"
|
||||
clientfeaturestesting "k8s.io/client-go/features/testing"
|
||||
"k8s.io/client-go/util/consistencydetector"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
func TestReflectorDataConsistencyDetector(t *testing.T) {
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
|
||||
defer restore()
|
||||
|
||||
markTransformed := func(obj interface{}) (interface{}, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return obj, nil
|
||||
}
|
||||
newPod := pod.DeepCopy()
|
||||
if newPod.Labels == nil {
|
||||
newPod.Labels = make(map[string]string)
|
||||
}
|
||||
newPod.Labels["transformed"] = "true"
|
||||
return newPod, nil
|
||||
}
|
||||
|
||||
for _, inOrder := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
|
||||
for _, transformerEnabled := range []bool{false, true} {
|
||||
var transformer TransformFunc
|
||||
if transformerEnabled {
|
||||
transformer = markTransformed
|
||||
}
|
||||
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
|
||||
runTestReflectorDataConsistencyDetector(t, transformer)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
fifo := newQueueFIFO(store, transformer)
|
||||
|
||||
lw := &ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return &v1.PodList{
|
||||
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
|
||||
Items: []v1.Pod{
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
w := watch.NewFake()
|
||||
go func() {
|
||||
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
|
||||
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
ResourceVersion: "1",
|
||||
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
|
||||
}})
|
||||
}()
|
||||
return w, nil
|
||||
},
|
||||
}
|
||||
|
||||
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
|
||||
|
||||
go func() {
|
||||
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return r.LastSyncResourceVersion() != "", nil
|
||||
})
|
||||
cancel()
|
||||
}()
|
||||
|
||||
err := r.ListAndWatchWithContext(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("ListAndWatchWithContext returned error: %v", err)
|
||||
}
|
||||
}
|
||||
242
tools/cache/reflector_test.go
vendored
242
tools/cache/reflector_test.go
vendored
@@ -20,10 +20,12 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -1962,7 +1964,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
|
||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||
var replaceInvoked atomic.Int32
|
||||
store := &fakeStore{
|
||||
Store: s,
|
||||
ReflectorStore: s,
|
||||
beforeReplace: func(list []interface{}, rv string) {
|
||||
// interested in the Replace call that happens after the Error event
|
||||
if rv == lastExpectedRV {
|
||||
@@ -2057,131 +2059,165 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReflectorRespectStoreTransformer(t *testing.T) {
|
||||
mkPod := func(id string, rv string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
|
||||
Spec: v1.PodSpec{
|
||||
Hostname: "test",
|
||||
for name, test := range map[string]struct {
|
||||
storeBuilder func(counter *atomic.Int32) ReflectorStore
|
||||
items func(rs ReflectorStore) []interface{}
|
||||
}{
|
||||
"real-fifo": {
|
||||
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
|
||||
return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) {
|
||||
counter.Add(1)
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
preExisting1 := mkPod("foo-1", "1")
|
||||
preExisting2 := mkPod("foo-2", "2")
|
||||
pod3 := mkPod("foo-3", "3")
|
||||
|
||||
lastExpectedRV := "3"
|
||||
events := []watch.Event{
|
||||
{Type: watch.Added, Object: preExisting1},
|
||||
{Type: watch.Added, Object: preExisting2},
|
||||
{Type: watch.Bookmark, Object: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: lastExpectedRV,
|
||||
Annotations: map[string]string{
|
||||
metav1.InitialEventsAnnotationKey: "true",
|
||||
},
|
||||
items: func(rs ReflectorStore) []interface{} {
|
||||
store := rs.(*RealFIFO)
|
||||
objects := make(map[string]interface{})
|
||||
for _, item := range store.getItems() {
|
||||
key, _ := store.keyFunc(item.Object)
|
||||
if item.Type == Deleted {
|
||||
delete(objects, key)
|
||||
} else {
|
||||
objects[key] = item.Object
|
||||
}
|
||||
}
|
||||
return slices.Collect(maps.Values(objects))
|
||||
},
|
||||
}},
|
||||
{Type: watch.Added, Object: pod3},
|
||||
}
|
||||
|
||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||
var replaceInvoked atomic.Int32
|
||||
store := &fakeStore{
|
||||
Store: s,
|
||||
beforeReplace: func(list []interface{}, rv string) {
|
||||
replaceInvoked.Add(1)
|
||||
// Only two pods are present at the point when Replace is called.
|
||||
if len(list) != 2 {
|
||||
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
|
||||
},
|
||||
"delta-fifo": {
|
||||
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
Transformer: func(i interface{}) (interface{}, error) {
|
||||
counter.Add(1)
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
},
|
||||
})
|
||||
},
|
||||
items: func(rs ReflectorStore) []interface{} {
|
||||
return rs.(*DeltaFIFO).list()
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
mkPod := func(id string, rv string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
|
||||
Spec: v1.PodSpec{
|
||||
Hostname: "test",
|
||||
},
|
||||
}
|
||||
}
|
||||
for _, obj := range list {
|
||||
cast := obj.(*v1.Pod)
|
||||
|
||||
preExisting1 := mkPod("foo-1", "1")
|
||||
preExisting2 := mkPod("foo-2", "2")
|
||||
pod3 := mkPod("foo-3", "3")
|
||||
|
||||
lastExpectedRV := "3"
|
||||
events := []watch.Event{
|
||||
{Type: watch.Added, Object: preExisting1},
|
||||
{Type: watch.Added, Object: preExisting2},
|
||||
{Type: watch.Bookmark, Object: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: lastExpectedRV,
|
||||
Annotations: map[string]string{
|
||||
metav1.InitialEventsAnnotationKey: "true",
|
||||
},
|
||||
},
|
||||
}},
|
||||
{Type: watch.Added, Object: pod3},
|
||||
}
|
||||
|
||||
var transformerInvoked atomic.Int32
|
||||
s := test.storeBuilder(&transformerInvoked)
|
||||
|
||||
var once sync.Once
|
||||
lw := &ListWatch{
|
||||
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
once.Do(func() {
|
||||
for _, e := range events {
|
||||
fw.Action(e.Type, e.Object)
|
||||
}
|
||||
})
|
||||
}()
|
||||
return fw, nil
|
||||
},
|
||||
// ListFunc should never be used in WatchList mode
|
||||
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
|
||||
return nil, errors.New("list call not expected in WatchList mode")
|
||||
},
|
||||
}
|
||||
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
r.RunWithContext(ctx)
|
||||
}()
|
||||
|
||||
// wait for the RV to sync to the version returned by the final list
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
|
||||
}
|
||||
|
||||
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
|
||||
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
|
||||
}
|
||||
|
||||
informerItems := test.items(s)
|
||||
if want, got := 3, len(informerItems); want != got {
|
||||
t.Errorf("expected informer to contain %d objects, but got: %d", want, got)
|
||||
}
|
||||
for _, item := range informerItems {
|
||||
cast := item.(*v1.Pod)
|
||||
if cast.Spec.Hostname != "transformed" {
|
||||
t.Error("Object was not transformed prior to replacement")
|
||||
}
|
||||
}
|
||||
},
|
||||
afterReplace: func(rv string, err error) {},
|
||||
transformer: func(i interface{}) (interface{}, error) {
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
},
|
||||
}
|
||||
|
||||
var once sync.Once
|
||||
lw := &ListWatch{
|
||||
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
once.Do(func() {
|
||||
for _, e := range events {
|
||||
fw.Action(e.Type, e.Object)
|
||||
}
|
||||
})
|
||||
}()
|
||||
return fw, nil
|
||||
},
|
||||
// ListFunc should never be used in WatchList mode
|
||||
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
|
||||
return nil, errors.New("list call not expected in WatchList mode")
|
||||
},
|
||||
}
|
||||
// Transformer should have been invoked twice for the initial sync in the informer on the temporary store,
|
||||
// then twice on replace, then once on the following update.
|
||||
if want, got := 5, int(transformerInvoked.Load()); want != got {
|
||||
t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got)
|
||||
}
|
||||
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
r := NewReflector(lw, &v1.Pod{}, store, 0)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
r.RunWithContext(ctx)
|
||||
}()
|
||||
|
||||
// wait for the RV to sync to the version returned by the final list
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
|
||||
}
|
||||
|
||||
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
|
||||
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
|
||||
}
|
||||
if want, got := 1, int(replaceInvoked.Load()); want != got {
|
||||
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timed out waiting for Run to return")
|
||||
cancel()
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timed out waiting for Run to return")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeStore struct {
|
||||
Store
|
||||
ReflectorStore
|
||||
beforeReplace func(list []interface{}, s string)
|
||||
afterReplace func(rv string, err error)
|
||||
transformer TransformFunc
|
||||
}
|
||||
|
||||
func (f *fakeStore) Replace(list []interface{}, rv string) error {
|
||||
f.beforeReplace(list, rv)
|
||||
err := f.Store.Replace(list, rv)
|
||||
err := f.ReflectorStore.Replace(list, rv)
|
||||
f.afterReplace(rv, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fakeStore) Transformer() TransformFunc {
|
||||
return f.transformer
|
||||
}
|
||||
|
||||
func BenchmarkExtractList(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
|
||||
15
tools/cache/shared_informer.go
vendored
15
tools/cache/shared_informer.go
vendored
@@ -539,20 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
var fifo Queue
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
KnownObjects: s.indexer,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
} else {
|
||||
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: s.indexer,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
}
|
||||
fifo := newQueueFIFO(s.indexer, s.transform)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
|
||||
3
tools/cache/the_real_fifo.go
vendored
3
tools/cache/the_real_fifo.go
vendored
@@ -89,7 +89,8 @@ type RealFIFO struct {
|
||||
}
|
||||
|
||||
var (
|
||||
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
|
||||
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
|
||||
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
|
||||
)
|
||||
|
||||
// Close the queue.
|
||||
|
||||
@@ -45,16 +45,28 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
|
||||
return dataConsistencyDetectionForWatchListEnabled
|
||||
}
|
||||
|
||||
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
|
||||
// It returns a function that restores the original value.
|
||||
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
|
||||
original := dataConsistencyDetectionForWatchListEnabled
|
||||
dataConsistencyDetectionForWatchListEnabled = enabled
|
||||
return func() {
|
||||
dataConsistencyDetectionForWatchListEnabled = original
|
||||
}
|
||||
}
|
||||
|
||||
type RetrieveItemsFunc[U any] func() []U
|
||||
|
||||
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
|
||||
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// CheckDataConsistency exists solely for testing purposes.
|
||||
// we cannot use checkWatchListDataConsistencyIfRequested because
|
||||
// it is guarded by an environmental variable.
|
||||
// we cannot manipulate the environmental variable because
|
||||
// it will affect other tests in this package.
|
||||
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
|
||||
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
|
||||
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
|
||||
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
|
||||
return
|
||||
@@ -84,6 +96,15 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
|
||||
if err != nil {
|
||||
panic(err) // this should never happen
|
||||
}
|
||||
if listItemTransformFunc != nil {
|
||||
for i := range rawListItems {
|
||||
obj, err := listItemTransformFunc(rawListItems[i])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawListItems[i] = obj.(runtime.Object)
|
||||
}
|
||||
}
|
||||
listItems := toMetaObjectSliceOrDie(rawListItems)
|
||||
|
||||
sort.Sort(byUID(listItems))
|
||||
|
||||
@@ -215,10 +215,10 @@ func TestDataConsistencyChecker(t *testing.T) {
|
||||
|
||||
if scenario.expectPanic {
|
||||
require.Panics(t, func() {
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
|
||||
})
|
||||
} else {
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
|
||||
}
|
||||
|
||||
require.Equal(t, scenario.expectedListRequests, fakeLister.counter)
|
||||
@@ -235,7 +235,7 @@ func TestDataConsistencyCheckerRetry(t *testing.T) {
|
||||
stopListErrorAfter := 5
|
||||
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
|
||||
|
||||
CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
|
||||
CheckDataConsistency(ctx, "", "", fakeErrLister.List, nil, metav1.ListOptions{}, retrievedItemsFunc)
|
||||
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user