From cafc640bfa0f7362b178b1b896085962d018afe3 Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Fri, 3 May 2019 23:50:53 -0400 Subject: [PATCH] NewIndexerInformerWatcher: fix goroutine leak There was some weird queuing going on. The queue was implemented by spawning goroutines that would block on the "ticketer" until it was their turn to write to the output channel. If N events were in the watch when the consumer of the watch stopped reading events, N goroutines would leak. In unit tests of the certificate manager, this was causing ~10k goroutines to leak. Fix it with a buffering event processor that uses only one goroutine and cancels correctly. --- .../client-go/tools/watch/informerwatcher.go | 133 +++++++++++------- .../tools/watch/informerwatcher_test.go | 122 +++++++++++----- 2 files changed, 166 insertions(+), 89 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go index 4ccc4b49a9c..4e0a400bb55 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go @@ -18,42 +18,86 @@ package watch import ( "sync" - "sync/atomic" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) -func newTicketer() *ticketer { - return &ticketer{ +func newEventProcessor(out chan<- watch.Event) *eventProcessor { + return &eventProcessor{ + out: out, cond: sync.NewCond(&sync.Mutex{}), + done: make(chan struct{}), } } -type ticketer struct { - counter uint64 +// eventProcessor buffers events and writes them to an out chan when a reader +// is waiting. Because of the requirement to buffer events, it synchronizes +// input with a condition, and synchronizes output with a channel. It needs to +// be able to yield while both waiting on an input condition and while blocked +// on writing to the output channel. 
+type eventProcessor struct { + out chan<- watch.Event - cond *sync.Cond - current uint64 + cond *sync.Cond + buff []watch.Event + + done chan struct{} } -func (t *ticketer) GetTicket() uint64 { - // -1 to start from 0 - return atomic.AddUint64(&t.counter, 1) - 1 +func (e *eventProcessor) run() { + for { + batch := e.takeBatch() + e.writeBatch(batch) + if e.stopped() { + return + } + } } -func (t *ticketer) WaitForTicket(ticket uint64, f func()) { - t.cond.L.Lock() - defer t.cond.L.Unlock() - for ticket != t.current { - t.cond.Wait() +func (e *eventProcessor) takeBatch() []watch.Event { + e.cond.L.Lock() + defer e.cond.L.Unlock() + + for len(e.buff) == 0 && !e.stopped() { + e.cond.Wait() } - f() + batch := e.buff + e.buff = nil + return batch +} - t.current++ - t.cond.Broadcast() +func (e *eventProcessor) writeBatch(events []watch.Event) { + for _, event := range events { + select { + case e.out <- event: + case <-e.done: + return + } + } +} + +func (e *eventProcessor) push(event watch.Event) { + e.cond.L.Lock() + defer e.cond.L.Unlock() + defer e.cond.Signal() + e.buff = append(e.buff, event) +} + +func (e *eventProcessor) stopped() bool { + select { + case <-e.done: + return true + default: + return false + } +} + +func (e *eventProcessor) stop() { + close(e.done) + e.cond.Signal() } // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface @@ -61,55 +105,44 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) { // it also returns a channel you can use to wait for the informers to fully shutdown. 
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) - doneCh := make(chan struct{}) w := watch.NewProxyWatcher(ch) - t := newTicketer() + e := newEventProcessor(ch) indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - select { - case ch <- watch.Event{ - Type: watch.Added, - Object: obj.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), }) }, UpdateFunc: func(old, new interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - select { - case ch <- watch.Event{ - Type: watch.Modified, - Object: new.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), }) }, DeleteFunc: func(obj interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - staleObj, stale := obj.(cache.DeletedFinalStateUnknown) - if stale { - // We have no means of passing the additional information down using watch API based on watch.Event - // but the caller can filter such objects by checking if metadata.deletionTimestamp is set - obj = staleObj - } + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using + // watch API based on watch.Event but the caller can filter such + // objects by checking if metadata.deletionTimestamp is set + obj = staleObj + } - select { - case ch <- watch.Event{ - Type: watch.Deleted, - Object: obj.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), }) }, }, cache.Indexers{}) + go e.run() + + doneCh := make(chan struct{}) go func() { defer close(doneCh) + defer e.stop() informer.Run(w.StopChan()) }() diff --git 
a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go index 051898654f1..d6e1e8e223f 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go @@ -17,8 +17,9 @@ limitations under the License. package watch import ( - "math/rand" + "context" "reflect" + goruntime "runtime" "sort" "testing" "time" @@ -28,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/watch" fakeclientset "k8s.io/client-go/kubernetes/fake" @@ -35,6 +37,86 @@ import ( "k8s.io/client-go/tools/cache" ) +// TestEventProcessorExit is expected to timeout if the event processor fails +// to exit when stopped. +func TestEventProcessorExit(t *testing.T) { + event := watch.Event{} + + tests := []struct { + name string + write func(e *eventProcessor) + }{ + { + name: "exit on blocked read", + write: func(e *eventProcessor) { + e.push(event) + }, + }, + { + name: "exit on blocked write", + write: func(e *eventProcessor) { + e.push(event) + e.push(event) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + out := make(chan watch.Event) + e := newEventProcessor(out) + + test.write(e) + + exited := make(chan struct{}) + go func() { + e.run() + close(exited) + }() + + <-out + e.stop() + goruntime.Gosched() + <-exited + }) + } +} + +type apiInt int + +func (apiInt) GetObjectKind() schema.ObjectKind { return nil } +func (apiInt) DeepCopyObject() runtime.Object { return nil } + +func TestEventProcessorOrdersEvents(t *testing.T) { + out := make(chan watch.Event) + e := newEventProcessor(out) + go e.run() + + numProcessed := 0 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + go func() { + for i := 0; i < 1000; 
i++ { + e := <-out + if got, want := int(e.Object.(apiInt)), i; got != want { + t.Errorf("unexpected event: got=%d, want=%d", got, want) + } + numProcessed++ + } + cancel() + }() + + for i := 0; i < 1000; i++ { + e.push(watch.Event{Object: apiInt(i)}) + } + + <-ctx.Done() + e.stop() + + if numProcessed != 1000 { + t.Errorf("unexpected number of events processed: %d", numProcessed) + } + +} + type byEventTypeAndName []watch.Event func (a byEventTypeAndName) Len() int { return len(a) } @@ -51,44 +133,6 @@ func (a byEventTypeAndName) Less(i, j int) bool { return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name } -func TestTicketer(t *testing.T) { - tg := newTicketer() - - const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines - var tickets []uint64 - for i := 0; i < numTickets; i++ { - ticket := tg.GetTicket() - tickets = append(tickets, ticket) - - exp, got := uint64(i), ticket - if got != exp { - t.Fatalf("expected ticket %d, got %d", exp, got) - } - } - - // shuffle tickets - rand.Shuffle(len(tickets), func(i, j int) { - tickets[i], tickets[j] = tickets[j], tickets[i] - }) - - res := make(chan uint64, len(tickets)) - for _, ticket := range tickets { - go func(ticket uint64) { - time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) - tg.WaitForTicket(ticket, func() { - res <- ticket - }) - }(ticket) - } - - for i := 0; i < numTickets; i++ { - exp, got := uint64(i), <-res - if got != exp { - t.Fatalf("expected ticket %d, got %d", exp, got) - } - } -} - func TestNewInformerWatcher(t *testing.T) { // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. tt := []struct {