1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-16 23:20:56 +00:00

K8s Vendor update to 1.10.5

This commit is contained in:
Alena Prokharchyk
2018-07-03 09:45:27 -07:00
parent f9ecba0ab2
commit 2e82c18d4b
1144 changed files with 48967 additions and 83615 deletions

View File

@@ -25,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/buffer"
"k8s.io/client-go/util/retry"
"github.com/golang/glog"
)
@@ -92,8 +94,13 @@ func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEve
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool
// syncedPollPeriod controls how often you look at the status of your sync funcs
const syncedPollPeriod = 100 * time.Millisecond
const (
// syncedPollPeriod controls how often you look at the status of your sync funcs
syncedPollPeriod = 100 * time.Millisecond
// initialBufferSize is the initial number of event notifications that can be buffered.
initialBufferSize = 1024
)
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
@@ -182,7 +189,7 @@ type deleteNotification struct {
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
@@ -313,7 +320,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started {
s.processor.addListener(listener)
@@ -328,7 +335,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addAndStartListener(listener)
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
@@ -366,6 +373,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
}
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
@@ -373,20 +381,15 @@ type sharedProcessor struct {
wg wait.Group
}
func (p *sharedProcessor) addAndStartListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
@@ -417,6 +420,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
@@ -465,6 +469,13 @@ type processorListener struct {
handler ResourceEventHandler
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
@@ -477,11 +488,12 @@ type processorListener struct {
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
@@ -499,25 +511,16 @@ func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better
var pendingNotifications []interface{}
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
if len(pendingNotifications) == 0 { // Nothing to pop
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
notification = nil
} else {
notification = pendingNotifications[0]
pendingNotifications[0] = nil
pendingNotifications = pendingNotifications[1:]
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
@@ -528,27 +531,42 @@ func (p *processorListener) pop() {
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
pendingNotifications = append(pendingNotifications, notificationToAdd)
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
defer utilruntime.HandleCrash()
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}
}, 1*time.Minute, stopCh)
}
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,