Fix race in informer transformers

Kubernetes-commit: e9f74597a8e7104b26640614e952d4453654453b
This commit is contained in:
Wojciech Tyczyński
2024-04-17 11:19:08 +02:00
committed by Kubernetes Publisher
parent 178bcf2846
commit 2fe05741c1
2 changed files with 142 additions and 20 deletions

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@@ -575,6 +576,114 @@ func TestTransformingInformer(t *testing.T) {
close(stopCh)
}
func TestTransformingInformerRace(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
label := "to-be-transformed"
makePod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "namespace",
Labels: map[string]string{label: "true"},
},
Spec: v1.PodSpec{
Hostname: "hostname",
},
}
}
badTransform := atomic.Bool{}
podTransformer := func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object type: %T", obj)
}
if pod.ObjectMeta.Labels[label] != "true" {
badTransform.Store(true)
return nil, fmt.Errorf("object already transformed: %#v", obj)
}
pod.ObjectMeta.Labels[label] = "false"
return pod, nil
}
numObjs := 5
for i := 0; i < numObjs; i++ {
source.Add(makePod(fmt.Sprintf("pod-%d", i)))
}
type event struct{}
events := make(chan event, numObjs)
recordEvent := func(eventType watch.EventType, previous, current interface{}) {
events <- event{}
}
checkEvents := func(count int) {
for i := 0; i < count; i++ {
<-events
}
}
store, controller := NewTransformingInformer(
source,
&v1.Pod{},
5*time.Millisecond,
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
},
podTransformer,
)
stopCh := make(chan struct{})
go controller.Run(stopCh)
checkEvents(numObjs)
// Periodically fetch objects to ensure no access races.
wg := sync.WaitGroup{}
errors := make(chan error, numObjs)
for i := 0; i < numObjs; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
key := fmt.Sprintf("namespace/pod-%d", index)
for {
select {
case <-stopCh:
return
default:
}
obj, ok, err := store.GetByKey(key)
if !ok || err != nil {
errors <- fmt.Errorf("couldn't get the object for %v", key)
return
}
pod := obj.(*v1.Pod)
if pod.ObjectMeta.Labels[label] != "false" {
errors <- fmt.Errorf("unexpected object: %#v", pod)
return
}
}
}(i)
}
// Let resyncs to happen for some time.
time.Sleep(time.Second)
close(stopCh)
wg.Wait()
close(errors)
for err := range errors {
t.Error(err)
}
if badTransform.Load() {
t.Errorf("unexpected transformation happened")
}
}
func TestDeletionHandlingObjectToName(t *testing.T) {
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{