add multithreaded test to shared informer

Kubernetes-commit: 8af0a31a15079725e5910d538a6ace03df2e382a
Alexander Zielenski 2022-08-08 13:39:18 -07:00 committed by Kubernetes Publisher
parent de0b7671e3
commit 449817f7b5
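For context, a minimal self-contained sketch of the registration API this test hammers, assuming the client-go cache package at this commit. The AddEventHandlerWithResyncPeriod and RemoveEventHandler signatures are taken from the diff below; the import paths, the sleep-based wait, and the surrounding main function are illustrative assumptions, not part of the commit:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	fcache "k8s.io/client-go/tools/cache/testing"
)

func main() {
	// Fake watchable source and a shared informer over Pods (illustrative setup).
	source := fcache.NewFakeControllerSource()
	informer := cache.NewSharedInformer(source, &v1.Pod{}, time.Second)

	stop := make(chan struct{})
	go informer.Run(stop)

	// Register a handler; the returned registration is the token later
	// handed to RemoveEventHandler.
	handle, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { fmt.Println("added:", obj.(*v1.Pod).Name) },
	}, time.Second)
	if err != nil {
		panic(err) // registration fails once the informer has stopped
	}

	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
	time.Sleep(100 * time.Millisecond) // crude wait for delivery; fine for a sketch

	// Deregister the handler. Removal racing with shutdown is exactly
	// what the new test exercises at scale.
	if err := informer.RemoveEventHandler(handle); err != nil {
		panic(err)
	}
	close(stop)
}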


@@ -17,7 +17,10 @@ limitations under the License.
package cache

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"testing"
@@ -629,6 +632,121 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
	}
}

// Shows that many concurrent goroutines can manipulate shared informer
// listeners without tripping it up. There are not many assertions in this
// test; it is meant to be run with -race to find race conditions (a sample
// invocation follows the test).
func TestSharedInformerHandlerAbuse(t *testing.T) {
	source := fcache.NewFakeControllerSource()
	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)

	ctx, cancel := context.WithCancel(context.Background())
	informerCtx, informerCancel := context.WithCancel(context.Background())
	go func() {
		informer.Run(informerCtx.Done())
		// Signal the workers to stop once the informer itself has stopped.
		cancel()
	}()
	worker := func() {
		// Keep adding and removing handlers.
		// Make sure no duplicate events?
		funcs := ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) {},
			UpdateFunc: func(oldObj, newObj interface{}) {},
			DeleteFunc: func(obj interface{}) {},
		}
		handles := []ResourceEventHandlerRegistration{}
		for {
			select {
			case <-ctx.Done():
				return
			default:
				switch rand.Intn(2) {
				case 0:
					// Register the handler again
					reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second)
					if err != nil {
						if strings.Contains(err.Error(), "stopped already") {
							continue
						}
						t.Errorf("failed to add handler: %v", err)
						return
					}
					handles = append(handles, reg)
				case 1:
					// Remove a random handler
					if len(handles) == 0 {
						continue
					}
					idx := rand.Intn(len(handles))
					err := informer.RemoveEventHandler(handles[idx])
					if err != nil {
						if strings.Contains(err.Error(), "stopped already") {
							continue
						}
						t.Errorf("failed to remove handler: %v", err)
						return
					}
					handles = append(handles[:idx], handles[idx+1:]...)
				}
			}
		}
	}
	wg := sync.WaitGroup{}
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			worker()
			wg.Done()
		}()
	}
	objs := []*v1.Pod{}
	// While the workers run, randomly create events for the informer
	for i := 0; i < 10000; i++ {
		if len(objs) == 0 {
			// Make sure there is always at least one object
			obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
				Name: "pod" + strconv.Itoa(i),
			}}
			objs = append(objs, obj)
			// Deep copy before adding, since the Modify function mutates the obj
			source.Add(obj.DeepCopy())
		}
		switch rand.Intn(3) {
		case 0:
			// Add an object
			obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
				Name: "pod" + strconv.Itoa(i),
			}}
			objs = append(objs, obj)
			source.Add(obj.DeepCopy())
		case 1:
			// Update a random object
			idx := rand.Intn(len(objs))
			source.Modify(objs[idx].DeepCopy())
		case 2:
			// Remove a random object
			idx := rand.Intn(len(objs))
			source.Delete(objs[idx].DeepCopy())
			objs = append(objs[:idx], objs[idx+1:]...)
		}
	}
	// Stop the informer, which in turn stops the workers. Stopping the
	// informer first exercises contention for the informer while it is
	// closing.
	informerCancel()
	// Wait for the workers to finish, since they may report errors
	wg.Wait()
}
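The doc comment above asks for the race detector; a sample invocation, assuming the file lives in client-go's tools/cache package (the path is an assumption inferred from "package cache"):

go test -race -run TestSharedInformerHandlerAbuse ./tools/cache/

With -race enabled, the concurrent AddEventHandlerWithResyncPeriod/RemoveEventHandler calls above surface any data races as test failures rather than silent corruption.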

func TestStateSharedInformer(t *testing.T) {
	source := fcache.NewFakeControllerSource()
	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})