mirror of
https://github.com/kubernetes/client-go.git
synced 2026-06-10 03:07:20 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1c7d7bb62 |
10
go.mod
10
go.mod
@@ -7,7 +7,7 @@ go 1.24.0
|
||||
godebug default=go1.24
|
||||
|
||||
require (
|
||||
github.com/go-logr/logr v1.4.3
|
||||
github.com/go-logr/logr v1.4.2
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/google/gnostic-models v0.7.0
|
||||
github.com/google/go-cmp v0.7.0
|
||||
@@ -17,7 +17,7 @@ require (
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible
|
||||
github.com/spf13/pflag v1.0.6
|
||||
github.com/stretchr/testify v1.11.1
|
||||
github.com/stretchr/testify v1.10.0
|
||||
go.uber.org/goleak v1.3.0
|
||||
golang.org/x/net v0.38.0
|
||||
golang.org/x/oauth2 v0.27.0
|
||||
@@ -25,8 +25,8 @@ require (
|
||||
golang.org/x/time v0.9.0
|
||||
google.golang.org/protobuf v1.36.5
|
||||
gopkg.in/evanphx/json-patch.v4 v4.12.0
|
||||
k8s.io/api v0.34.8
|
||||
k8s.io/apimachinery v0.34.8
|
||||
k8s.io/api v0.34.0
|
||||
k8s.io/apimachinery v0.34.0
|
||||
k8s.io/klog/v2 v2.130.1
|
||||
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b
|
||||
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
|
||||
@@ -48,7 +48,7 @@ require (
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/moby/spdystream v0.5.1 // indirect
|
||||
github.com/moby/spdystream v0.5.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
|
||||
24
go.sum
24
go.sum
@@ -8,8 +8,8 @@ github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf
|
||||
github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM=
|
||||
github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
|
||||
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
|
||||
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
|
||||
@@ -52,8 +52,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/moby/spdystream v0.5.1 h1:9sNYeYZUcci9R6/w7KDaFWEWeV4LStVG78Mpyq/Zm/Y=
|
||||
github.com/moby/spdystream v0.5.1/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
|
||||
github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU=
|
||||
github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
@@ -74,8 +74,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
|
||||
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
@@ -87,8 +87,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
@@ -150,10 +150,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
k8s.io/api v0.34.8 h1:PATray7Ixf5fzuvW6WTrLGqWpaeSUlV3NqygBu10YC0=
|
||||
k8s.io/api v0.34.8/go.mod h1:kkv5KLONEBxemk51GDsqR0TaBig60bVriYyDj9v3gg8=
|
||||
k8s.io/apimachinery v0.34.8 h1:Vjyd/TdakksL0g7ikxyaAk3s3qgqRRYz9vj0AudwNzc=
|
||||
k8s.io/apimachinery v0.34.8/go.mod h1:z7dd12Xd400CXIycE8nmn32xZhApV9zskHs0A5xeU/Q=
|
||||
k8s.io/api v0.34.0 h1:L+JtP2wDbEYPUeNGbeSa/5GwFtIA662EmT2YSLOkAVE=
|
||||
k8s.io/api v0.34.0/go.mod h1:YzgkIzOOlhl9uwWCZNqpw6RJy9L2FK4dlJeayUoydug=
|
||||
k8s.io/apimachinery v0.34.0 h1:eR1WO5fo0HyoQZt1wdISpFDffnWOvFLOOeJ7MgIv4z0=
|
||||
k8s.io/apimachinery v0.34.0/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw=
|
||||
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
|
||||
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
|
||||
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA=
|
||||
|
||||
23
tools/cache/controller.go
vendored
23
tools/cache/controller.go
vendored
@@ -596,7 +596,16 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
|
||||
fifo := newQueueFIFO(clientState, options.Transform)
|
||||
var fifo Queue
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform)
|
||||
} else {
|
||||
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: options.Transform,
|
||||
})
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
@@ -614,15 +623,3 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
}
|
||||
return New(cfg)
|
||||
}
|
||||
|
||||
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
return NewRealFIFO(MetaNamespaceKeyFunc, clientState, transform)
|
||||
} else {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: transform,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
3
tools/cache/delta_fifo.go
vendored
3
tools/cache/delta_fifo.go
vendored
@@ -270,8 +270,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
}
|
||||
|
||||
var (
|
||||
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
|
||||
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
|
||||
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
22
tools/cache/delta_fifo_test.go
vendored
22
tools/cache/delta_fifo_test.go
vendored
@@ -28,7 +28,7 @@ import (
|
||||
// from the most recent Delta.
|
||||
// You should treat the items returned inside the deltas as immutable.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) list() []interface{} {
|
||||
func (f *DeltaFIFO) List() []interface{} {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
return f.listLocked()
|
||||
@@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
|
||||
// ListKeys returns a list of all the keys of the objects currently
|
||||
// in the FIFO.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) listKeys() []string {
|
||||
func (f *DeltaFIFO) ListKeys() []string {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
list := make([]string, 0, len(f.queue))
|
||||
@@ -60,19 +60,19 @@ func (f *DeltaFIFO) listKeys() []string {
|
||||
// or sets exists=false.
|
||||
// You should treat the items returned inside the deltas as immutable.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
key, err := f.KeyOf(obj)
|
||||
if err != nil {
|
||||
return nil, false, KeyError{obj, err}
|
||||
}
|
||||
return f.getByKey(key)
|
||||
return f.GetByKey(key)
|
||||
}
|
||||
|
||||
// GetByKey returns the complete list of deltas for the requested item,
|
||||
// setting exists=false if that list is empty.
|
||||
// You should treat the items returned inside the deltas as immutable.
|
||||
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
|
||||
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
|
||||
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
d, exists := f.items[key]
|
||||
@@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
f.Update(mkFifoObj("foo", 12))
|
||||
f.Delete(mkFifoObj("foo", 15))
|
||||
|
||||
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
|
||||
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
|
||||
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
|
||||
@@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
||||
t.Errorf("Got second value %v", unexpected.val)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
_, exists, _ := f.get(mkFifoObj("foo", ""))
|
||||
_, exists, _ := f.Get(mkFifoObj("foo", ""))
|
||||
if exists {
|
||||
t.Errorf("item did not get removed")
|
||||
}
|
||||
@@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
|
||||
must(f.Replace([]interface{}{}, ""))
|
||||
|
||||
// Should be empty
|
||||
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
|
||||
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %+v, got %+v", e, a)
|
||||
}
|
||||
|
||||
@@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
|
||||
t.Errorf("Got second value %v", unexpected.val)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
_, exists, _ := f.get(mkFifoObj("foo", ""))
|
||||
_, exists, _ := f.Get(mkFifoObj("foo", ""))
|
||||
if exists {
|
||||
t.Errorf("item did not get removed")
|
||||
}
|
||||
@@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = f.listKeys()
|
||||
_ = f.ListKeys()
|
||||
}
|
||||
})
|
||||
b.StopTimer()
|
||||
|
||||
13
tools/cache/reflector.go
vendored
13
tools/cache/reflector.go
vendored
@@ -80,7 +80,7 @@ type ReflectorStore interface {
|
||||
// TransformingStore is an optional interface that can be implemented by the provided store.
|
||||
// If implemented on the provided store reflector will use the same transformer in its internal stores.
|
||||
type TransformingStore interface {
|
||||
ReflectorStore
|
||||
Store
|
||||
Transformer() TransformFunc
|
||||
}
|
||||
|
||||
@@ -419,10 +419,7 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
logger.V(4).Info(
|
||||
"Data couldn't be fetched in watchlist mode. Falling back to regular list. This is expected if watchlist is not supported or disabled in kube-apiserver.",
|
||||
"err", err,
|
||||
)
|
||||
logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking")
|
||||
fallbackToList = true
|
||||
// ensure that we won't accidentally pass some garbage down the watch.
|
||||
w = nil
|
||||
@@ -729,11 +726,9 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
|
||||
return false
|
||||
}
|
||||
|
||||
var transformer TransformFunc
|
||||
storeOpts := []StoreOption{}
|
||||
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
|
||||
transformer = tr.Transformer()
|
||||
storeOpts = append(storeOpts, WithTransformer(transformer))
|
||||
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
|
||||
}
|
||||
|
||||
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
|
||||
@@ -793,7 +788,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
|
||||
// we utilize the temporaryStore to ensure independence from the current store implementation.
|
||||
// as of today, the store is implemented as a queue and will be drained by the higher-level
|
||||
// component as soon as it finishes replacing the content.
|
||||
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
|
||||
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
|
||||
|
||||
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
|
||||
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
|
||||
|
||||
@@ -33,11 +33,11 @@ import (
|
||||
//
|
||||
// Note that this function will panic when data inconsistency is detected.
|
||||
// This is intentional because we want to catch it in the CI.
|
||||
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
|
||||
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
|
||||
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
|
||||
return
|
||||
}
|
||||
// for informers we pass an empty ListOptions because
|
||||
// listFn might be wrapped for filtering during informer construction.
|
||||
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn)
|
||||
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
|
||||
}
|
||||
|
||||
@@ -1,114 +0,0 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
clientfeatures "k8s.io/client-go/features"
|
||||
clientfeaturestesting "k8s.io/client-go/features/testing"
|
||||
"k8s.io/client-go/util/consistencydetector"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
func TestReflectorDataConsistencyDetector(t *testing.T) {
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
|
||||
defer restore()
|
||||
|
||||
markTransformed := func(obj interface{}) (interface{}, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return obj, nil
|
||||
}
|
||||
newPod := pod.DeepCopy()
|
||||
if newPod.Labels == nil {
|
||||
newPod.Labels = make(map[string]string)
|
||||
}
|
||||
newPod.Labels["transformed"] = "true"
|
||||
return newPod, nil
|
||||
}
|
||||
|
||||
for _, inOrder := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
|
||||
for _, transformerEnabled := range []bool{false, true} {
|
||||
var transformer TransformFunc
|
||||
if transformerEnabled {
|
||||
transformer = markTransformed
|
||||
}
|
||||
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
|
||||
runTestReflectorDataConsistencyDetector(t, transformer)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
fifo := newQueueFIFO(store, transformer)
|
||||
|
||||
lw := &ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return &v1.PodList{
|
||||
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
|
||||
Items: []v1.Pod{
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
w := watch.NewFake()
|
||||
go func() {
|
||||
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
|
||||
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
ResourceVersion: "1",
|
||||
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
|
||||
}})
|
||||
}()
|
||||
return w, nil
|
||||
},
|
||||
}
|
||||
|
||||
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
|
||||
|
||||
go func() {
|
||||
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return r.LastSyncResourceVersion() != "", nil
|
||||
})
|
||||
cancel()
|
||||
}()
|
||||
|
||||
err := r.ListAndWatchWithContext(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("ListAndWatchWithContext returned error: %v", err)
|
||||
}
|
||||
}
|
||||
236
tools/cache/reflector_test.go
vendored
236
tools/cache/reflector_test.go
vendored
@@ -20,12 +20,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -1964,7 +1962,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
|
||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||
var replaceInvoked atomic.Int32
|
||||
store := &fakeStore{
|
||||
ReflectorStore: s,
|
||||
Store: s,
|
||||
beforeReplace: func(list []interface{}, rv string) {
|
||||
// interested in the Replace call that happens after the Error event
|
||||
if rv == lastExpectedRV {
|
||||
@@ -2059,165 +2057,131 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestReflectorRespectStoreTransformer(t *testing.T) {
|
||||
for name, test := range map[string]struct {
|
||||
storeBuilder func(counter *atomic.Int32) ReflectorStore
|
||||
items func(rs ReflectorStore) []interface{}
|
||||
}{
|
||||
"real-fifo": {
|
||||
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
|
||||
return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) {
|
||||
counter.Add(1)
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
})
|
||||
mkPod := func(id string, rv string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
|
||||
Spec: v1.PodSpec{
|
||||
Hostname: "test",
|
||||
},
|
||||
items: func(rs ReflectorStore) []interface{} {
|
||||
store := rs.(*RealFIFO)
|
||||
objects := make(map[string]interface{})
|
||||
for _, item := range store.getItems() {
|
||||
key, _ := store.keyFunc(item.Object)
|
||||
if item.Type == Deleted {
|
||||
delete(objects, key)
|
||||
} else {
|
||||
objects[key] = item.Object
|
||||
}
|
||||
}
|
||||
return slices.Collect(maps.Values(objects))
|
||||
},
|
||||
},
|
||||
"delta-fifo": {
|
||||
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
Transformer: func(i interface{}) (interface{}, error) {
|
||||
counter.Add(1)
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
},
|
||||
})
|
||||
},
|
||||
items: func(rs ReflectorStore) []interface{} {
|
||||
return rs.(*DeltaFIFO).list()
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
mkPod := func(id string, rv string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
|
||||
Spec: v1.PodSpec{
|
||||
Hostname: "test",
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
preExisting1 := mkPod("foo-1", "1")
|
||||
preExisting2 := mkPod("foo-2", "2")
|
||||
pod3 := mkPod("foo-3", "3")
|
||||
preExisting1 := mkPod("foo-1", "1")
|
||||
preExisting2 := mkPod("foo-2", "2")
|
||||
pod3 := mkPod("foo-3", "3")
|
||||
|
||||
lastExpectedRV := "3"
|
||||
events := []watch.Event{
|
||||
{Type: watch.Added, Object: preExisting1},
|
||||
{Type: watch.Added, Object: preExisting2},
|
||||
{Type: watch.Bookmark, Object: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: lastExpectedRV,
|
||||
Annotations: map[string]string{
|
||||
metav1.InitialEventsAnnotationKey: "true",
|
||||
},
|
||||
},
|
||||
}},
|
||||
{Type: watch.Added, Object: pod3},
|
||||
}
|
||||
|
||||
var transformerInvoked atomic.Int32
|
||||
s := test.storeBuilder(&transformerInvoked)
|
||||
|
||||
var once sync.Once
|
||||
lw := &ListWatch{
|
||||
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
once.Do(func() {
|
||||
for _, e := range events {
|
||||
fw.Action(e.Type, e.Object)
|
||||
}
|
||||
})
|
||||
}()
|
||||
return fw, nil
|
||||
lastExpectedRV := "3"
|
||||
events := []watch.Event{
|
||||
{Type: watch.Added, Object: preExisting1},
|
||||
{Type: watch.Added, Object: preExisting2},
|
||||
{Type: watch.Bookmark, Object: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: lastExpectedRV,
|
||||
Annotations: map[string]string{
|
||||
metav1.InitialEventsAnnotationKey: "true",
|
||||
},
|
||||
// ListFunc should never be used in WatchList mode
|
||||
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
|
||||
return nil, errors.New("list call not expected in WatchList mode")
|
||||
},
|
||||
}
|
||||
},
|
||||
}},
|
||||
{Type: watch.Added, Object: pod3},
|
||||
}
|
||||
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
r.RunWithContext(ctx)
|
||||
}()
|
||||
|
||||
// wait for the RV to sync to the version returned by the final list
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
|
||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||
var replaceInvoked atomic.Int32
|
||||
store := &fakeStore{
|
||||
Store: s,
|
||||
beforeReplace: func(list []interface{}, rv string) {
|
||||
replaceInvoked.Add(1)
|
||||
// Only two pods are present at the point when Replace is called.
|
||||
if len(list) != 2 {
|
||||
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
|
||||
}
|
||||
|
||||
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
|
||||
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
|
||||
}
|
||||
|
||||
informerItems := test.items(s)
|
||||
if want, got := 3, len(informerItems); want != got {
|
||||
t.Errorf("expected informer to contain %d objects, but got: %d", want, got)
|
||||
}
|
||||
for _, item := range informerItems {
|
||||
cast := item.(*v1.Pod)
|
||||
for _, obj := range list {
|
||||
cast := obj.(*v1.Pod)
|
||||
if cast.Spec.Hostname != "transformed" {
|
||||
t.Error("Object was not transformed prior to replacement")
|
||||
}
|
||||
}
|
||||
},
|
||||
afterReplace: func(rv string, err error) {},
|
||||
transformer: func(i interface{}) (interface{}, error) {
|
||||
cast := i.(*v1.Pod)
|
||||
cast.Spec.Hostname = "transformed"
|
||||
return cast, nil
|
||||
},
|
||||
}
|
||||
|
||||
// Transformer should have been invoked twice for the initial sync in the informer on the temporary store,
|
||||
// then twice on replace, then once on the following update.
|
||||
if want, got := 5, int(transformerInvoked.Load()); want != got {
|
||||
t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got)
|
||||
}
|
||||
var once sync.Once
|
||||
lw := &ListWatch{
|
||||
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
once.Do(func() {
|
||||
for _, e := range events {
|
||||
fw.Action(e.Type, e.Object)
|
||||
}
|
||||
})
|
||||
}()
|
||||
return fw, nil
|
||||
},
|
||||
// ListFunc should never be used in WatchList mode
|
||||
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
|
||||
return nil, errors.New("list call not expected in WatchList mode")
|
||||
},
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timed out waiting for Run to return")
|
||||
}
|
||||
})
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
r := NewReflector(lw, &v1.Pod{}, store, 0)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
r.RunWithContext(ctx)
|
||||
}()
|
||||
|
||||
// wait for the RV to sync to the version returned by the final list
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
|
||||
}
|
||||
|
||||
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
|
||||
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
|
||||
}
|
||||
if want, got := 1, int(replaceInvoked.Load()); want != got {
|
||||
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
|
||||
}
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timed out waiting for Run to return")
|
||||
}
|
||||
}
|
||||
|
||||
type fakeStore struct {
|
||||
ReflectorStore
|
||||
Store
|
||||
beforeReplace func(list []interface{}, s string)
|
||||
afterReplace func(rv string, err error)
|
||||
transformer TransformFunc
|
||||
}
|
||||
|
||||
func (f *fakeStore) Replace(list []interface{}, rv string) error {
|
||||
f.beforeReplace(list, rv)
|
||||
err := f.ReflectorStore.Replace(list, rv)
|
||||
err := f.Store.Replace(list, rv)
|
||||
f.afterReplace(rv, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fakeStore) Transformer() TransformFunc {
|
||||
return f.transformer
|
||||
}
|
||||
|
||||
func BenchmarkExtractList(b *testing.B) {
|
||||
_, _, podList := getPodListItems(0, fakeItemsNum)
|
||||
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
|
||||
|
||||
11
tools/cache/shared_informer.go
vendored
11
tools/cache/shared_informer.go
vendored
@@ -539,7 +539,16 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
fifo := newQueueFIFO(s.indexer, s.transform)
|
||||
var fifo Queue
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform)
|
||||
} else {
|
||||
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: s.indexer,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
|
||||
3
tools/cache/the_real_fifo.go
vendored
3
tools/cache/the_real_fifo.go
vendored
@@ -61,8 +61,7 @@ type RealFIFO struct {
|
||||
}
|
||||
|
||||
var (
|
||||
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
|
||||
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
|
||||
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
|
||||
)
|
||||
|
||||
// Close the queue.
|
||||
|
||||
@@ -77,9 +77,6 @@ func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error
|
||||
ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
|
||||
|
||||
if ll.Labels != nil {
|
||||
if ll.lease.Labels == nil {
|
||||
ll.lease.Labels = map[string]string{}
|
||||
}
|
||||
// Only overwrite the labels that are specifically set
|
||||
for k, v := range ll.Labels {
|
||||
ll.lease.Labels[k] = v
|
||||
|
||||
@@ -266,7 +266,7 @@ func TestLeaseConversion(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateWithNilLabelsOnLease(t *testing.T) {
|
||||
func TestUpdateWithNilLabels(t *testing.T) {
|
||||
setup()
|
||||
|
||||
// Create initial lease
|
||||
@@ -278,33 +278,23 @@ func TestUpdateWithNilLabelsOnLease(t *testing.T) {
|
||||
t.Fatalf("Failed to get lease: %v", err)
|
||||
}
|
||||
|
||||
leaseLock.lease.Labels = nil
|
||||
|
||||
leaseLock.Labels = map[string]string{"custom-key": "custom-val"}
|
||||
|
||||
// Update should succeed even with nil Labels on the lease itself
|
||||
if err := leaseLock.Update(context.Background(), testRecord); err != nil {
|
||||
t.Errorf("Update failed with nil Labels: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateWithNilLabelsOnLeaseLock(t *testing.T) {
|
||||
setup()
|
||||
|
||||
// Create initial lease
|
||||
if err := leaseLock.Create(context.Background(), testRecord); err != nil {
|
||||
t.Fatalf("Failed to create lease: %v", err)
|
||||
}
|
||||
// Get the lease to initialize leaseLock.lease
|
||||
if _, _, err := leaseLock.Get(context.Background()); err != nil {
|
||||
t.Fatalf("Failed to get lease: %v", err)
|
||||
}
|
||||
|
||||
leaseLock.Labels = nil
|
||||
|
||||
leaseLock.lease.Labels = map[string]string{"custom-key": "custom-val"}
|
||||
|
||||
// Update should succeed even with nil Labels on the leaselock
|
||||
// Update labels
|
||||
lease, err := leaseLock.Client.Leases(testNamespace).Update(context.Background(), leaseLock.lease, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update lease labels: %v", err)
|
||||
}
|
||||
|
||||
val, exists := lease.Labels["custom-key"]
|
||||
if !exists {
|
||||
t.Error("Label was overidden on the lease")
|
||||
}
|
||||
if val != "custom-val" {
|
||||
t.Errorf("Label value mismatch, got %q want %q", val, "custom-val")
|
||||
}
|
||||
|
||||
// Update should succeed even with nil Labels
|
||||
if err := leaseLock.Update(context.Background(), testRecord); err != nil {
|
||||
t.Errorf("Update failed with nil Labels: %v", err)
|
||||
}
|
||||
|
||||
@@ -75,15 +75,13 @@ func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, erro
|
||||
CommonName: cfg.CommonName,
|
||||
Organization: cfg.Organization,
|
||||
},
|
||||
DNSNames: []string{cfg.CommonName},
|
||||
NotBefore: notBefore,
|
||||
NotAfter: now.Add(duration365d * 10).UTC(),
|
||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
||||
BasicConstraintsValid: true,
|
||||
IsCA: true,
|
||||
}
|
||||
if len(cfg.CommonName) > 0 {
|
||||
tmpl.DNSNames = []string{cfg.CommonName}
|
||||
}
|
||||
|
||||
certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &tmpl, &tmpl, key.Public(), key)
|
||||
if err != nil {
|
||||
|
||||
@@ -45,28 +45,16 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
|
||||
return dataConsistencyDetectionForWatchListEnabled
|
||||
}
|
||||
|
||||
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
|
||||
// It returns a function that restores the original value.
|
||||
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
|
||||
original := dataConsistencyDetectionForWatchListEnabled
|
||||
dataConsistencyDetectionForWatchListEnabled = enabled
|
||||
return func() {
|
||||
dataConsistencyDetectionForWatchListEnabled = original
|
||||
}
|
||||
}
|
||||
|
||||
type RetrieveItemsFunc[U any] func() []U
|
||||
|
||||
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
|
||||
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// CheckDataConsistency exists solely for testing purposes.
|
||||
// we cannot use checkWatchListDataConsistencyIfRequested because
|
||||
// it is guarded by an environmental variable.
|
||||
// we cannot manipulate the environmental variable because
|
||||
// it will affect other tests in this package.
|
||||
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
|
||||
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
|
||||
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
|
||||
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
|
||||
return
|
||||
@@ -96,15 +84,6 @@ func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity
|
||||
if err != nil {
|
||||
panic(err) // this should never happen
|
||||
}
|
||||
if listItemTransformFunc != nil {
|
||||
for i := range rawListItems {
|
||||
obj, err := listItemTransformFunc(rawListItems[i])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawListItems[i] = obj.(runtime.Object)
|
||||
}
|
||||
}
|
||||
listItems := toMetaObjectSliceOrDie(rawListItems)
|
||||
|
||||
sort.Sort(byUID(listItems))
|
||||
|
||||
@@ -215,10 +215,10 @@ func TestDataConsistencyChecker(t *testing.T) {
|
||||
|
||||
if scenario.expectPanic {
|
||||
require.Panics(t, func() {
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
|
||||
})
|
||||
} else {
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, nil, scenario.requestOptions, retrievedItemsFunc)
|
||||
CheckDataConsistency(ctx, "", scenario.lastSyncedResourceVersion, fakeLister.List, scenario.requestOptions, retrievedItemsFunc)
|
||||
}
|
||||
|
||||
require.Equal(t, scenario.expectedListRequests, fakeLister.counter)
|
||||
@@ -235,7 +235,7 @@ func TestDataConsistencyCheckerRetry(t *testing.T) {
|
||||
stopListErrorAfter := 5
|
||||
fakeErrLister := &errorLister{stopErrorAfter: stopListErrorAfter}
|
||||
|
||||
CheckDataConsistency(ctx, "", "", fakeErrLister.List, nil, metav1.ListOptions{}, retrievedItemsFunc)
|
||||
CheckDataConsistency(ctx, "", "", fakeErrLister.List, metav1.ListOptions{}, retrievedItemsFunc)
|
||||
require.Equal(t, fakeErrLister.listCounter, fakeErrLister.stopErrorAfter)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user