diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 2072477ba..51e0a4659 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -708,20 +708,7 @@ func newInformer(clientState Store, options InformerOptions) Controller { // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - var fifo Queue - if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { - fifo = NewRealFIFOWithOptions(RealFIFOOptions{ - KeyFunction: MetaNamespaceKeyFunc, - KnownObjects: clientState, - Transformer: options.Transform, - }) - } else { - fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: clientState, - EmitDeltaTypeReplaced: true, - Transformer: options.Transform, - }) - } + fifo := newQueueFIFO(clientState, options.Transform) cfg := &Config{ Queue: fifo, @@ -742,3 +729,19 @@ func newInformer(clientState Store, options InformerOptions) Controller { } return New(cfg) } + +func newQueueFIFO(clientState Store, transform TransformFunc) Queue { + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + return NewRealFIFOWithOptions(RealFIFOOptions{ + KeyFunction: MetaNamespaceKeyFunc, + KnownObjects: clientState, + Transformer: transform, + }) + } else { + return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + Transformer: transform, + }) + } +} diff --git a/tools/cache/reflector_data_consistency_detector_test.go b/tools/cache/reflector_data_consistency_detector_test.go new file mode 100644 index 000000000..1d18d0be1 --- /dev/null +++ b/tools/cache/reflector_data_consistency_detector_test.go @@ -0,0 +1,114 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "context" + "fmt" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" + "k8s.io/client-go/util/consistencydetector" + "k8s.io/klog/v2/ktesting" +) + +func TestReflectorDataConsistencyDetector(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true) + defer restore() + + markTransformed := func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return obj, nil + } + newPod := pod.DeepCopy() + if newPod.Labels == nil { + newPod.Labels = make(map[string]string) + } + newPod.Labels["transformed"] = "true" + return newPod, nil + } + + for _, inOrder := range []bool{false, true} { + t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder) + for _, transformerEnabled := range []bool{false, true} { + var transformer TransformFunc + if transformerEnabled { + transformer = markTransformed + } + t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) { + runTestReflectorDataConsistencyDetector(t, transformer) + }) + } + }) + } +} + +func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + store := NewStore(MetaNamespaceKeyFunc) + fifo := newQueueFIFO(store, transformer) + + lw := &ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return &v1.PodList{ + ListMeta: metav1.ListMeta{ResourceVersion: "1"}, + Items: []v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}}, + }, + }, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + w := watch.NewFake() + go func() { + w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}}) + w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + ResourceVersion: "1", + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }}) + }() + return w, nil + }, + } + + r := NewReflector(lw, &v1.Pod{}, fifo, 0) + + go func() { + _ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + return r.LastSyncResourceVersion() != "", nil + }) + cancel() + }() + + err := r.ListAndWatchWithContext(ctx) + if err != nil { + t.Errorf("ListAndWatchWithContext returned error: %v", err) + } +} diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index f5c279ac3..8973a33e8 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -539,20 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - var fifo Queue - if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { - fifo = NewRealFIFOWithOptions(RealFIFOOptions{ - KeyFunction: MetaNamespaceKeyFunc, - KnownObjects: s.indexer, - Transformer: s.transform, - }) - } else { - fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: s.indexer, - EmitDeltaTypeReplaced: true, - Transformer: s.transform, - }) - } + fifo := newQueueFIFO(s.indexer, s.transform) cfg := &Config{ Queue: fifo, diff --git a/util/consistencydetector/data_consistency_detector.go b/util/consistencydetector/data_consistency_detector.go index 06f172d82..2e883bf4a 100644 --- a/util/consistencydetector/data_consistency_detector.go +++ b/util/consistencydetector/data_consistency_detector.go @@ -45,6 +45,16 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool { return dataConsistencyDetectionForWatchListEnabled } +// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes. +// It returns a function that restores the original value. +func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() { + original := dataConsistencyDetectionForWatchListEnabled + dataConsistencyDetectionForWatchListEnabled = enabled + return func() { + dataConsistencyDetectionForWatchListEnabled = original + } +} + type RetrieveItemsFunc[U any] func() []U type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)