Compare commits

...

13 Commits

Author SHA1 Message Date
Kubernetes Publisher
d43aed2afb Update dependencies to v0.35.4 tag 2026-04-15 22:51:48 +00:00
Kubernetes Publisher
8ebd9bb45e Merge pull request #138356 from dims/update-moby-spdystream-v0.5.1-1.35
[1.35] Update github.com/moby/spdystream from v0.5.0 to v0.5.1

Kubernetes-commit: 93828861e0b554581ba647d48efaba1ff579afa3
2026-04-14 15:19:54 +00:00
Davanum Srinivas
00b2f2b182 Update github.com/moby/spdystream from v0.5.0 to v0.5.1
Kubernetes-commit: 7e9c2c8eef26f99aa2f94d8e09d6d32de86c7769

Kubernetes-commit: 1687aa8c94e73c3a1744cb1f4498c0fbc41abe0c
2026-04-13 14:14:40 -04:00
Kubernetes Publisher
f80003c240 Merge pull request #136903 from pohly/automated-cherry-pick-of-#136455-origin-release-1.35
Automated cherry pick of #136455: fake client-go: un-deprecate NewSimpleClientset

Kubernetes-commit: a0e5f1aba53a16db2c2cd0a2cb33c60f43e4d984
2026-03-05 00:46:20 +05:30
Patrick Ohly
8b415569d9 fake client-go: un-deprecate NewSimpleClientset
NewSimpleClientset was marked as deprecated when NewClientset was
introduced. This has caused some confusion:
- Not all packages have NewClientset (https://github.com/kubernetes/kubernetes/issues/135980).
- Tests that work with NewSimpleClientset fail when
  switched to NewClientset (https://github.com/kubernetes/kubernetes/issues/136327)
  because of missing CRD support (https://github.com/kubernetes/kubernetes/issues/126850).

It doesn't seem burdensome to keep NewSimpleClientset around forever. Some unit
tests may even prefer to use it when they don't need server-side apply (less
overhead). Therefore there is no need to deprecate it.

This avoids churn in the eco system because contributors no longer create PRs
"because the linter complains about the usage of a deprecated function".

Kubernetes-commit: bd399917375d2a1d77c04f8dfd67a67301af9721
2026-01-23 11:20:40 +01:00
Kubernetes Publisher
2d83546256 Merge remote-tracking branch 'origin/master' into release-1.35
Kubernetes-commit: b5f0a8229dd6ce88c2364257acb65d5875706b14
2025-12-05 03:34:45 +00:00
Kubernetes Publisher
56b4af2aeb Merge pull request #135591 from p0lyn0mial/upstream-watchlist-reflector-log-fallback
downgrade reflector watchlist fallback log to V(4)

Kubernetes-commit: 9293f9326d41e1e4ad53096ef180dd6ab0f9c699
2025-12-05 06:29:44 +00:00
Kubernetes Publisher
891f94c690 Merge remote-tracking branch 'origin/master' into release-1.35
Kubernetes-commit: fc0159905536cd41e3d2a3ac8fcbd905d1bc7c3f
2025-12-04 21:37:05 +00:00
Kubernetes Publisher
65ffe044e5 Merge pull request #135580 from serathius/client-go-transformer
Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it

Kubernetes-commit: 04e8064bccebd04981ee0094457550c9de4f92e3
2025-12-04 22:45:12 +00:00
Lukasz Szaszkiewicz
2fe4ac239c downgrade reflector watchlist fallback log to V(4)
Kubernetes-commit: 3f42ca14011e972ee439a27d47415bc7574f2317
2025-12-04 16:14:19 +01:00
Jordan Liggitt
171ef8cd00 Use transformer in consistency checker
Kubernetes-commit: 91368adbb556286942d996c60ab6cc39306415b7
2025-11-26 15:19:00 -05:00
Valerian Roche
7cf6a05732 Add unit tests for Data Consistency Detector
Kubernetes-commit: 76da8d6de027a4bf62601d45b8d72f8fa627ab5c
2025-07-28 13:53:34 -04:00
Valerian Roche
f466f58eea [client-go #1415] Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it
Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>

Kubernetes-commit: 88c20d46a4e5ca8db7519c81856a358c919ae262
2025-07-28 13:53:34 -04:00
14 changed files with 335 additions and 178 deletions

11
go.mod
View File

@@ -24,8 +24,8 @@ require (
golang.org/x/time v0.9.0
google.golang.org/protobuf v1.36.8
gopkg.in/evanphx/json-patch.v4 v4.13.0
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/api v0.35.4
k8s.io/apimachinery v0.35.4
k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
@@ -48,7 +48,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/moby/spdystream v0.5.0 // indirect
github.com/moby/spdystream v0.5.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
@@ -62,8 +62,3 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace (
k8s.io/api => ../api
k8s.io/apimachinery => ../apimachinery
)

18
go.sum
View File

@@ -1,10 +1,7 @@
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -25,7 +22,6 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
@@ -41,7 +37,6 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -55,8 +50,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU=
github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/moby/spdystream v0.5.1 h1:9sNYeYZUcci9R6/w7KDaFWEWeV4LStVG78Mpyq/Zm/Y=
github.com/moby/spdystream v0.5.1/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -98,7 +93,6 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
@@ -117,9 +111,6 @@ golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
golang.org/x/tools/go/expect v0.1.0-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -132,7 +123,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU=
k8s.io/api v0.35.4 h1:P7nFYKl5vo9AGUp1Z+Pmd3p2tA7bX2wbFWCvDeRv988=
k8s.io/api v0.35.4/go.mod h1:yl4lqySWOgYJJf9RERXKUwE9g2y+CkuwG+xmcOK8wXU=
k8s.io/apimachinery v0.35.4 h1:xtdom9RG7e+yDp71uoXoJDWEE2eOiHgeO4GdBzwWpds=
k8s.io/apimachinery v0.35.4/go.mod h1:NNi1taPOpep0jOj+oRha3mBJPqvi0hGdaV8TCqGQ+cc=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE=

View File

@@ -143,10 +143,6 @@ import (
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
// without applying any field management, validations and/or defaults. It shouldn't be considered a replacement
// for a real clientset and is mostly useful in simple unit tests.
//
// Deprecated: NewClientset replaces this with support for field management, which significantly improves
// server side apply testing. NewClientset is only available when apply configurations are generated (e.g.
// via --with-applyconfig).
func NewSimpleClientset(objects ...runtime.Object) *Clientset {
o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())
for _, obj := range objects {
@@ -207,6 +203,10 @@ func (c *Clientset) IsWatchListSemanticsUnSupported() bool {
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
// without applying any validations and/or defaults. It shouldn't be considered a replacement
// for a real clientset and is mostly useful in simple unit tests.
//
// Compared to NewSimpleClientset, the Clientset returned here supports field tracking and thus
// server-side apply. Beware though that support in that for CRDs is missing
// (https://github.com/kubernetes/kubernetes/issues/126850).
func NewClientset(objects ...runtime.Object) *Clientset {
o := testing.NewFieldManagedObjectTracker(
scheme,

View File

@@ -708,20 +708,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: clientState,
Transformer: options.Transform,
})
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})
}
fifo := newQueueFIFO(clientState, options.Transform)
cfg := &Config{
Queue: fifo,
@@ -742,3 +729,19 @@ func newInformer(clientState Store, options InformerOptions) Controller {
}
return New(cfg)
}
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
return NewRealFIFOWithOptions(RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: clientState,
Transformer: transform,
})
} else {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transform,
})
}
}

View File

@@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
}
var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
)
var (

View File

@@ -28,7 +28,7 @@ import (
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) List() []interface{} {
func (f *DeltaFIFO) list() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
@@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) ListKeys() []string {
func (f *DeltaFIFO) listKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
@@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string {
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
return f.getByKey(key)
}
// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
@@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
f.Update(mkFifoObj("foo", 12))
f.Delete(mkFifoObj("foo", 15))
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
@@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
@@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
must(f.Replace([]interface{}{}, ""))
// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
@@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
@@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = f.ListKeys()
_ = f.listKeys()
}
})
b.StopTimer()

View File

@@ -79,7 +79,7 @@ type ReflectorStore interface {
// TransformingStore is an optional interface that can be implemented by the provided store.
// If implemented on the provided store reflector will use the same transformer in its internal stores.
type TransformingStore interface {
Store
ReflectorStore
Transformer() TransformFunc
}
@@ -426,7 +426,10 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
return nil
}
if err != nil {
logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking")
logger.V(4).Info(
"Data couldn't be fetched in watchlist mode. Falling back to regular list. This is expected if watchlist is not supported or disabled in kube-apiserver.",
"err", err,
)
fallbackToList = true
// ensure that we won't accidentally pass some garbage down the watch.
w = nil
@@ -733,9 +736,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
return false
}
var transformer TransformFunc
storeOpts := []StoreOption{}
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
transformer = tr.Transformer()
storeOpts = append(storeOpts, WithTransformer(transformer))
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
@@ -795,7 +800,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content.
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)

View File

@@ -33,11 +33,11 @@ import (
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
return
}
// for informers we pass an empty ListOptions because
// listFn might be wrapped for filtering during informer construction.
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn)
}

View File

@@ -0,0 +1,114 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
"k8s.io/client-go/util/consistencydetector"
"k8s.io/klog/v2/ktesting"
)
func TestReflectorDataConsistencyDetector(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
defer restore()
markTransformed := func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return obj, nil
}
newPod := pod.DeepCopy()
if newPod.Labels == nil {
newPod.Labels = make(map[string]string)
}
newPod.Labels["transformed"] = "true"
return newPod, nil
}
for _, inOrder := range []bool{false, true} {
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
for _, transformerEnabled := range []bool{false, true} {
var transformer TransformFunc
if transformerEnabled {
transformer = markTransformed
}
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
runTestReflectorDataConsistencyDetector(t, transformer)
})
}
})
}
}
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
store := NewStore(MetaNamespaceKeyFunc)
fifo := newQueueFIFO(store, transformer)
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
},
}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
w := watch.NewFake()
go func() {
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
ResourceVersion: "1",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
}})
}()
return w, nil
},
}
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
go func() {
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
return r.LastSyncResourceVersion() != "", nil
})
cancel()
}()
err := r.ListAndWatchWithContext(ctx)
if err != nil {
t.Errorf("ListAndWatchWithContext returned error: %v", err)
}
}

View File

@@ -20,10 +20,12 @@ import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
"net/http"
"reflect"
goruntime "runtime"
"slices"
"strconv"
"sync"
"sync/atomic"
@@ -1962,7 +1964,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
ReflectorStore: s,
beforeReplace: func(list []interface{}, rv string) {
// interested in the Replace call that happens after the Error event
if rv == lastExpectedRV {
@@ -2057,131 +2059,165 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
}
func TestReflectorRespectStoreTransformer(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
for name, test := range map[string]struct {
storeBuilder func(counter *atomic.Int32) ReflectorStore
items func(rs ReflectorStore) []interface{}
}{
"real-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
})
},
}
}
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
items: func(rs ReflectorStore) []interface{} {
store := rs.(*RealFIFO)
objects := make(map[string]interface{})
for _, item := range store.getItems() {
key, _ := store.keyFunc(item.Object)
if item.Type == Deleted {
delete(objects, key)
} else {
objects[key] = item.Object
}
}
return slices.Collect(maps.Values(objects))
},
}},
{Type: watch.Added, Object: pod3},
}
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
beforeReplace: func(list []interface{}, rv string) {
replaceInvoked.Add(1)
// Only two pods are present at the point when Replace is called.
if len(list) != 2 {
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
},
"delta-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
Transformer: func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
})
},
items: func(rs ReflectorStore) []interface{} {
return rs.(*DeltaFIFO).list()
},
},
} {
t.Run(name, func(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
},
}
}
for _, obj := range list {
cast := obj.(*v1.Pod)
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")
lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}},
{Type: watch.Added, Object: pod3},
}
var transformerInvoked atomic.Int32
s := test.storeBuilder(&transformerInvoked)
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, s, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
informerItems := test.items(s)
if want, got := 3, len(informerItems); want != got {
t.Errorf("expected informer to contain %d objects, but got: %d", want, got)
}
for _, item := range informerItems {
cast := item.(*v1.Pod)
if cast.Spec.Hostname != "transformed" {
t.Error("Object was not transformed prior to replacement")
}
}
},
afterReplace: func(rv string, err error) {},
transformer: func(i interface{}) (interface{}, error) {
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
}
var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
// Transformer should have been invoked twice for the initial sync in the informer on the temporary store,
// then twice on replace, then once on the following update.
if want, got := 5, int(transformerInvoked.Load()); want != got {
t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got)
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, store, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
if want, got := 1, int(replaceInvoked.Load()); want != got {
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
}
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
}
})
}
}
type fakeStore struct {
Store
ReflectorStore
beforeReplace func(list []interface{}, s string)
afterReplace func(rv string, err error)
transformer TransformFunc
}
func (f *fakeStore) Replace(list []interface{}, rv string) error {
f.beforeReplace(list, rv)
err := f.Store.Replace(list, rv)
err := f.ReflectorStore.Replace(list, rv)
f.afterReplace(rv, err)
return err
}
func (f *fakeStore) Transformer() TransformFunc {
return f.transformer
}
func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)

View File

@@ -539,20 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
KnownObjects: s.indexer,
Transformer: s.transform,
})
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
}
fifo := newQueueFIFO(s.indexer, s.transform)
cfg := &Config{
Queue: fifo,

View File

@@ -89,7 +89,8 @@ type RealFIFO struct {
}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
)
// Close the queue.

View File

@@ -45,16 +45,28 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
return dataConsistencyDetectionForWatchListEnabled
}
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
// It returns a function that restores the original value.
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
original := dataConsistencyDetectionForWatchListEnabled
dataConsistencyDetectionForWatchListEnabled = enabled
return func() {
dataConsistencyDetectionForWatchListEnabled = original
}
}
type RetrieveItemsFunc[U any] func() []U
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
type TransformFunc func(interface{}) (interface{}, error)
// CheckDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
return
@@ -84,6 +96,15 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
if err != nil {
panic(err) // this should never happen
}
if listItemTransformFunc != nil {
for i := range rawListItems {
obj, err := listItemTransformFunc(rawListItems[i])
if err != nil {
panic(err)
}
rawListItems[i] = obj.(runtime.Object)
}
}
listItems := toMetaObjectSliceOrDie(rawListItems)
sort.Sort(byUID(listItems))

View File

@@ -215,10 +215,10 @@ func TestDataConsistencyChecker(t *testing.T) {
if scenario.expectPanic {
require.Panics(t, func() {
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
})
} else {
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
}
require.Equal(t, scenario.expectedListRequests, fakeLister.counter)
@@ -235,7 +235,7 @@ func TestDataConsistencyCheckerRetry(t *testing.T) {
stopListErrorAfter := 5
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
CheckDataConsistency(ctx, "", "", fakeErrLister.List, nil, metav1.ListOptions{}, retrievedItemsFunc)
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
}