Merge pull request #77434 from mikedanese/watching

NewIndexerInformerWatcher: fix goroutine leak

Kubernetes-commit: 4fed75302a869c1c633e482af5e9d5fa3966fba6
This commit is contained in:
Kubernetes Publisher 2019-05-28 10:59:06 -07:00
commit d2583122ce
2 changed files with 166 additions and 89 deletions

View File

@ -18,42 +18,86 @@ package watch
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func newTicketer() *ticketer {
return &ticketer{
func newEventProcessor(out chan<- watch.Event) *eventProcessor {
return &eventProcessor{
out: out,
cond: sync.NewCond(&sync.Mutex{}),
done: make(chan struct{}),
}
}
type ticketer struct {
counter uint64
// eventProcessor buffers events and writes them to an out chan when a reader
// is waiting. Because of the requirement to buffer events, it synchronizes
// input with a condition, and synchronizes output with a channels. It needs to
// be able to yield while both waiting on an input condition and while blocked
// on writing to the output channel.
type eventProcessor struct {
out chan<- watch.Event
cond *sync.Cond
current uint64
cond *sync.Cond
buff []watch.Event
done chan struct{}
}
func (t *ticketer) GetTicket() uint64 {
// -1 to start from 0
return atomic.AddUint64(&t.counter, 1) - 1
func (e *eventProcessor) run() {
for {
batch := e.takeBatch()
e.writeBatch(batch)
if e.stopped() {
return
}
}
}
func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
t.cond.L.Lock()
defer t.cond.L.Unlock()
for ticket != t.current {
t.cond.Wait()
func (e *eventProcessor) takeBatch() []watch.Event {
e.cond.L.Lock()
defer e.cond.L.Unlock()
for len(e.buff) == 0 && !e.stopped() {
e.cond.Wait()
}
f()
batch := e.buff
e.buff = nil
return batch
}
t.current++
t.cond.Broadcast()
func (e *eventProcessor) writeBatch(events []watch.Event) {
for _, event := range events {
select {
case e.out <- event:
case <-e.done:
return
}
}
}
func (e *eventProcessor) push(event watch.Event) {
e.cond.L.Lock()
defer e.cond.L.Unlock()
defer e.cond.Signal()
e.buff = append(e.buff, event)
}
func (e *eventProcessor) stopped() bool {
select {
case <-e.done:
return true
default:
return false
}
}
func (e *eventProcessor) stop() {
close(e.done)
e.cond.Signal()
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
@ -61,55 +105,44 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
// it also returns a channel you can use to wait for the informers to fully shutdown.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event)
doneCh := make(chan struct{})
w := watch.NewProxyWatcher(ch)
t := newTicketer()
e := newEventProcessor(ch)
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
e.push(watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
})
},
UpdateFunc: func(old, new interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
}:
case <-w.StopChan():
}
e.push(watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
})
},
DeleteFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using watch API based on watch.Event
// but the caller can filter such objects by checking if metadata.deletionTimestamp is set
obj = staleObj
}
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using
// watch API based on watch.Event but the caller can filter such
// objects by checking if metadata.deletionTimestamp is set
obj = staleObj
}
select {
case ch <- watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
e.push(watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
})
},
}, cache.Indexers{})
go e.run()
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
defer e.stop()
informer.Run(w.StopChan())
}()

View File

@ -17,8 +17,9 @@ limitations under the License.
package watch
import (
"math/rand"
"context"
"reflect"
goruntime "runtime"
"sort"
"testing"
"time"
@ -28,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
fakeclientset "k8s.io/client-go/kubernetes/fake"
@ -35,6 +37,86 @@ import (
"k8s.io/client-go/tools/cache"
)
// TestEventProcessorExit is expected to timeout if the event processor fails
// to exit when stopped.
func TestEventProcessorExit(t *testing.T) {
event := watch.Event{}
tests := []struct {
name string
write func(e *eventProcessor)
}{
{
name: "exit on blocked read",
write: func(e *eventProcessor) {
e.push(event)
},
},
{
name: "exit on blocked write",
write: func(e *eventProcessor) {
e.push(event)
e.push(event)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
out := make(chan watch.Event)
e := newEventProcessor(out)
test.write(e)
exited := make(chan struct{})
go func() {
e.run()
close(exited)
}()
<-out
e.stop()
goruntime.Gosched()
<-exited
})
}
}
type apiInt int
func (apiInt) GetObjectKind() schema.ObjectKind { return nil }
func (apiInt) DeepCopyObject() runtime.Object { return nil }
func TestEventProcessorOrdersEvents(t *testing.T) {
out := make(chan watch.Event)
e := newEventProcessor(out)
go e.run()
numProcessed := 0
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
go func() {
for i := 0; i < 1000; i++ {
e := <-out
if got, want := int(e.Object.(apiInt)), i; got != want {
t.Errorf("unexpected event: got=%d, want=%d", got, want)
}
numProcessed++
}
cancel()
}()
for i := 0; i < 1000; i++ {
e.push(watch.Event{Object: apiInt(i)})
}
<-ctx.Done()
e.stop()
if numProcessed != 1000 {
t.Errorf("unexpected number of events processed: %d", numProcessed)
}
}
type byEventTypeAndName []watch.Event
func (a byEventTypeAndName) Len() int { return len(a) }
@ -51,44 +133,6 @@ func (a byEventTypeAndName) Less(i, j int) bool {
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
}
func TestTicketer(t *testing.T) {
tg := newTicketer()
const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines
var tickets []uint64
for i := 0; i < numTickets; i++ {
ticket := tg.GetTicket()
tickets = append(tickets, ticket)
exp, got := uint64(i), ticket
if got != exp {
t.Fatalf("expected ticket %d, got %d", exp, got)
}
}
// shuffle tickets
rand.Shuffle(len(tickets), func(i, j int) {
tickets[i], tickets[j] = tickets[j], tickets[i]
})
res := make(chan uint64, len(tickets))
for _, ticket := range tickets {
go func(ticket uint64) {
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
tg.WaitForTicket(ticket, func() {
res <- ticket
})
}(ticket)
}
for i := 0; i < numTickets; i++ {
exp, got := uint64(i), <-res
if got != exp {
t.Fatalf("expected ticket %d, got %d", exp, got)
}
}
}
func TestNewInformerWatcher(t *testing.T) {
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
tt := []struct {