Add unit tests for Data Consistency Detector

This commit is contained in:
Valerian Roche
2025-07-28 13:53:34 -04:00
committed by Marek Siarkowicz
parent be4a4f5c20
commit 86c4e09a78
4 changed files with 138 additions and 20 deletions

View File

@@ -596,16 +596,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform)
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})
}
fifo := newQueueFIFO(clientState, options.Transform)
cfg := &Config{
Queue: fifo,
@@ -623,3 +614,15 @@ func newInformer(clientState Store, options InformerOptions) Controller {
}
return New(cfg)
}
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
return NewRealFIFO(MetaNamespaceKeyFunc, clientState, transform)
} else {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transform,
})
}
}

View File

@@ -0,0 +1,114 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
"k8s.io/client-go/util/consistencydetector"
"k8s.io/klog/v2/ktesting"
)
func TestReflectorDataConsistencyDetector(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
defer restore()
markTransformed := func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return obj, nil
}
newPod := pod.DeepCopy()
if newPod.Labels == nil {
newPod.Labels = make(map[string]string)
}
newPod.Labels["transformed"] = "true"
return newPod, nil
}
for _, inOrder := range []bool{false, true} {
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
for _, transformerEnabled := range []bool{false, true} {
var transformer TransformFunc
if transformerEnabled {
transformer = markTransformed
}
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
runTestReflectorDataConsistencyDetector(t, transformer)
})
}
})
}
}
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
store := NewStore(MetaNamespaceKeyFunc)
fifo := newQueueFIFO(store, transformer)
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
},
}, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
w := watch.NewFake()
go func() {
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
ResourceVersion: "1",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
}})
}()
return w, nil
},
}
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
go func() {
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
return r.LastSyncResourceVersion() != "", nil
})
cancel()
}()
err := r.ListAndWatchWithContext(ctx)
if err != nil {
t.Errorf("ListAndWatchWithContext returned error: %v", err)
}
}

View File

@@ -539,16 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform)
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
}
fifo := newQueueFIFO(s.indexer, s.transform)
cfg := &Config{
Queue: fifo,

View File

@@ -45,6 +45,16 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
return dataConsistencyDetectionForWatchListEnabled
}
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
// It returns a function that restores the original value.
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
original := dataConsistencyDetectionForWatchListEnabled
dataConsistencyDetectionForWatchListEnabled = enabled
return func() {
dataConsistencyDetectionForWatchListEnabled = original
}
}
type RetrieveItemsFunc[U any] func() []U
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)