diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go index 4ccc4b49a9c..4e0a400bb55 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go @@ -18,42 +18,86 @@ package watch import ( "sync" - "sync/atomic" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) -func newTicketer() *ticketer { - return &ticketer{ +func newEventProcessor(out chan<- watch.Event) *eventProcessor { + return &eventProcessor{ + out: out, cond: sync.NewCond(&sync.Mutex{}), + done: make(chan struct{}), } } -type ticketer struct { - counter uint64 +// eventProcessor buffers events and writes them to an out chan when a reader +// is waiting. Because of the requirement to buffer events, it synchronizes +// input with a condition, and synchronizes output with a channels. It needs to +// be able to yield while both waiting on an input condition and while blocked +// on writing to the output channel. +type eventProcessor struct { + out chan<- watch.Event - cond *sync.Cond - current uint64 + cond *sync.Cond + buff []watch.Event + + done chan struct{} } -func (t *ticketer) GetTicket() uint64 { - // -1 to start from 0 - return atomic.AddUint64(&t.counter, 1) - 1 +func (e *eventProcessor) run() { + for { + batch := e.takeBatch() + e.writeBatch(batch) + if e.stopped() { + return + } + } } -func (t *ticketer) WaitForTicket(ticket uint64, f func()) { - t.cond.L.Lock() - defer t.cond.L.Unlock() - for ticket != t.current { - t.cond.Wait() +func (e *eventProcessor) takeBatch() []watch.Event { + e.cond.L.Lock() + defer e.cond.L.Unlock() + + for len(e.buff) == 0 && !e.stopped() { + e.cond.Wait() } - f() + batch := e.buff + e.buff = nil + return batch +} - t.current++ - t.cond.Broadcast() +func (e *eventProcessor) writeBatch(events []watch.Event) { + for _, event := range events { + select { + case e.out <- event: + case <-e.done: + return + } + } +} + +func (e *eventProcessor) push(event watch.Event) { + e.cond.L.Lock() + defer e.cond.L.Unlock() + defer e.cond.Signal() + e.buff = append(e.buff, event) +} + +func (e *eventProcessor) stopped() bool { + select { + case <-e.done: + return true + default: + return false + } +} + +func (e *eventProcessor) stop() { + close(e.done) + e.cond.Signal() } // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface @@ -61,55 +105,44 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) { // it also returns a channel you can use to wait for the informers to fully shutdown. func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) - doneCh := make(chan struct{}) w := watch.NewProxyWatcher(ch) - t := newTicketer() + e := newEventProcessor(ch) indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - select { - case ch <- watch.Event{ - Type: watch.Added, - Object: obj.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), }) }, UpdateFunc: func(old, new interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - select { - case ch <- watch.Event{ - Type: watch.Modified, - Object: new.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), }) }, DeleteFunc: func(obj interface{}) { - go t.WaitForTicket(t.GetTicket(), func() { - staleObj, stale := obj.(cache.DeletedFinalStateUnknown) - if stale { - // We have no means of passing the additional information down using watch API based on watch.Event - // but the caller can filter such objects by checking if metadata.deletionTimestamp is set - obj = staleObj - } + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using + // watch API based on watch.Event but the caller can filter such + // objects by checking if metadata.deletionTimestamp is set + obj = staleObj + } - select { - case ch <- watch.Event{ - Type: watch.Deleted, - Object: obj.(runtime.Object), - }: - case <-w.StopChan(): - } + e.push(watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), }) }, }, cache.Indexers{}) + go e.run() + + doneCh := make(chan struct{}) go func() { defer close(doneCh) + defer e.stop() informer.Run(w.StopChan()) }() diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go index 051898654f1..d6e1e8e223f 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go @@ -17,8 +17,9 @@ limitations under the License. package watch import ( - "math/rand" + "context" "reflect" + goruntime "runtime" "sort" "testing" "time" @@ -28,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/watch" fakeclientset "k8s.io/client-go/kubernetes/fake" @@ -35,6 +37,86 @@ import ( "k8s.io/client-go/tools/cache" ) +// TestEventProcessorExit is expected to timeout if the event processor fails +// to exit when stopped. +func TestEventProcessorExit(t *testing.T) { + event := watch.Event{} + + tests := []struct { + name string + write func(e *eventProcessor) + }{ + { + name: "exit on blocked read", + write: func(e *eventProcessor) { + e.push(event) + }, + }, + { + name: "exit on blocked write", + write: func(e *eventProcessor) { + e.push(event) + e.push(event) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + out := make(chan watch.Event) + e := newEventProcessor(out) + + test.write(e) + + exited := make(chan struct{}) + go func() { + e.run() + close(exited) + }() + + <-out + e.stop() + goruntime.Gosched() + <-exited + }) + } +} + +type apiInt int + +func (apiInt) GetObjectKind() schema.ObjectKind { return nil } +func (apiInt) DeepCopyObject() runtime.Object { return nil } + +func TestEventProcessorOrdersEvents(t *testing.T) { + out := make(chan watch.Event) + e := newEventProcessor(out) + go e.run() + + numProcessed := 0 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + go func() { + for i := 0; i < 1000; i++ { + e := <-out + if got, want := int(e.Object.(apiInt)), i; got != want { + t.Errorf("unexpected event: got=%d, want=%d", got, want) + } + numProcessed++ + } + cancel() + }() + + for i := 0; i < 1000; i++ { + e.push(watch.Event{Object: apiInt(i)}) + } + + <-ctx.Done() + e.stop() + + if numProcessed != 1000 { + t.Errorf("unexpected number of events processed: %d", numProcessed) + } + +} + type byEventTypeAndName []watch.Event func (a byEventTypeAndName) Len() int { return len(a) } @@ -51,44 +133,6 @@ func (a byEventTypeAndName) Less(i, j int) bool { return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name } -func TestTicketer(t *testing.T) { - tg := newTicketer() - - const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines - var tickets []uint64 - for i := 0; i < numTickets; i++ { - ticket := tg.GetTicket() - tickets = append(tickets, ticket) - - exp, got := uint64(i), ticket - if got != exp { - t.Fatalf("expected ticket %d, got %d", exp, got) - } - } - - // shuffle tickets - rand.Shuffle(len(tickets), func(i, j int) { - tickets[i], tickets[j] = tickets[j], tickets[i] - }) - - res := make(chan uint64, len(tickets)) - for _, ticket := range tickets { - go func(ticket uint64) { - time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) - tg.WaitForTicket(ticket, func() { - res <- ticket - }) - }(ticket) - } - - for i := 0; i < numTickets; i++ { - exp, got := uint64(i), <-res - if got != exp { - t.Fatalf("expected ticket %d, got %d", exp, got) - } - } -} - func TestNewInformerWatcher(t *testing.T) { // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. tt := []struct {