From ba0da5aa4d3dfd1236b1df0ee2036a7fa5189228 Mon Sep 17 00:00:00 2001
From: Mike Danese
Date: Fri, 3 May 2019 23:50:53 -0400
Subject: [PATCH] NewIndexerInformerWatcher: fix goroutine leak

There was some weird queuing going on. The queue was implemented by
spawning goroutines that would block on the "ticketer" until it was
their turn to write to the output channel. If N events were in the
watch when the consumer of the watch stopped reading events, N
goroutines would leak. In unit tests of the certificate manager, this
was causing ~10k goroutines to leak.

Fix it with a buffering event processor that uses only one goroutine
and cancels correctly.

Kubernetes-commit: cafc640bfa0f7362b178b1b896085962d018afe3
---
 tools/watch/informerwatcher.go      | 133 +++++++++++++++++-----------
 tools/watch/informerwatcher_test.go | 122 +++++++++++++++++--------
 2 files changed, 166 insertions(+), 89 deletions(-)

diff --git a/tools/watch/informerwatcher.go b/tools/watch/informerwatcher.go
index 4ccc4b49..4e0a400b 100644
--- a/tools/watch/informerwatcher.go
+++ b/tools/watch/informerwatcher.go
@@ -18,42 +18,86 @@ package watch
 
 import (
 	"sync"
-	"sync/atomic"
 
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 )
 
-func newTicketer() *ticketer {
-	return &ticketer{
+func newEventProcessor(out chan<- watch.Event) *eventProcessor {
+	return &eventProcessor{
+		out:  out,
 		cond: sync.NewCond(&sync.Mutex{}),
+		done: make(chan struct{}),
 	}
 }
 
-type ticketer struct {
-	counter uint64
+// eventProcessor buffers events and writes them to an out chan when a reader
+// is waiting. Because of the requirement to buffer events, it synchronizes
+// input with a condition, and synchronizes output with a channel. It needs to
+// be able to yield while both waiting on an input condition and while blocked
+// on writing to the output channel.
+type eventProcessor struct {
+	out chan<- watch.Event
 
-	cond    *sync.Cond
-	current uint64
+	cond *sync.Cond
+	buff []watch.Event
+
+	done chan struct{}
 }
 
-func (t *ticketer) GetTicket() uint64 {
-	// -1 to start from 0
-	return atomic.AddUint64(&t.counter, 1) - 1
+func (e *eventProcessor) run() {
+	for {
+		batch := e.takeBatch()
+		e.writeBatch(batch)
+		if e.stopped() {
+			return
+		}
+	}
 }
 
-func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
-	t.cond.L.Lock()
-	defer t.cond.L.Unlock()
-	for ticket != t.current {
-		t.cond.Wait()
+func (e *eventProcessor) takeBatch() []watch.Event {
+	e.cond.L.Lock()
+	defer e.cond.L.Unlock()
+
+	for len(e.buff) == 0 && !e.stopped() {
+		e.cond.Wait()
 	}
 
-	f()
+	batch := e.buff
+	e.buff = nil
+	return batch
+}
 
-	t.current++
-	t.cond.Broadcast()
+func (e *eventProcessor) writeBatch(events []watch.Event) {
+	for _, event := range events {
+		select {
+		case e.out <- event:
+		case <-e.done:
+			return
+		}
+	}
+}
+
+func (e *eventProcessor) push(event watch.Event) {
+	e.cond.L.Lock()
+	defer e.cond.L.Unlock()
+	defer e.cond.Signal()
+	e.buff = append(e.buff, event)
+}
+
+func (e *eventProcessor) stopped() bool {
+	select {
+	case <-e.done:
+		return true
+	default:
+		return false
+	}
+}
+
+func (e *eventProcessor) stop() {
+	close(e.done)
+	e.cond.Signal()
 }
 
 // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
@@ -61,55 +105,44 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
 // it also returns a channel you can use to wait for the informers to fully shutdown.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) - doneCh := make(chan struct{}) w := watch.NewProxyWatcher(ch) - t := newTicketer() + e := newEventProcessor(ch) indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - select { - case ch <- watch.Event{ - Type: watch.Added, - Object: obj.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), }) }, UpdateFunc: func(old, new interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - select { - case ch <- watch.Event{ - Type: watch.Modified, - Object: new.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), }) }, DeleteFunc: func(obj interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - staleObj, stale := obj.(cache.DeletedFinalStateUnknown) - if stale { - // We have no means of passing the additional information down using watch API based on watch.Event - // but the caller can filter such objects by checking if metadata.deletionTimestamp is set - obj = staleObj - } + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using + // watch API based on watch.Event but the caller can filter such + // objects by checking if metadata.deletionTimestamp is set + obj = staleObj + } - select { - case ch <- watch.Event{ - Type: watch.Deleted, - Object: obj.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), }) }, }, cache.Indexers{}) + go e.run() + + doneCh := make(chan struct{}) go func() { defer close(doneCh) + defer e.stop() informer.Run(w.StopChan()) }() diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go index 05189865..d6e1e8e2 100644 --- a/tools/watch/informerwatcher_test.go +++ b/tools/watch/informerwatcher_test.go @@ -17,8 +17,9 @@ limitations under the License. package watch import ( - "math/rand" + "context" "reflect" + goruntime "runtime" "sort" "testing" "time" @@ -28,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/watch" fakeclientset "k8s.io/client-go/kubernetes/fake" @@ -35,6 +37,86 @@ import ( "k8s.io/client-go/tools/cache" ) +// TestEventProcessorExit is expected to timeout if the event processor fails +// to exit when stopped. 
+func TestEventProcessorExit(t *testing.T) { + event := watch.Event{} + + tests := []struct { + name string + write func(e *eventProcessor) + }{ + { + name: "exit on blocked read", + write: func(e *eventProcessor) { + e.push(event) + }, + }, + { + name: "exit on blocked write", + write: func(e *eventProcessor) { + e.push(event) + e.push(event) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + out := make(chan watch.Event) + e := newEventProcessor(out) + + test.write(e) + + exited := make(chan struct{}) + go func() { + e.run() + close(exited) + }() + + <-out + e.stop() + goruntime.Gosched() + <-exited + }) + } +} + +type apiInt int + +func (apiInt) GetObjectKind() schema.ObjectKind { return nil } +func (apiInt) DeepCopyObject() runtime.Object { return nil } + +func TestEventProcessorOrdersEvents(t *testing.T) { + out := make(chan watch.Event) + e := newEventProcessor(out) + go e.run() + + numProcessed := 0 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + go func() { + for i := 0; i < 1000; i++ { + e := <-out + if got, want := int(e.Object.(apiInt)), i; got != want { + t.Errorf("unexpected event: got=%d, want=%d", got, want) + } + numProcessed++ + } + cancel() + }() + + for i := 0; i < 1000; i++ { + e.push(watch.Event{Object: apiInt(i)}) + } + + <-ctx.Done() + e.stop() + + if numProcessed != 1000 { + t.Errorf("unexpected number of events processed: %d", numProcessed) + } + +} + type byEventTypeAndName []watch.Event func (a byEventTypeAndName) Len() int { return len(a) } @@ -51,44 +133,6 @@ func (a byEventTypeAndName) Less(i, j int) bool { return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name } -func TestTicketer(t *testing.T) { - tg := newTicketer() - - const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines - var tickets []uint64 - for i := 0; i < numTickets; i++ { - ticket := tg.GetTicket() - tickets = append(tickets, ticket) - - exp, got := uint64(i), ticket - if got != exp { - t.Fatalf("expected ticket %d, got %d", exp, got) - } - } - - // shuffle tickets - rand.Shuffle(len(tickets), func(i, j int) { - tickets[i], tickets[j] = tickets[j], tickets[i] - }) - - res := make(chan uint64, len(tickets)) - for _, ticket := range tickets { - go func(ticket uint64) { - time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) - tg.WaitForTicket(ticket, func() { - res <- ticket - }) - }(ticket) - } - - for i := 0; i < numTickets; i++ { - exp, got := uint64(i), <-res - if got != exp { - t.Fatalf("expected ticket %d, got %d", exp, got) - } - } -} - func TestNewInformerWatcher(t *testing.T) { // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. tt := []struct {
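To make the shutdown contract concrete, below is a minimal, hypothetical consumer of NewIndexerInformerWatcher. It is not part of this patch: the package name, consumeSecrets, and the watchtools import alias are illustrative, and the ListerWatcher wiring is assumed to be built elsewhere.

// Package watchexample is an illustrative sketch, not part of this patch.
package watchexample

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// consumeSecrets is a hypothetical caller: it reads a handful of events from
// the wrapped informer, then stops the watch and waits for a clean shutdown.
func consumeSecrets(lw cache.ListerWatcher) {
	// The indexer and informer are also returned; this sketch only uses the
	// watch.Interface and the shutdown channel.
	_, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})

	for i := 0; i < 10; i++ {
		event := <-w.ResultChan()
		if secret, ok := event.Object.(*corev1.Secret); ok {
			fmt.Printf("%s %s/%s\n", event.Type, secret.Namespace, secret.Name)
		}
	}

	// Stop() closes the informer's stop channel; the deferred e.stop() inside
	// NewIndexerInformerWatcher then closes the processor's done channel, so
	// any still-buffered events are dropped instead of leaving goroutines
	// blocked on the unread output channel (the leak this patch fixes).
	w.Stop()
	<-done // wait for the informer and the event processor to exit
}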