Merge pull request #124344 from wojtek-t/fix_transformer

Fix race in informer transformers

Kubernetes-commit: 0b15f8cbfc611889f703495ee845b3c54bdd759d
This commit is contained in:
Kubernetes Publisher 2024-04-24 12:34:53 -07:00
commit 6b47d7dcbe
2 changed files with 142 additions and 20 deletions

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@ -575,6 +576,114 @@ func TestTransformingInformer(t *testing.T) {
close(stopCh)
}
func TestTransformingInformerRace(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
label := "to-be-transformed"
makePod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "namespace",
Labels: map[string]string{label: "true"},
},
Spec: v1.PodSpec{
Hostname: "hostname",
},
}
}
badTransform := atomic.Bool{}
podTransformer := func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object type: %T", obj)
}
if pod.ObjectMeta.Labels[label] != "true" {
badTransform.Store(true)
return nil, fmt.Errorf("object already transformed: %#v", obj)
}
pod.ObjectMeta.Labels[label] = "false"
return pod, nil
}
numObjs := 5
for i := 0; i < numObjs; i++ {
source.Add(makePod(fmt.Sprintf("pod-%d", i)))
}
type event struct{}
events := make(chan event, numObjs)
recordEvent := func(eventType watch.EventType, previous, current interface{}) {
events <- event{}
}
checkEvents := func(count int) {
for i := 0; i < count; i++ {
<-events
}
}
store, controller := NewTransformingInformer(
source,
&v1.Pod{},
5*time.Millisecond,
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
},
podTransformer,
)
stopCh := make(chan struct{})
go controller.Run(stopCh)
checkEvents(numObjs)
// Periodically fetch objects to ensure no access races.
wg := sync.WaitGroup{}
errors := make(chan error, numObjs)
for i := 0; i < numObjs; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
key := fmt.Sprintf("namespace/pod-%d", index)
for {
select {
case <-stopCh:
return
default:
}
obj, ok, err := store.GetByKey(key)
if !ok || err != nil {
errors <- fmt.Errorf("couldn't get the object for %v", key)
return
}
pod := obj.(*v1.Pod)
if pod.ObjectMeta.Labels[label] != "false" {
errors <- fmt.Errorf("unexpected object: %#v", pod)
return
}
}
}(i)
}
// Let resyncs to happen for some time.
time.Sleep(time.Second)
close(stopCh)
wg.Wait()
close(errors)
for err := range errors {
t.Error(err)
}
if badTransform.Load() {
t.Errorf("unexpected transformation happened")
}
}
func TestDeletionHandlingObjectToName(t *testing.T) {
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{

View File

@ -139,20 +139,17 @@ type DeltaFIFO struct {
}
// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
// New in v1.27: TransformFunc sees the object before any other actor, and it
// is now safe to mutate the object in place instead of making a copy.
//
// It's recommended for the TransformFunc to be idempotent.
// It MUST be idempotent if objects already present in the cache are passed to
// the Replace() to avoid re-mutating them. Default informers do not pass
// existing objects to Replace though.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
@ -440,22 +437,38 @@ func isDeletionDup(a, b *Delta) *Delta {
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
return f.queueActionInternalLocked(actionType, actionType, obj)
}
// queueActionInternalLocked appends to the delta list for the object.
// The actionType is emitted and must honor emitDeltaTypeReplaced.
// The internalActionType is only used within this function and must
// ignore emitDeltaTypeReplaced.
// Caller must lock first.
func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
// place to call the transform func.
//
// If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync,
// then the object have already gone through the transformer.
//
// If the objects already present in the cache are passed to Replace(),
// the transformer must be idempotent to avoid re-mutating them,
// or coordinate with all readers from the cache to avoid data races.
// Default informers do not pass existing objects to Replace.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
_, isTombstone := obj.(DeletedFinalStateUnknown)
if !isTombstone && internalActionType != Sync {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
}
@ -638,7 +651,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
if err := f.queueActionInternalLocked(action, Replaced, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}