There was some weird queuing going on. The queue was implemented by spawning goroutines that would block on the "ticketer" until it was their turn to write to the output channel. If N events were in the watch when the consumer of the watch stopped reading events, N goroutines would leak. In unit tests of the certificate manager, this was causing ~10k goroutines to leak. Fix it with a buffering event processor that uses only one goroutine and cancels correctly.

Kubernetes-commit: cafc640bfa0f7362b178b1b896085962d018afe3
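
To see the failure mode the message describes, here is a minimal sketch (illustrative only, not the original code; leakyQueue is a made-up name and the ordering "ticketer" is elided): each queued event holds a goroutine blocked on the output channel, so a consumer that stops reading strands one goroutine per pending event.

package main

import (
	"fmt"
	"runtime"
	"time"
)

// leakyQueue mimics the old pattern: one goroutine per queued event, each
// blocked on the unbuffered out channel until a reader shows up.
func leakyQueue(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		go func(v int) {
			out <- v // blocks forever once the consumer stops reading
		}(i)
	}
}

func main() {
	out := make(chan int)
	leakyQueue(out, 10000)
	<-out // read one event, then stop consuming
	time.Sleep(100 * time.Millisecond)
	fmt.Println("goroutines:", runtime.NumGoroutine()) // ~10000 stuck writers
}

The eventProcessor in the file below avoids this by buffering events in a slice behind a single goroutine that selects on a done channel while writing, so stopping the watch cancels the writer instead of leaking it.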

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package watch

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)
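
// newEventProcessor returns an eventProcessor that writes buffered events to
// out. Call run to start it and stop to shut it down.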
func newEventProcessor(out chan<- watch.Event) *eventProcessor {
	return &eventProcessor{
		out:  out,
		cond: sync.NewCond(&sync.Mutex{}),
		done: make(chan struct{}),
	}
}

// eventProcessor buffers events and writes them to an out chan when a reader
// is waiting. Because of the requirement to buffer events, it synchronizes
// input with a condition, and synchronizes output with a channel. It needs to
// be able to yield both while waiting on the input condition and while
// blocked on writing to the output channel.
type eventProcessor struct {
	out chan<- watch.Event

	cond *sync.Cond
	buff []watch.Event

	done chan struct{}
}
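
// run repeatedly takes a batch of buffered events and writes it to the output
// channel until the processor is stopped.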
func (e *eventProcessor) run() {
	for {
		batch := e.takeBatch()
		e.writeBatch(batch)
		if e.stopped() {
			return
		}
	}
}
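
// takeBatch blocks until at least one event has been buffered or the
// processor has been stopped, then drains and returns the buffer.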
func (e *eventProcessor) takeBatch() []watch.Event {
	e.cond.L.Lock()
	defer e.cond.L.Unlock()

	for len(e.buff) == 0 && !e.stopped() {
		e.cond.Wait()
	}

	batch := e.buff
	e.buff = nil
	return batch
}
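
// writeBatch sends each event to the output channel, bailing out if the
// processor is stopped so it never blocks on a consumer that has gone away.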
func (e *eventProcessor) writeBatch(events []watch.Event) {
	for _, event := range events {
		select {
		case e.out <- event:
		case <-e.done:
			return
		}
	}
}
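
// push appends an event to the buffer and wakes the run loop.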
func (e *eventProcessor) push(event watch.Event) {
	e.cond.L.Lock()
	defer e.cond.L.Unlock()
	defer e.cond.Signal()
	e.buff = append(e.buff, event)
}
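
// stopped reports whether stop has been called.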
func (e *eventProcessor) stopped() bool {
	select {
	case <-e.done:
		return true
	default:
		return false
	}
}
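
// stop shuts the processor down and wakes takeBatch if it is blocked waiting
// for input. It must be called at most once.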
func (e *eventProcessor) stop() {
	close(e.done)
	e.cond.Signal()
}

// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into
// watch.Interface so you can use it anywhere you'd have used a regular
// Watcher returned from a Watch method. It also returns a channel you can
// use to wait for the informer to fully shut down.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
	ch := make(chan watch.Event)
	w := watch.NewProxyWatcher(ch)
	e := newEventProcessor(ch)

	indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			e.push(watch.Event{
				Type:   watch.Added,
				Object: obj.(runtime.Object),
			})
		},
		UpdateFunc: func(old, new interface{}) {
			e.push(watch.Event{
				Type:   watch.Modified,
				Object: new.(runtime.Object),
			})
		},
		DeleteFunc: func(obj interface{}) {
			staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
			if stale {
				// We have no means of passing the additional information down using
				// the watch API based on watch.Event, but the caller can filter such
				// objects by checking if metadata.deletionTimestamp is set.
				obj = staleObj.Obj
			}

			e.push(watch.Event{
				Type:   watch.Deleted,
				Object: obj.(runtime.Object),
			})
		},
	}, cache.Indexers{})

	go e.run()

	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		defer e.stop()
		informer.Run(w.StopChan())
	}()

	return indexer, informer, w, doneCh
}
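
For reference, a usage sketch under stated assumptions: a reachable cluster config at the default kubeconfig path; watchPods is a made-up helper for illustration, not part of this package.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	watchtools "k8s.io/client-go/tools/watch"
)

// watchPods lists and watches pods in the default namespace through the
// informer-backed watcher and prints the first few events.
func watchPods(client kubernetes.Interface) {
	lw := cache.NewListWatchFromClient(
		client.CoreV1().RESTClient(), "pods", metav1.NamespaceDefault, fields.Everything())

	_, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &corev1.Pod{})

	for i := 0; i < 5; i++ {
		event := <-w.ResultChan()
		fmt.Println(event.Type, event.Object.(*corev1.Pod).Name)
	}

	w.Stop() // stops the informer via the proxy watcher's stop channel
	<-done   // wait for the informer goroutine to fully shut down
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	watchPods(kubernetes.NewForConfigOrDie(config))
}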