Started commenting processors

Kubernetes-commit: 5546d9f12f266a082805ad3a2293fe04e0029c78
This commit is contained in:
Mike Spreitzer
2020-01-06 01:49:29 -05:00
committed by Kubernetes Publisher
parent 2f9f325a3b
commit 1f6e3b32af
5 changed files with 52 additions and 24 deletions

View File

@@ -484,6 +484,12 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
return nil
}
// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. Each distribute
// operation is `sync` or not. The sync distributions go to a subset
// of the listeners that (a) is recomputed in the occasional calls to
// shouldResync and (b) every listener is initially put in. The
// non-sync distributions go to every listener.
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
@@ -575,6 +581,17 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
}
}
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
@@ -588,11 +605,17 @@ type processorListener struct {
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
// requestedResyncPeriod is how frequently the listener wants a
// full resync from the shared informer, but is bounded below by
// `minimumResyncPeriod` and the sharedProcessor's
// `resyncCheckPeriod`.
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
// resyncPeriod is the threshold that will be used in the logic
// for this listener. This value does not differ from
// requestedResyncPeriod. The actual time between resyncs depends
// on when the sharedProcessor's `shouldResync` function is
// invoked and when the sharedIndexInformer processes `Sync` type
// Delta objects.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
@@ -651,7 +674,7 @@ func (p *processorListener) pop() {
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// we will catch it, **the offending item will be skipped!**, and after a short delay (one minute)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})