Merge pull request #125302 from karlkfi/karl-informer-watcher-test

Update TestNewInformerWatcher for WatchListClient

Kubernetes-commit: 169a952720ebd75fcbcb4f3f5cc64e82fdd3ec45
This commit is contained in:
Kubernetes Publisher 2024-06-07 04:52:20 -07:00
commit 94cabc85d9

View File

@ -32,7 +32,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
fakeclientset "k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
@ -134,96 +137,180 @@ func (a byEventTypeAndName) Less(i, j int) bool {
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
}
func newTestSecret(name, key, value string) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
StringData: map[string]string{
key: value,
},
}
}
func TestNewInformerWatcher(t *testing.T) {
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
tt := []struct {
name string
watchListFeatureEnabled bool
objects []runtime.Object
events []watch.Event
inputEvents []watch.Event
outputEvents []watch.Event
}{
{
name: "basic test",
name: "WatchListClient feature disabled",
watchListFeatureEnabled: false,
objects: []runtime.Object{
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
newTestSecret("pod-1", "foo-1", "initial"),
newTestSecret("pod-2", "foo-2", "initial"),
newTestSecret("pod-3", "foo-3", "initial"),
},
StringData: map[string]string{
"foo-1": "initial",
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
},
StringData: map[string]string{
"foo-2": "initial",
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
},
StringData: map[string]string{
"foo-3": "initial",
},
},
},
events: []watch.Event{
inputEvents: []watch.Event{
{
Type: watch.Added,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-4",
},
StringData: map[string]string{
"foo-4": "initial",
},
},
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Modified,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
Object: newTestSecret("pod-2", "foo-2", "new"),
},
StringData: map[string]string{
"foo-2": "new",
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
outputEvents: []watch.Event{
// When WatchListClient is disabled, ListAndWatch creates fake
// ADDED events for each object listed.
{
Type: watch.Added,
Object: newTestSecret("pod-1", "foo-1", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-2", "foo-2", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
// Normal events follow.
{
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
},
{
Type: watch.Deleted,
name: "WatchListClient feature enabled",
watchListFeatureEnabled: true,
objects: []runtime.Object{
newTestSecret("pod-1", "foo-1", "initial"),
newTestSecret("pod-2", "foo-2", "initial"),
newTestSecret("pod-3", "foo-3", "initial"),
},
inputEvents: []watch.Event{
{
Type: watch.Added,
Object: newTestSecret("pod-1", "foo-1", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-2", "foo-2", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
// ListWatch bookmark indicates that initial listing is done
{
Type: watch.Bookmark,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
},
},
{
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
outputEvents: []watch.Event{
// When WatchListClient is enabled, WatchList receives
// ADDED events from the server for each existing object.
{
Type: watch.Added,
Object: newTestSecret("pod-1", "foo-1", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-2", "foo-2", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
// Bookmark event at the end of listing is not sent to the client.
// Normal events follow.
{
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
var expected []watch.Event
for _, o := range tc.objects {
expected = append(expected, watch.Event{
Type: watch.Added,
Object: o.DeepCopyObject(),
})
}
for _, e := range tc.events {
expected = append(expected, *e.DeepCopy())
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, tc.watchListFeatureEnabled)
fake := fakeclientset.NewSimpleClientset(tc.objects...)
fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
for _, e := range tc.events {
fakeWatch.Action(e.Type, e.Object)
inputCh := make(chan watch.Event)
inputWatcher := watch.NewProxyWatcher(inputCh)
// Indexer should stop the input watcher when the output watcher is stopped.
// But stop it at the end of the test, just in case.
defer inputWatcher.Stop()
inputStopCh := inputWatcher.StopChan()
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(inputWatcher, nil))
// Send events and then close the done channel
inputDoneCh := make(chan struct{})
go func() {
defer close(inputDoneCh)
for _, e := range tc.inputEvents {
select {
case inputCh <- e:
case <-inputStopCh:
return
}
}
}()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -233,46 +320,58 @@ func TestNewInformerWatcher(t *testing.T) {
return fake.CoreV1().Secrets("").Watch(context.TODO(), options)
},
}
_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
_, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{})
outputCh := outputWatcher.ResultChan()
timeoutCh := time.After(wait.ForeverTestTimeout)
var result []watch.Event
loop:
for {
var event watch.Event
var ok bool
select {
case event, ok = <-w.ResultChan():
case event, ok := <-outputCh:
if !ok {
t.Errorf("Failed to read event: channel is already closed!")
return
t.Errorf("Output result channel closed prematurely")
break loop
}
result = append(result, *event.DeepCopy())
case <-time.After(time.Second * 1):
// All the events are buffered -> this means we are done
// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
if len(result) >= len(tc.outputEvents) {
break loop
}
case <-timeoutCh:
t.Error("Timed out waiting for events")
break loop
}
}
// Informers don't guarantee event order so we need to sort these arrays to compare them
sort.Sort(byEventTypeAndName(expected))
// Informers don't guarantee event order so we need to sort these arrays to compare them.
sort.Sort(byEventTypeAndName(tc.outputEvents))
sort.Sort(byEventTypeAndName(result))
if !reflect.DeepEqual(expected, result) {
t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result))
if !reflect.DeepEqual(tc.outputEvents, result) {
t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(tc.outputEvents), dump.Pretty(result), cmp.Diff(tc.outputEvents, result))
return
}
// Fill in some data to test watch closing while there are some events to be read
for _, e := range tc.events {
fakeWatch.Action(e.Type, e.Object)
// Send some more events, but don't read them.
// Stop producing events when the consumer stops the watcher.
go func() {
defer close(inputCh)
for _, e := range tc.inputEvents {
select {
case inputCh <- e:
case <-inputStopCh:
return
}
}
}()
// Stop before reading all the data to make sure the informer can deal with closed channel
w.Stop()
outputWatcher.Stop()
<-done
select {
case <-informerDoneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Error("Timed out waiting for informer to cleanup")
}
})
}