NodeController waits for informer sync before doing anything

This commit is contained in:
gmarek 2016-10-14 12:38:39 +02:00
parent 9e3636ae86
commit e2b78ddadc

View File

@ -131,9 +131,13 @@ type NodeController struct {
// The maximum duration before a pod evicted from a node can be forcefully terminated. // The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration maximumGracePeriod time.Duration
recorder record.EventRecorder recorder record.EventRecorder
podStore cache.StoreToPodLister podInformer informers.PodInformer
nodeStore cache.StoreToNodeLister nodeInformer informers.NodeInformer
daemonSetStore cache.StoreToDaemonSetLister daemonSetInformer informers.DaemonSetInformer
podStore cache.StoreToPodLister
nodeStore cache.StoreToNodeLister
daemonSetStore cache.StoreToDaemonSetLister
// allocate/recycle CIDRs for node if allocateNodeCIDRs == true // allocate/recycle CIDRs for node if allocateNodeCIDRs == true
cidrAllocator CIDRAllocator cidrAllocator CIDRAllocator
@ -228,6 +232,9 @@ func NewNodeController(
largeClusterThreshold: largeClusterThreshold, largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold, unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState), zoneStates: make(map[string]zoneState),
podInformer: podInformer,
nodeInformer: nodeInformer,
daemonSetInformer: daemonSetInformer,
} }
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@ -351,6 +358,10 @@ func NewNodeController(
func (nc *NodeController) Run() { func (nc *NodeController) Run() {
// Incorporate the results of node status pushed from kubelet to master. // Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() { go wait.Until(func() {
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformer.Informer().HasSynced, nc.podInformer.Informer().HasSynced, nc.daemonSetInformer.Informer().HasSynced) {
glog.Errorf("NodeController timed out while waiting for informers to sync...")
return
}
if err := nc.monitorNodeStatus(); err != nil { if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err) glog.Errorf("Error monitoring node status: %v", err)
} }
@ -369,6 +380,10 @@ func (nc *NodeController) Run() {
// c. If there are pods still terminating, wait for their estimated completion // c. If there are pods still terminating, wait for their estimated completion
// before retrying // before retrying
go wait.Until(func() { go wait.Until(func() {
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformer.Informer().HasSynced, nc.podInformer.Informer().HasSynced, nc.daemonSetInformer.Informer().HasSynced) {
glog.Errorf("NodeController timed out while waiting for informers to sync...")
return
}
nc.evictorLock.Lock() nc.evictorLock.Lock()
defer nc.evictorLock.Unlock() defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor { for k := range nc.zonePodEvictor {
@ -402,6 +417,10 @@ func (nc *NodeController) Run() {
// TODO: replace with a controller that ensures pods that are terminating complete // TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period // in a particular time period
go wait.Until(func() { go wait.Until(func() {
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformer.Informer().HasSynced, nc.podInformer.Informer().HasSynced, nc.daemonSetInformer.Informer().HasSynced) {
glog.Errorf("NodeController timed out while waiting for informers to sync...")
return
}
nc.evictorLock.Lock() nc.evictorLock.Lock()
defer nc.evictorLock.Unlock() defer nc.evictorLock.Unlock()
for k := range nc.zoneTerminationEvictor { for k := range nc.zoneTerminationEvictor {