mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #61195 from grantr/use-race-free-fake-watcher
Automatic merge from submit-queue (batch tested with PRs 61195, 61479). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Use RaceFreeFakeWatcher in ObjectTracker to fix racy watch panics **What this PR does / why we need it**: The `FakeWatcher` added to `ObjectTracker` in #57504 allows sends on the result channel after it's closed; for example calling `Stop()` then `Add(obj)` will cause a panic. In my experience this has led to flaky tests when informers and controllers are running. Replacing `FakeWatcher` with `RaceFreeFakeWatcher` fixes the problem, since `RaceFreeFakeWatcher` ignores additional events that occur after the watcher is stopped. It also panics instead of blocking when the result channel is full, which seems like a more useful behavior in tests than blocking. I removed the `FakeWatchBufferSize` constant since `RaceFreeFakeWatcher` doesn't take a buffer size argument anymore. This seems fine since the `DefaultChanSize` constant is close to the `FakeWatchBufferSize` value (100 vs 128). **Special notes for your reviewer**: I can provide a minimal repro of a flaky test caused by the earlier behavior if necessary. **Release note**: ```release-note Fix racy panics when using fake watches with ObjectTracker ```
This commit is contained in:
commit
f8981147e2
@ -29,11 +29,6 @@ import (
|
|||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FakeWatchBufferSize is the max num of watch event can be buffered in the
|
|
||||||
// watch channel. Note that when watch event overflows or exceed this buffer
|
|
||||||
// size, manipulations via fake client may be blocked.
|
|
||||||
const FakeWatchBufferSize = 128
|
|
||||||
|
|
||||||
// ObjectTracker keeps track of objects. It is intended to be used to
|
// ObjectTracker keeps track of objects. It is intended to be used to
|
||||||
// fake calls to a server by returning objects based on their kind,
|
// fake calls to a server by returning objects based on their kind,
|
||||||
// namespace and name.
|
// namespace and name.
|
||||||
@ -142,12 +137,11 @@ type tracker struct {
|
|||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
objects map[schema.GroupVersionResource][]runtime.Object
|
objects map[schema.GroupVersionResource][]runtime.Object
|
||||||
// The value type of watchers is a map of which the key is either a namespace or
|
// The value type of watchers is a map of which the key is either a namespace or
|
||||||
// all/non namespace aka "" and its value is list of fake watchers. Each of
|
// all/non namespace aka "" and its value is list of fake watchers.
|
||||||
// fake watcher holds a buffered channel of size "FakeWatchBufferSize" which
|
// Manipulations on resources will broadcast the notification events into the
|
||||||
// is default to 128. Manipulations on resources will broadcast the notification
|
// watchers' channel. Note that too many unhandled events (currently 100,
|
||||||
// events into the watchers' channel and note that too many unhandled event may
|
// see apimachinery/pkg/watch.DefaultChanSize) will cause a panic.
|
||||||
// potentially block the tracker.
|
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
|
||||||
watchers map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ ObjectTracker = &tracker{}
|
var _ ObjectTracker = &tracker{}
|
||||||
@ -159,7 +153,7 @@ func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracke
|
|||||||
scheme: scheme,
|
scheme: scheme,
|
||||||
decoder: decoder,
|
decoder: decoder,
|
||||||
objects: make(map[schema.GroupVersionResource][]runtime.Object),
|
objects: make(map[schema.GroupVersionResource][]runtime.Object),
|
||||||
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher),
|
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,10 +200,10 @@ func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Inter
|
|||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
defer t.lock.Unlock()
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
fakewatcher := watch.NewFakeWithChanSize(FakeWatchBufferSize, true)
|
fakewatcher := watch.NewRaceFreeFake()
|
||||||
|
|
||||||
if _, exists := t.watchers[gvr]; !exists {
|
if _, exists := t.watchers[gvr]; !exists {
|
||||||
t.watchers[gvr] = make(map[string][]*watch.FakeWatcher)
|
t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
|
||||||
}
|
}
|
||||||
t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
|
t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
|
||||||
return fakewatcher, nil
|
return fakewatcher, nil
|
||||||
@ -293,8 +287,8 @@ func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns
|
|||||||
return t.add(gvr, obj, ns, true)
|
return t.add(gvr, obj, ns, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.FakeWatcher {
|
func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
|
||||||
watches := []*watch.FakeWatcher{}
|
watches := []*watch.RaceFreeFakeWatcher{}
|
||||||
if t.watchers[gvr] != nil {
|
if t.watchers[gvr] != nil {
|
||||||
if w := t.watchers[gvr][ns]; w != nil {
|
if w := t.watchers[gvr][ns]; w != nil {
|
||||||
watches = append(watches, w...)
|
watches = append(watches, w...)
|
||||||
|
@ -190,3 +190,34 @@ func TestWatchCallMultipleInvocation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchAddAfterStop(t *testing.T) {
|
||||||
|
testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"}
|
||||||
|
testObj := getArbitraryResource(testResource, "test_name", "test_namespace")
|
||||||
|
accessor, err := meta.Accessor(testObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ns := accessor.GetNamespace()
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
codecs := serializer.NewCodecFactory(scheme)
|
||||||
|
o := NewObjectTracker(scheme, codecs.UniversalDecoder())
|
||||||
|
watch, err := o.Watch(testResource, ns)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("watch creation failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the watch is stopped it should ignore later events without panicking.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Errorf("Watch panicked when it should have ignored create after stop: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
watch.Stop()
|
||||||
|
err = o.Create(testResource, testObj, ns)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("test resource creation failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user