Merge pull request #24841 from sjenning/shared-informer

Automatic merge from submit-queue

update node controller to use shared pod informer

continuing work from #24470 and #23575
k8s-merge-robot committed 2016-08-02 03:45:01 -07:00 (committed by GitHub)
5 changed files with 58 additions and 42 deletions
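
The change drops the node controller's private pod list/watch in favor of a pod informer that can be shared by every controller in the controller-manager. Below is a rough sketch of the pattern, written against today's client-go informer API rather than the 2016-era pkg/controller/framework package used in this diff; the in-cluster client setup and the handler bodies are invented for illustration. One shared informer serves any number of controllers over a single pod watch:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Assumes the process runs in-cluster; any *rest.Config works the same way.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// One shared informer factory: every controller that cares about pods
	// registers handlers on the same underlying list/watch and cache.
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	// Two independent "controllers" attach handlers to the same informer
	// instead of each opening its own pod watch against the apiserver.
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("controller A saw pod %s/%s\n", pod.Namespace, pod.Name)
		},
	})
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			fmt.Println("controller B saw a pod deletion")
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)                                 // starts every informer the factory created
	cache.WaitForCacheSync(stop, podInformer.HasSynced) // wait for the initial list to land in the cache
	<-stop                                              // block forever; a real controller-manager ties this to signal handling
}
```

Because all consumers share one informer, the apiserver sees a single pod watch no matter how many controllers register handlers; threading podInformer into NewNodeController in the diff below achieves the same for the node controller.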

@@ -34,6 +34,7 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
@@ -124,7 +125,7 @@ type NodeController struct {
 	maximumGracePeriod time.Duration
 	recorder record.EventRecorder
 	// Pod framework and store
-	podController *framework.Controller
+	podController framework.ControllerInterface
 	podStore cache.StoreToPodLister
 	// Node framework and store
 	nodeController *framework.Controller
@@ -140,6 +141,13 @@ type NodeController struct {
 	computeZoneStateFunc func(nodeConditions []*api.NodeCondition) zoneState
 	zoneStates map[string]zoneState
+
+	// internalPodInformer is used to hold a personal informer. If we're using
+	// a normal shared informer, then the informer will be started for us. If
+	// we have a personal informer, we must start it ourselves. If you start
+	// the controller using NewNodeController (passing a SharedIndexInformer),
+	// this will be nil.
+	internalPodInformer framework.SharedIndexInformer
 }

 // NewNodeController returns a new node controller to sync instances from cloudprovider.
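
The internalPodInformer comment above encodes an ownership rule: when a shared informer is injected, whoever created it starts it, but when the controller builds a personal informer (as NewNodeControllerFromClient does later in this diff), the controller must start it itself in Run. Below is a minimal sketch of that rule, again using the current client-go API instead of the old framework package; ExampleController and both constructors are invented names for illustration.

```go
package sketch

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// ExampleController mirrors the pattern in this diff: it always has a pod
// informer, and it remembers a "personal" one it is responsible for starting.
type ExampleController struct {
	podInformer         cache.SharedIndexInformer
	internalPodInformer cache.SharedIndexInformer // nil when the informer was injected
}

// NewExampleController takes an already-shared informer and only attaches handlers.
func NewExampleController(podInformer cache.SharedIndexInformer) *ExampleController {
	c := &ExampleController{podInformer: podInformer}
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("observed pod %s/%s\n", pod.Namespace, pod.Name)
		},
	})
	return c
}

// NewExampleControllerFromClient builds a private informer and records that the
// controller owns it, so Run knows it has to start it.
func NewExampleControllerFromClient(client kubernetes.Interface) *ExampleController {
	podInformer := informers.NewSharedInformerFactory(client, 0).Core().V1().Pods().Informer()
	c := NewExampleController(podInformer)
	c.internalPodInformer = podInformer
	return c
}

// Run starts the personal informer only when the controller owns one; an
// injected shared informer is expected to be started by whoever created it.
func (c *ExampleController) Run(stopCh <-chan struct{}) {
	if c.internalPodInformer != nil {
		go c.internalPodInformer.Run(stopCh)
	}
	cache.WaitForCacheSync(stopCh, c.podInformer.HasSynced)
	<-stopCh
}
```

NewNodeControllerFromClient further down is the real counterpart of NewExampleControllerFromClient: it creates the pod informer itself, records it in internalPodInformer, and Run starts it only when that field is non-nil.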
@@ -147,6 +155,7 @@ type NodeController struct {
 // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
 // currently, this should be handled as a fatal error.
 func NewNodeController(
+	podInformer framework.SharedIndexInformer,
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
@@ -207,30 +216,12 @@ func NewNodeController(
 		zoneStates: make(map[string]zoneState),
 	}

-	nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return nc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		controller.NoResyncPeriodFunc(),
-		framework.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
-			},
-			UpdateFunc: func(_, obj interface{}) {
-				nc.maybeDeleteTerminatingPod(obj, nc.nodeStore.Store, nc.forcefullyDeletePod)
-			},
-		},
-		// We don't need to build a index for podStore here actually, but build one for consistency.
-		// It will ensure that if people start making use of the podStore in more specific ways,
-		// they'll get the benefits they expect. It will also reserve the name for future refactorings.
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
+	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+		AddFunc: nc.maybeDeleteTerminatingPod,
+		UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
+	})
+	nc.podStore.Indexer = podInformer.GetIndexer()
+	nc.podController = podInformer.GetController()

 	nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
 	if nc.allocateNodeCIDRs {
@@ -335,11 +326,36 @@ func NewNodeController(
 	return nc, nil
 }

+func NewNodeControllerFromClient(
+	cloud cloudprovider.Interface,
+	kubeClient clientset.Interface,
+	podEvictionTimeout time.Duration,
+	evictionLimiterQPS float32,
+	nodeMonitorGracePeriod time.Duration,
+	nodeStartupGracePeriod time.Duration,
+	nodeMonitorPeriod time.Duration,
+	clusterCIDR *net.IPNet,
+	serviceCIDR *net.IPNet,
+	nodeCIDRMaskSize int,
+	allocateNodeCIDRs bool) (*NodeController, error) {
+	podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc())
+	nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod,
+		nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
+	if err != nil {
+		return nil, err
+	}
+	nc.internalPodInformer = podInformer
+	return nc, nil
+}
+
 // Run starts an asynchronous loop that monitors the status of cluster nodes.
 func (nc *NodeController) Run(period time.Duration) {
 	go nc.nodeController.Run(wait.NeverStop)
 	go nc.podController.Run(wait.NeverStop)
 	go nc.daemonSetController.Run(wait.NeverStop)

+	if nc.internalPodInformer != nil {
+		go nc.internalPodInformer.Run(wait.NeverStop)
+	}
+
 	// Incorporate the results of node status pushed from kubelet to master.
 	go wait.Until(func() {