Files
client-go/tools/cache/shared_informer_test.go
Patrick Ohly 40cace856c client-go/tools/cache: fix TestAddWhileActive
4638ba971661497b147906b8977ae206c9dd6e44 added tracking of the goroutine which
executes informer.Run. In the TestAddWhileActive the original `go
informer.Run()` was left in place, causing a data race between the two
`informer.Run` instances:

==================
WARNING: DATA RACE
Read at 0x00c000262398 by goroutine 5302:
  k8s.io/client-go/tools/cache.(*controller).RunWithContext()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/controller.go:162 +0x1ad
  k8s.io/client-go/tools/cache.(*sharedIndexInformer).RunWithContext()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go:584 +0x6c5
  k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go:527 +0x48
  k8s.io/client-go/tools/cache.TestAddWhileActive.gowrap1()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go:1080 +0x17

Previous write at 0x00c000262398 by goroutine 5301:
  k8s.io/client-go/tools/cache.New()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/controller.go:142 +0x9de
  k8s.io/client-go/tools/cache.(*sharedIndexInformer).RunWithContext.func1()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go:562 +0xa78
  k8s.io/client-go/tools/cache.(*sharedIndexInformer).RunWithContext()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go:565 +0x119
  k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go:527 +0x44
  k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run-fm()
      <autogenerated>:1 +0x17
  k8s.io/client-go/tools/cache.TestAddWhileActive.(*Group).StartWithChannel.func2()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go:55 +0x38
  k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go:72 +0x86

Goroutine 5302 (running) created at:
  k8s.io/client-go/tools/cache.TestAddWhileActive()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go:1080 +0x93e
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1690 +0x226
  testing.(*T).Run.gowrap1()
      /usr/local/go/src/testing/testing.go:1743 +0x44

Goroutine 5301 (running) created at:
  k8s.io/apimachinery/pkg/util/wait.(*Group).Start()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go:70 +0xe4
  k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go:54 +0x7e6
  k8s.io/client-go/tools/cache.TestAddWhileActive()
      /home/prow/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go:1074 +0x6a1
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1690 +0x226
  testing.(*T).Run.gowrap1()
      /usr/local/go/src/testing/testing.go:1743 +0x44
==================

Kubernetes-commit: d66ced5730fa60c04b0a39df58a156b7045585f6
2025-01-14 14:06:31 +01:00

1333 lines
40 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"fmt"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/klog/v2/textlogger"
testingclock "k8s.io/utils/clock/testing"
)
type testListener struct {
lock sync.RWMutex
resyncPeriod time.Duration
expectedItemNames sets.String
receivedItemNames []string
name string
}
func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
l := &testListener{
resyncPeriod: resyncPeriod,
expectedItemNames: sets.NewString(expected...),
name: name,
}
return l
}
func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) {
l.handle(obj)
}
func (l *testListener) OnUpdate(old, new interface{}) {
l.handle(new)
}
func (l *testListener) OnDelete(obj interface{}) {
}
func (l *testListener) handle(obj interface{}) {
key, _ := MetaNamespaceKeyFunc(obj)
fmt.Printf("%s: handle: %v\n", l.name, key)
l.lock.Lock()
defer l.lock.Unlock()
objectMeta, _ := meta.Accessor(obj)
l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
}
func (l *testListener) ok() bool {
fmt.Println("polling")
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
if l.satisfiedExpectations() {
return true, nil
}
return false, nil
})
if err != nil {
return false
}
// wait just a bit to allow any unexpected stragglers to come in
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
fmt.Println("final check")
return l.satisfiedExpectations()
}
func (l *testListener) satisfiedExpectations() bool {
l.lock.RLock()
defer l.lock.RUnlock()
return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
}
func eventHandlerCount(i SharedInformer) int {
s := i.(*sharedIndexInformer)
s.startedLock.Lock()
defer s.startedLock.Unlock()
return len(s.processor.listeners)
}
func isStarted(i SharedInformer) bool {
s := i.(*sharedIndexInformer)
s.startedLock.Lock()
defer s.startedLock.Unlock()
return s.started
}
func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
s := i.(*sharedIndexInformer)
return s.processor.getListener(h) != nil
}
func TestIndexer(t *testing.T) {
assert := assert.New(t)
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
source.Add(pod1)
source.Add(pod2)
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
err := informer.AddIndexers(map[string]IndexFunc{
"labels": func(obj interface{}) ([]string, error) {
res := []string{}
for k := range obj.(*v1.Pod).Labels {
res = append(res, k)
}
return res, nil
},
})
if err != nil {
t.Fatal(err)
}
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
WaitForCacheSync(stop, informer.HasSynced)
cmpOps := cmpopts.SortSlices(func(a, b any) bool {
return a.(*v1.Pod).Name < b.(*v1.Pod).Name
})
// We should be able to lookup by index
res, err := informer.GetIndexer().ByIndex("labels", "a")
assert.NoError(err)
if diff := cmp.Diff([]any{pod1}, res); diff != "" {
t.Fatal(diff)
}
// Adding an item later is fine as well
source.Add(pod3)
// Event is async, need to poll
assert.Eventually(func() bool {
res, _ := informer.GetIndexer().ByIndex("labels", "a")
return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == ""
}, time.Second*3, time.Millisecond)
// Adding an index later is also fine
err = informer.AddIndexers(map[string]IndexFunc{
"labels-again": func(obj interface{}) ([]string, error) {
res := []string{}
for k := range obj.(*v1.Pod).Labels {
res = append(res, k)
}
return res, nil
},
})
assert.NoError(err)
// Should be immediately available
res, err = informer.GetIndexer().ByIndex("labels-again", "a")
assert.NoError(err)
if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != "" {
t.Fatal(diff)
}
if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) {
t.Fatalf("got %v", got)
}
if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) {
t.Fatalf("got %v", got)
}
}
func TestListenerResyncPeriods(t *testing.T) {
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
clock := testingclock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener 1, never resync
listener1 := newTestListener("listener1", 0, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
// listener 2, resync every 2s
listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
// listener 3, resync every 3s
listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
listeners := []*testListener{listener1, listener2, listener3}
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
// ensure all listeners got the initial List
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
// reset
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
// advance so listener2 gets a resync
clock.Step(2 * time.Second)
// make sure listener2 got the resync
if !listener2.ok() {
t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
}
// wait a bit to give errant items a chance to go to 1 and 3
time.Sleep(1 * time.Second)
// make sure listeners 1 and 3 got nothing
if len(listener1.receivedItemNames) != 0 {
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
}
if len(listener3.receivedItemNames) != 0 {
t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
}
// reset
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
// advance so listener3 gets a resync
clock.Step(1 * time.Second)
// make sure listener3 got the resync
if !listener3.ok() {
t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
}
// wait a bit to give errant items a chance to go to 1 and 2
time.Sleep(1 * time.Second)
// make sure listeners 1 and 2 got nothing
if len(listener1.receivedItemNames) != 0 {
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
}
if len(listener2.receivedItemNames) != 0 {
t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
}
}
func TestResyncCheckPeriod(t *testing.T) {
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
gl := informer.processor.getListener
clock := testingclock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener 1, never resync
listener1 := newTestListener("listener1", 0)
handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
// listener 2, resync every minute
listener2 := newTestListener("listener2", 1*time.Minute)
handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
// listener 3, resync every 55 seconds
listener3 := newTestListener("listener3", 55*time.Second)
handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
// listener 4, resync every 5 seconds
listener4 := newTestListener("listener4", 5*time.Second)
handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
}
// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed
func TestSharedInformerInitializationRace(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("raceListener", 0)
go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
}
// TestSharedInformerWatchDisruption simulates a watch that was closed
// with updates to the store during that time. We ensure that handlers with
// resync and no resync see the expected state.
func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
clock := testingclock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener, never resync
listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod)
listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
listeners := []*testListener{listenerNoResync, listenerResync}
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}})
// Ensure that nobody saw any changes
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3")
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
// This calls shouldSync, which deletes noResync from the list of syncingListeners
clock.Step(1 * time.Second)
// Simulate a connection loss (or even just a too-old-watch)
source.ResetWatch()
// Wait long enough for the reflector to exit and the backoff function to start waiting
// on the fake clock, otherwise advancing the fake clock will have no effect.
// TODO: Make this deterministic by counting the number of waiters on FakeClock
time.Sleep(10 * time.Millisecond)
// Advance the clock to cause the backoff wait to expire.
clock.Step(1601 * time.Millisecond)
// Wait long enough for backoff to invoke ListWatch a second time and distribute events
// to listeners.
time.Sleep(10 * time.Millisecond)
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
}
func TestSharedInformerErrorHandling(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
source.ListError = fmt.Errorf("Access Denied")
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
errCh := make(chan error)
_ = informer.SetWatchErrorHandler(func(_ *Reflector, err error) {
errCh <- err
})
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
select {
case err := <-errCh:
if !strings.Contains(err.Error(), "Access Denied") {
t.Errorf("Expected 'Access Denied' error. Actual: %v", err)
}
case <-time.After(time.Second):
t.Errorf("Timeout waiting for error handler call")
}
}
// TestSharedInformerStartRace is a regression test to ensure there is no race between
// Run and SetWatchErrorHandler, and Run and SetTransform.
func TestSharedInformerStartRace(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
stop := make(chan struct{})
go func() {
for {
select {
case <-stop:
return
default:
}
// Set dummy functions, just to test for race
informer.SetTransform(func(i interface{}) (interface{}, error) {
return i, nil
})
informer.SetWatchErrorHandler(func(r *Reflector, err error) {
})
}
}()
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
}
func TestSharedInformerTransformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
informer.SetTransform(func(obj interface{}) (interface{}, error) {
if pod, ok := obj.(*v1.Pod); ok {
name := pod.GetName()
if upper := strings.ToUpper(name); upper != name {
pod.SetName(upper)
return pod, nil
}
}
return obj, nil
})
listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2")
informer.AddEventHandler(listenerTransformer)
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
if !listenerTransformer.ok() {
t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
}
}
func TestSharedInformerRemoveHandler(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler1 := &ResourceEventHandlerFuncs{}
handle1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler1: %s", err)
return
}
handler2 := &ResourceEventHandlerFuncs{}
handle2, err := informer.AddEventHandler(handler2)
if err != nil {
t.Errorf("informer did not add handler2: %s", err)
return
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(handle2); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 1 {
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(handle1); err != nil {
t.Errorf("removing of first pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 0 {
t.Errorf("informer still has registered handlers after removing both handlers")
}
}
func TestSharedInformerRemoveForeignHandler(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
source2 := newFakeControllerSource(t)
source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler1 := &ResourceEventHandlerFuncs{}
handle1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler1: %s", err)
return
}
handler2 := &ResourceEventHandlerFuncs{}
handle2, err := informer.AddEventHandler(handler2)
if err != nil {
t.Errorf("informer did not add handler2: %s", err)
return
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if eventHandlerCount(informer2) != 0 {
t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
}
// remove handle at foreign informer
if isRegistered(informer2, handle1) {
t.Errorf("handle1 registered for informer2")
}
if isRegistered(informer2, handle2) {
t.Errorf("handle2 registered for informer2")
}
if err := informer2.RemoveEventHandler(handle1); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if eventHandlerCount(informer2) != 0 {
t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 not registered anymore for informer")
}
if !isRegistered(informer, handle2) {
t.Errorf("handle2 not registered anymore for informer")
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
}
if eventHandlerCount(informer2) != 0 {
t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 not registered anymore for informer")
}
if !isRegistered(informer, handle2) {
t.Errorf("handle2 not registered anymore for informer")
}
if err := informer.RemoveEventHandler(handle2); err != nil {
t.Errorf("removing of second pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 1 {
t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(handle1); err != nil {
t.Errorf("removing of first pointer handler failed: %s", err)
}
if eventHandlerCount(informer) != 0 {
t.Errorf("informer still has registered handlers after removing both handlers")
}
}
func TestSharedInformerMultipleRegistration(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler1 := &ResourceEventHandlerFuncs{}
reg1, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler for the first time: %s", err)
return
}
if !isRegistered(informer, reg1) {
t.Errorf("handle1 is not active after successful registration")
return
}
reg2, err := informer.AddEventHandler(handler1)
if err != nil {
t.Errorf("informer did not add handler for the second: %s", err)
return
}
if !isRegistered(informer, reg2) {
t.Errorf("handle2 is not active after successful registration")
return
}
if eventHandlerCount(informer) != 2 {
t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer))
}
if err := informer.RemoveEventHandler(reg1); err != nil {
t.Errorf("removing of duplicate handler registration failed: %s", err)
}
if isRegistered(informer, reg1) {
t.Errorf("handle1 is still active after successful remove")
return
}
if !isRegistered(informer, reg2) {
t.Errorf("handle2 is not active after removing handle1")
return
}
if eventHandlerCount(informer) != 1 {
if eventHandlerCount(informer) == 0 {
t.Errorf("informer has no registered handler anymore after removal of duplicate registrations")
} else {
t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", eventHandlerCount(informer))
}
}
if err := informer.RemoveEventHandler(reg2); err != nil {
t.Errorf("removing of second handler registration failed: %s", err)
}
if isRegistered(informer, reg2) {
t.Errorf("handle2 is still active after successful remove")
return
}
if eventHandlerCount(informer) != 0 {
t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", eventHandlerCount(informer))
}
}
func TestRemovingRemovedSharedInformer(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
handler := &ResourceEventHandlerFuncs{}
reg, err := informer.AddEventHandler(handler)
if err != nil {
t.Errorf("informer did not add handler for the first time: %s", err)
return
}
if err := informer.RemoveEventHandler(reg); err != nil {
t.Errorf("removing of handler registration failed: %s", err)
return
}
if isRegistered(informer, reg) {
t.Errorf("handle is still active after successful remove")
return
}
if err := informer.RemoveEventHandler(reg); err != nil {
t.Errorf("removing of already removed registration yields unexpected error: %s", err)
}
if isRegistered(informer, reg) {
t.Errorf("handle is still active after second remove")
return
}
}
// Shows that many concurrent goroutines can be manipulating shared informer
// listeners without tripping it up. There are not really many assertions in this
// test. Meant to be run with -race to find race conditions
func TestSharedInformerHandlerAbuse(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
ctx, cancel := context.WithCancel(context.Background())
informerCtx, informerCancel := context.WithCancel(context.Background())
var informerWg wait.Group
informerWg.StartWithChannel(informerCtx.Done(), informer.Run)
defer func() {
cancel()
informerWg.Wait()
}()
worker := func() {
// Keep adding and removing handler
// Make sure no duplicate events?
funcs := ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {},
}
handles := []ResourceEventHandlerRegistration{}
for {
select {
case <-ctx.Done():
return
default:
switch rand.Intn(2) {
case 0:
// Register handler again
reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second)
if err != nil {
if strings.Contains(err.Error(), "stopped already") {
// test is over
return
}
t.Errorf("failed to add handler: %v", err)
return
}
handles = append(handles, reg)
case 1:
// Remove a random handler
if len(handles) == 0 {
continue
}
idx := rand.Intn(len(handles))
err := informer.RemoveEventHandler(handles[idx])
if err != nil {
if strings.Contains(err.Error(), "stopped already") {
// test is over
return
}
t.Errorf("failed to remove handler: %v", err)
return
}
handles = append(handles[:idx], handles[idx+1:]...)
}
}
}
}
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
worker()
wg.Done()
}()
}
objs := []*v1.Pod{}
// While workers run, randomly create events for the informer
for i := 0; i < 10000; i++ {
if len(objs) == 0 {
// Make sure there is always an object
obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod" + strconv.Itoa(i),
}}
objs = append(objs, obj)
// deep copy before adding since the Modify function mutates the obj
source.Add(obj.DeepCopy())
}
switch rand.Intn(3) {
case 0:
// Add Object
obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "pod" + strconv.Itoa(i),
}}
objs = append(objs, obj)
source.Add(obj.DeepCopy())
case 1:
// Update Object
idx := rand.Intn(len(objs))
source.Modify(objs[idx].DeepCopy())
case 2:
// Remove Object
idx := rand.Intn(len(objs))
source.Delete(objs[idx].DeepCopy())
objs = append(objs[:idx], objs[idx+1:]...)
}
}
// sotp informer which stops workers. stopping informer first to exercise
// contention for informer while it is closing
informerCancel()
// wait for workers to finish since they may throw errors
wg.Wait()
}
func TestStateSharedInformer(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if isStarted(informer) {
t.Errorf("informer already started after creation")
return
}
if informer.IsStopped() {
t.Errorf("informer already stopped after creation")
return
}
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer wg.Wait()
if !listener.ok() {
t.Errorf("informer did not report initial objects")
close(stop)
return
}
if !isStarted(informer) {
t.Errorf("informer does not report to be started although handling events")
close(stop)
return
}
if informer.IsStopped() {
t.Errorf("informer reports to be stopped although stop channel not closed")
close(stop)
return
}
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
if !isStarted(informer) {
t.Errorf("informer reports not to be started after it has been started and stopped")
return
}
}
func TestAddOnStoppedSharedInformer(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
stop := make(chan struct{})
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer wg.Wait()
close(stop)
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
if informer.IsStopped() {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
_, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err == nil {
t.Errorf("stopped informer did not reject add handler")
return
}
if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") {
t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
return
}
}
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
if err != nil {
t.Errorf("informer did not add handler: %s", err)
return
}
stop := make(chan struct{})
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer wg.Wait()
close(stop)
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
if !informer.IsStopped() {
t.Errorf("informer reports not to be stopped although stop channel closed")
return
}
err = informer.RemoveEventHandler(handle)
if err != nil {
t.Errorf("informer does not remove handler on stopped informer")
return
}
}
func TestRemoveWhileActive(t *testing.T) {
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
listener := newTestListener("listener", 0, "pod1")
handle, _ := informer.AddEventHandler(listener)
stop := make(chan struct{})
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
if !listener.ok() {
t.Errorf("event did not occur")
return
}
informer.RemoveEventHandler(handle)
if isRegistered(informer, handle) {
t.Errorf("handle is still active after successful remove")
return
}
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
if !listener.ok() {
t.Errorf("unexpected event occurred")
return
}
}
func TestAddWhileActive(t *testing.T) {
// source simulates an apiserver object endpoint.
source := newFakeControllerSource(t)
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
listener1 := newTestListener("originalListener", 0, "pod1")
listener2 := newTestListener("listener2", 0, "pod1", "pod2")
handle1, _ := informer.AddEventHandler(listener1)
if handle1.HasSynced() {
t.Error("Synced before Run??")
}
stop := make(chan struct{})
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
if !listener1.ok() {
t.Errorf("events on listener1 did not occur")
return
}
if !handle1.HasSynced() {
t.Error("Not synced after Run??")
}
listener2.lock.Lock() // ensure we observe it before it has synced
handle2, _ := informer.AddEventHandler(listener2)
if handle2.HasSynced() {
t.Error("Synced before processing anything?")
}
listener2.lock.Unlock() // permit it to proceed and sync
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
if !listener2.ok() {
t.Errorf("event on listener2 did not occur")
return
}
if !handle2.HasSynced() {
t.Error("Not synced even after processing?")
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 is not active")
return
}
if !isRegistered(informer, handle2) {
t.Errorf("handle2 is not active")
return
}
listener1.expectedItemNames = listener2.expectedItemNames
if !listener1.ok() {
t.Errorf("events on listener1 did not occur")
return
}
}
// TestShutdown depends on goleak.VerifyTestMain in main_test.go to verify that
// all goroutines really have stopped in the different scenarios.
func TestShutdown(t *testing.T) {
t.Run("no-context", func(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(_ any) {},
})
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handler))
}()
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
})
t.Run("no-context-later", func(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
var wg wait.Group
stop := make(chan struct{})
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
wg.Wait()
}()
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(_ any) {},
})
require.NoError(t, err)
assert.NoError(t, informer.RemoveEventHandler(handler))
})
t.Run("no-run", func(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
_, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(_ any) {},
})
require.NoError(t, err)
// At this point, neither informer nor handler have any goroutines running
// and it doesn't matter that nothing gets stopped or removed.
})
}
func TestEventPanics(t *testing.T) {
// timeInUTC := time.Date(2009, 12, 1, 13, 30, 40, 42000, time.UTC)
// timeString := "1201 13:30:40.000042"
// Initialized by init.
var (
buffer threadSafeBuffer
logger klog.Logger
source *fcache.FakeControllerSource
)
init := func(t *testing.T) {
// Restoring state is very sensitive to ordering. All goroutines spawned
// by a test must have completed and there has to be a check that they
// have completed that is visible to the race detector. This also
// applies to all other tests!
t.Cleanup(klog.CaptureState().Restore) //nolint:logcheck // CaptureState shouldn't be used in packages with contextual logging, but here it is okay.
buffer.buffer.Reset()
logger = textlogger.NewLogger(textlogger.NewConfig(
// textlogger.FixedTime(timeInUTC),
textlogger.Output(&buffer),
))
oldReallyCrash := utilruntime.ReallyCrash
utilruntime.ReallyCrash = false
t.Cleanup(func() { utilruntime.ReallyCrash = oldReallyCrash })
source = newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
}
newHandler := func(ctx context.Context) ResourceEventHandlerFuncs {
logger := klog.FromContext(ctx)
return ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
logger.Info("Add func will panic now", "pod", klog.KObj(obj.(*v1.Pod)))
panic("fake panic")
},
}
}
_, _, panicLine, _ := runtime.Caller(0)
panicLine -= 4
expectedLog := func(name string) string {
if name == "" {
return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" panic="fake panic"`, panicLine)
}
return fmt.Sprintf(`shared_informer_test.go:%d] "Observed a panic" logger=%q panic="fake panic"`, panicLine, name)
}
handler := newHandler(context.Background())
t.Run("simple", func(t *testing.T) {
init(t)
klog.SetLogger(logger)
stop := make(chan struct{})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handle, err := informer.AddEventHandler(handler)
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handle))
}()
var wg wait.Group
wg.StartWithChannel(stop, informer.Run)
defer func() {
close(stop)
assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped")
wg.Wait() // For race detector...
}()
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
// This times out (https://github.com/kubernetes/kubernetes/issues/129024) because the
// handler never syncs when the callback panics:
// require.Eventually(t, handle.HasSynced, time.Minute, time.Millisecond, "handler has synced")
//
// Wait for a non-empty buffer instead. This implies that we have to make
// the buffer thread-safe, which wouldn't be necessary otherwise.
assert.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Contains(t, buffer.String(), expectedLog(""))
}, time.Minute, time.Millisecond, "handler has panicked")
})
t.Run("many", func(t *testing.T) {
init(t)
// One pod was already created in init, add some more.
numPods := 5
for i := 1; i < numPods; i++ {
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod%d", i+1)}})
}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
name1 := "fake-event-handler-1"
logger1 := klog.LoggerWithName(logger, name1)
ctx1 := klog.NewContext(ctx, logger1)
handle1, err := informer.AddEventHandlerWithOptions(newHandler(ctx1), HandlerOptions{Logger: &logger1})
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handle1))
}()
name2 := "fake-event-handler-2"
logger2 := klog.LoggerWithName(logger, name2)
ctx2 := klog.NewContext(ctx, logger2)
handle2, err := informer.AddEventHandlerWithOptions(newHandler(ctx2), HandlerOptions{Logger: &logger2})
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handle2))
}()
start := time.Now()
var wg wait.Group
informerName := "informer"
informerLogger := klog.LoggerWithName(logger, informerName)
informerCtx := klog.NewContext(ctx, informerLogger)
wg.StartWithContext(informerCtx, informer.RunWithContext)
defer func() {
cancel()
assert.Eventually(t, informer.IsStopped, time.Minute, time.Millisecond, "informer has stopped")
wg.Wait() // For race detector...
}()
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
assert.EventuallyWithT(t, func(t *assert.CollectT) {
output := buffer.String()
expected := expectedLog(name1)
if !assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string: %q), got instead:\n%s", name1, expected, output) {
return
}
expected = expectedLog(name2)
assert.Equal(t, numPods, numOccurrences(output, expected), "Log output should have the right number of panics for %q (search string %q, got instead:\n%s", name2, expected, output)
}, 30*time.Second, time.Millisecond, "handler has panicked")
// Both handlers should have slept for one second after each panic,
// except after the last pod event because then the input channel
// gets closed.
assert.GreaterOrEqual(t, time.Since(start), time.Duration(numPods-1)*time.Second, "Delay in processorListener.run")
})
}
func numOccurrences(hay, needle string) int {
count := 0
for {
index := strings.Index(hay, needle)
if index < 0 {
return count
}
count++
hay = hay[index+len(needle):]
}
}