mirror of
https://github.com/kubernetes/client-go.git
synced 2025-12-25 06:02:30 +00:00
Add unit tests for Data Consistency Detector
Kubernetes-commit: 76da8d6de027a4bf62601d45b8d72f8fa627ab5c
This commit is contained in:
committed by
Kubernetes Publisher
parent
f466f58eea
commit
7cf6a05732
31
tools/cache/controller.go
vendored
31
tools/cache/controller.go
vendored
@@ -708,20 +708,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
// KeyLister, that way resync operations will result in the correct set
|
||||
// of update/delete deltas.
|
||||
|
||||
var fifo Queue
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
KnownObjects: clientState,
|
||||
Transformer: options.Transform,
|
||||
})
|
||||
} else {
|
||||
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: options.Transform,
|
||||
})
|
||||
}
|
||||
fifo := newQueueFIFO(clientState, options.Transform)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
@@ -742,3 +729,19 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
}
|
||||
return New(cfg)
|
||||
}
|
||||
|
||||
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
return NewRealFIFOWithOptions(RealFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
KnownObjects: clientState,
|
||||
Transformer: transform,
|
||||
})
|
||||
} else {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: clientState,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: transform,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
114
tools/cache/reflector_data_consistency_detector_test.go
vendored
Normal file
114
tools/cache/reflector_data_consistency_detector_test.go
vendored
Normal file
@@ -0,0 +1,114 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
clientfeatures "k8s.io/client-go/features"
|
||||
clientfeaturestesting "k8s.io/client-go/features/testing"
|
||||
"k8s.io/client-go/util/consistencydetector"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
)
|
||||
|
||||
func TestReflectorDataConsistencyDetector(t *testing.T) {
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
|
||||
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
|
||||
defer restore()
|
||||
|
||||
markTransformed := func(obj interface{}) (interface{}, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return obj, nil
|
||||
}
|
||||
newPod := pod.DeepCopy()
|
||||
if newPod.Labels == nil {
|
||||
newPod.Labels = make(map[string]string)
|
||||
}
|
||||
newPod.Labels["transformed"] = "true"
|
||||
return newPod, nil
|
||||
}
|
||||
|
||||
for _, inOrder := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
|
||||
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
|
||||
for _, transformerEnabled := range []bool{false, true} {
|
||||
var transformer TransformFunc
|
||||
if transformerEnabled {
|
||||
transformer = markTransformed
|
||||
}
|
||||
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
|
||||
runTestReflectorDataConsistencyDetector(t, transformer)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
fifo := newQueueFIFO(store, transformer)
|
||||
|
||||
lw := &ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
return &v1.PodList{
|
||||
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
|
||||
Items: []v1.Pod{
|
||||
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
w := watch.NewFake()
|
||||
go func() {
|
||||
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
|
||||
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
ResourceVersion: "1",
|
||||
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
|
||||
}})
|
||||
}()
|
||||
return w, nil
|
||||
},
|
||||
}
|
||||
|
||||
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
|
||||
|
||||
go func() {
|
||||
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return r.LastSyncResourceVersion() != "", nil
|
||||
})
|
||||
cancel()
|
||||
}()
|
||||
|
||||
err := r.ListAndWatchWithContext(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("ListAndWatchWithContext returned error: %v", err)
|
||||
}
|
||||
}
|
||||
15
tools/cache/shared_informer.go
vendored
15
tools/cache/shared_informer.go
vendored
@@ -539,20 +539,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
|
||||
s.startedLock.Lock()
|
||||
defer s.startedLock.Unlock()
|
||||
|
||||
var fifo Queue
|
||||
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
|
||||
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
|
||||
KeyFunction: MetaNamespaceKeyFunc,
|
||||
KnownObjects: s.indexer,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
} else {
|
||||
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KnownObjects: s.indexer,
|
||||
EmitDeltaTypeReplaced: true,
|
||||
Transformer: s.transform,
|
||||
})
|
||||
}
|
||||
fifo := newQueueFIFO(s.indexer, s.transform)
|
||||
|
||||
cfg := &Config{
|
||||
Queue: fifo,
|
||||
|
||||
@@ -45,6 +45,16 @@ func IsDataConsistencyDetectionForWatchListEnabled() bool {
|
||||
return dataConsistencyDetectionForWatchListEnabled
|
||||
}
|
||||
|
||||
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
|
||||
// It returns a function that restores the original value.
|
||||
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
|
||||
original := dataConsistencyDetectionForWatchListEnabled
|
||||
dataConsistencyDetectionForWatchListEnabled = enabled
|
||||
return func() {
|
||||
dataConsistencyDetectionForWatchListEnabled = original
|
||||
}
|
||||
}
|
||||
|
||||
type RetrieveItemsFunc[U any] func() []U
|
||||
|
||||
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
|
||||
|
||||
Reference in New Issue
Block a user