diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 954538ad..75be63a9 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -17,7 +17,10 @@ limitations under the License. package cache import ( + "context" "fmt" + "math/rand" + "strconv" "strings" "sync" "testing" @@ -629,6 +632,121 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { } } +// Shows that many concurrent goroutines can be manipulating shared informer +// listeners without tripping it up. There are not really many assertions in this +// test. Meant to be run with -race to find race conditions +func TestSharedInformerHandlerAbuse(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + ctx, cancel := context.WithCancel(context.Background()) + informerCtx, informerCancel := context.WithCancel(context.Background()) + go func() { + informer.Run(informerCtx.Done()) + cancel() + }() + + worker := func() { + // Keep adding and removing handler + // Make sure no duplicate events? + funcs := ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(oldObj, newObj interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + } + handles := []ResourceEventHandlerRegistration{} + + for { + select { + case <-ctx.Done(): + return + default: + switch rand.Intn(2) { + case 0: + // Register handler again + reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second) + if err != nil { + if strings.Contains(err.Error(), "stopped already") { + continue + } + t.Errorf("failed to add handler: %v", err) + return + } + handles = append(handles, reg) + case 1: + // Remove a random handler + if len(handles) == 0 { + continue + } + + idx := rand.Intn(len(handles)) + err := informer.RemoveEventHandler(handles[idx]) + if err != nil { + if strings.Contains(err.Error(), "stopped already") { + continue + } + t.Errorf("failed to remove handler: %v", err) + return + } + handles = append(handles[:idx], handles[idx+1:]...) + } + } + } + } + + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + worker() + wg.Done() + }() + } + + objs := []*v1.Pod{} + + // While workers run, randomly create events for the informer + for i := 0; i < 10000; i++ { + if len(objs) == 0 { + // Make sure there is always an object + obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + }} + objs = append(objs, obj) + + // deep copy before adding since the Modify function mutates the obj + source.Add(obj.DeepCopy()) + } + + switch rand.Intn(3) { + case 0: + // Add Object + obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + }} + objs = append(objs, obj) + source.Add(obj.DeepCopy()) + case 1: + // Update Object + idx := rand.Intn(len(objs)) + source.Modify(objs[idx].DeepCopy()) + + case 2: + // Remove Object + idx := rand.Intn(len(objs)) + source.Delete(objs[idx].DeepCopy()) + objs = append(objs[:idx], objs[idx+1:]...) + } + } + + // sotp informer which stops workers. stopping informer first to excercise + // contention for informer while it is closing + informerCancel() + + // wait for workers to finish since they may throw errors + wg.Wait() +} + func TestStateSharedInformer(t *testing.T) { source := fcache.NewFakeControllerSource() source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})